1
1
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

consolidate sender

This commit is contained in:
Zack Scholl 2019-04-09 15:21:54 -07:00
parent a185c1fd68
commit c615168fc2
7 changed files with 213 additions and 228 deletions

View file

@ -35,7 +35,7 @@ func main() {
// PathToFile: "../wskeystore/README.md", // PathToFile: "../wskeystore/README.md",
// PathToFile: "./src/croc/croc.go", // PathToFile: "./src/croc/croc.go",
// PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"}, // PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"},
PathToFiles: []string{"100mb.file"}, PathToFiles: []string{"croc.exe","croc2.exe"},
KeepPathInRemote: false, KeepPathInRemote: false,
}) })
} else { } else {

View file

@ -1,8 +0,0 @@
package sender
import "io"
// SDPProvider returns the underlying SDPProvider
func (s *Session) SDPProvider() io.Reader {
return s.sess.SDPProvider()
}

View file

@ -1,67 +0,0 @@
package sender
import (
"github.com/pion/webrtc/v2"
)
const (
bufferThreshold = 512 * 1024 // 512kB
)
// // Initialize creates the connection, the datachannel and creates the offer
// func (s *Session) Initialize() error {
// if s.initialized {
// return nil
// }
// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.createDataChannel(); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.sess.CreateOffer(); err != nil {
// log.Errorln(err)
// return err
// }
// s.initialized = true
// return nil
// }
// // Start the connection and the file transfer
// func (s *Session) Start() error {
// if err := s.Initialize(); err != nil {
// return err
// }
// go s.readFile()
// if err := s.sess.ReadSDP(); err != nil {
// log.Errorln(err)
// return err
// }
// <-s.sess.Done
// s.sess.OnCompletion()
// return nil
// }
func (s *Session) CreateDataChannel() error {
ordered := true
maxPacketLifeTime := uint16(10000)
dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{
Ordered: &ordered,
MaxPacketLifeTime: &maxPacketLifeTime,
})
if err != nil {
return err
}
s.dataChannel = dataChannel
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
s.dataChannel.SetBufferedAmountLowThreshold(bufferThreshold)
s.dataChannel.OnOpen(s.onOpenHandler())
s.dataChannel.OnClose(s.onCloseHandler())
return nil
}

View file

@ -1,87 +0,0 @@
package sender
import (
"fmt"
"io"
"os"
log "github.com/sirupsen/logrus"
)
func (s *Session) readFile(pathToFile string) error {
f, err := os.Open(pathToFile)
if err != nil {
return err
}
log.Infof("Starting to read data...")
s.readingStats.Start()
defer func() {
f.Close()
s.readingStats.Pause()
log.Infof("Stopped reading data...")
close(s.output)
}()
for {
// Read file
s.dataBuff = s.dataBuff[:cap(s.dataBuff)]
n, err := f.Read(s.dataBuff)
if err != nil {
if err == io.EOF {
s.readingStats.Stop()
log.Debugf("Got EOF after %v bytes!\n", s.readingStats.Bytes())
return nil
}
log.Errorf("Read Error: %v\n", err)
return err
}
s.dataBuff = s.dataBuff[:n]
s.readingStats.AddBytes(uint64(n))
s.output <- outputMsg{
n: n,
// Make a copy of the buffer
buff: append([]byte(nil), s.dataBuff...),
}
}
return nil
}
func (s *Session) onBufferedAmountLow() func() {
return func() {
data := <-s.output
if data.n != 0 {
s.msgToBeSent = append(s.msgToBeSent, data)
} else if len(s.msgToBeSent) == 0 && s.dataChannel.BufferedAmount() == 0 {
s.sess.NetworkStats.Stop()
s.close(false)
return
}
currentSpeed := s.sess.NetworkStats.Bandwidth()
fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed)
for len(s.msgToBeSent) != 0 {
cur := s.msgToBeSent[0]
if err := s.dataChannel.Send(cur.buff); err != nil {
log.Errorf("Error, cannot send to client: %v\n", err)
return
}
s.sess.NetworkStats.AddBytes(uint64(cur.n))
s.msgToBeSent = s.msgToBeSent[1:]
}
}
}
func (s *Session) writeToNetwork() {
// Set callback, as transfer may be paused
fmt.Println("\nwriting")
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
fmt.Println("\ndone")
<-s.stopSending
fmt.Println("\nstopped sending")
s.dataChannel.OnBufferedAmountLow(nil)
log.Infof("Pausing network I/O... (remaining at least %v packets)\n", len(s.output))
s.sess.NetworkStats.Pause()
}

View file

@ -1,19 +1,33 @@
package sender package sender
import ( import (
"fmt"
"io"
"os"
"sync" "sync"
colorable "github.com/mattn/go-colorable"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
internalSess "github.com/schollz/croc/v5/internal/session" internalSess "github.com/schollz/croc/v5/internal/session"
"github.com/schollz/croc/v5/pkg/session/common" "github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/stats" "github.com/schollz/croc/v5/pkg/stats"
"github.com/sirupsen/logrus"
) )
const ( const (
// Must be <= 16384 // Must be <= 16384
senderBuffSize = 16384 senderBuffSize = 16384
bufferThreshold = 512 * 1024 // 512kB
) )
var log = logrus.New()
func init() {
log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout())
log.SetLevel(logrus.DebugLevel)
}
type outputMsg struct { type outputMsg struct {
n int n int
buff []byte buff []byte
@ -81,3 +95,198 @@ func (s *Session) TransferFile(pathToFile string) {
s.readFile(pathToFile) s.readFile(pathToFile)
s.sess.OnCompletion() s.sess.OnCompletion()
} }
// SDPProvider returns the underlying SDPProvider
func (s *Session) SDPProvider() io.Reader {
return s.sess.SDPProvider()
}
// // Initialize creates the connection, the datachannel and creates the offer
// func (s *Session) Initialize() error {
// if s.initialized {
// return nil
// }
// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.createDataChannel(); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.sess.CreateOffer(); err != nil {
// log.Errorln(err)
// return err
// }
// s.initialized = true
// return nil
// }
// // Start the connection and the file transfer
// func (s *Session) Start() error {
// if err := s.Initialize(); err != nil {
// return err
// }
// go s.readFile()
// if err := s.sess.ReadSDP(); err != nil {
// log.Errorln(err)
// return err
// }
// <-s.sess.Done
// s.sess.OnCompletion()
// return nil
// }
func (s *Session) CreateDataChannel() error {
ordered := true
maxPacketLifeTime := uint16(10000)
dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{
Ordered: &ordered,
MaxPacketLifeTime: &maxPacketLifeTime,
})
if err != nil {
return err
}
s.dataChannel = dataChannel
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
s.dataChannel.SetBufferedAmountLowThreshold(bufferThreshold)
s.dataChannel.OnOpen(s.onOpenHandler())
s.dataChannel.OnClose(s.onCloseHandler())
return nil
}
func (s *Session) readFile(pathToFile string) error {
f, err := os.Open(pathToFile)
if err != nil {
return err
}
log.Infof("Starting to read data...")
s.readingStats.Start()
defer func() {
f.Close()
s.readingStats.Pause()
log.Infof("Stopped reading data...")
close(s.output)
}()
for {
// Read file
s.dataBuff = s.dataBuff[:cap(s.dataBuff)]
n, err := f.Read(s.dataBuff)
if err != nil {
if err == io.EOF {
s.readingStats.Stop()
log.Debugf("Got EOF after %v bytes!\n", s.readingStats.Bytes())
return nil
}
log.Errorf("Read Error: %v\n", err)
return err
}
s.dataBuff = s.dataBuff[:n]
s.readingStats.AddBytes(uint64(n))
s.output <- outputMsg{
n: n,
// Make a copy of the buffer
buff: append([]byte(nil), s.dataBuff...),
}
}
return nil
}
func (s *Session) onBufferedAmountLow() func() {
return func() {
data := <-s.output
if data.n != 0 {
s.msgToBeSent = append(s.msgToBeSent, data)
} else if len(s.msgToBeSent) == 0 && s.dataChannel.BufferedAmount() == 0 {
s.sess.NetworkStats.Stop()
s.close(false)
return
}
currentSpeed := s.sess.NetworkStats.Bandwidth()
fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed)
for len(s.msgToBeSent) != 0 {
cur := s.msgToBeSent[0]
if err := s.dataChannel.Send(cur.buff); err != nil {
log.Errorf("Error, cannot send to client: %v\n", err)
return
}
s.sess.NetworkStats.AddBytes(uint64(cur.n))
s.msgToBeSent = s.msgToBeSent[1:]
}
}
}
func (s *Session) writeToNetwork() {
// Set callback, as transfer may be paused
fmt.Println("\nwriting")
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
fmt.Println("\ndone")
<-s.stopSending
fmt.Println("\nstopped sending")
s.dataChannel.OnBufferedAmountLow(nil)
log.Infof("Pausing network I/O... (remaining at least %v packets)\n", len(s.output))
s.sess.NetworkStats.Pause()
}
func (s *Session) StopSending() {
s.stopSending <- struct{}{}
}
func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) {
return func(connectionState webrtc.ICEConnectionState) {
log.Infof("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateDisconnected {
s.StopSending()
}
}
}
func (s *Session) onOpenHandler() func() {
return func() {
s.sess.NetworkStats.Start()
log.Infof("Starting to send data...")
defer log.Infof("Stopped sending data...")
s.writeToNetwork()
}
}
func (s *Session) onCloseHandler() func() {
return func() {
s.close(true)
}
}
func (s *Session) close(calledFromCloseHandler bool) {
if !calledFromCloseHandler {
s.dataChannel.Close()
}
// Sometime, onCloseHandler is not invoked, so it's a work-around
s.doneCheckLock.Lock()
if s.doneCheck {
s.doneCheckLock.Unlock()
return
}
s.doneCheck = true
s.doneCheckLock.Unlock()
s.dumpStats()
close(s.sess.Done)
}
func (s *Session) dumpStats() {
fmt.Printf(`
Disk : %s
Network: %s
`, s.readingStats.String(), s.sess.NetworkStats.String())
}

View file

@ -1,62 +0,0 @@
package sender
import (
"fmt"
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
func (s *Session) StopSending() {
s.stopSending <- struct{}{}
}
func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) {
return func(connectionState webrtc.ICEConnectionState) {
log.Infof("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateDisconnected {
s.StopSending()
}
}
}
func (s *Session) onOpenHandler() func() {
return func() {
s.sess.NetworkStats.Start()
log.Infof("Starting to send data...")
defer log.Infof("Stopped sending data...")
s.writeToNetwork()
}
}
func (s *Session) onCloseHandler() func() {
return func() {
s.close(true)
}
}
func (s *Session) close(calledFromCloseHandler bool) {
if !calledFromCloseHandler {
s.dataChannel.Close()
}
// Sometime, onCloseHandler is not invoked, so it's a work-around
s.doneCheckLock.Lock()
if s.doneCheck {
s.doneCheckLock.Unlock()
return
}
s.doneCheck = true
s.doneCheckLock.Unlock()
s.dumpStats()
close(s.sess.Done)
}
func (s *Session) dumpStats() {
fmt.Printf(`
Disk : %s
Network: %s
`, s.readingStats.String(), s.sess.NetworkStats.String())
}

View file

@ -27,11 +27,11 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var log = logrus.New()
const BufferSize = 4096 * 10 const BufferSize = 4096 * 10
const Channels = 1 const Channels = 1
var log = logrus.New()
func init() { func init() {
log.SetFormatter(&logrus.TextFormatter{ForceColors: true}) log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout()) log.SetOutput(colorable.NewColorableStdout())