diff --git a/main.go b/main.go index 55033d00..75a1a5b7 100644 --- a/main.go +++ b/main.go @@ -35,7 +35,7 @@ func main() { // PathToFile: "../wskeystore/README.md", // PathToFile: "./src/croc/croc.go", // PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"}, - PathToFiles: []string{"100mb.file"}, + PathToFiles: []string{"croc.exe","croc2.exe"}, KeepPathInRemote: false, }) } else { diff --git a/pkg/session/sender/getters.go b/pkg/session/sender/getters.go deleted file mode 100644 index c9c6dad8..00000000 --- a/pkg/session/sender/getters.go +++ /dev/null @@ -1,8 +0,0 @@ -package sender - -import "io" - -// SDPProvider returns the underlying SDPProvider -func (s *Session) SDPProvider() io.Reader { - return s.sess.SDPProvider() -} diff --git a/pkg/session/sender/init.go b/pkg/session/sender/init.go deleted file mode 100644 index ca995e01..00000000 --- a/pkg/session/sender/init.go +++ /dev/null @@ -1,67 +0,0 @@ -package sender - -import ( - "github.com/pion/webrtc/v2" -) - -const ( - bufferThreshold = 512 * 1024 // 512kB -) - -// // Initialize creates the connection, the datachannel and creates the offer -// func (s *Session) Initialize() error { -// if s.initialized { -// return nil -// } - -// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { -// log.Errorln(err) -// return err -// } -// if err := s.createDataChannel(); err != nil { -// log.Errorln(err) -// return err -// } -// if err := s.sess.CreateOffer(); err != nil { -// log.Errorln(err) -// return err -// } - -// s.initialized = true -// return nil -// } - -// // Start the connection and the file transfer -// func (s *Session) Start() error { -// if err := s.Initialize(); err != nil { -// return err -// } -// go s.readFile() -// if err := s.sess.ReadSDP(); err != nil { -// log.Errorln(err) -// return err -// } -// <-s.sess.Done -// s.sess.OnCompletion() -// return nil -// } - -func (s *Session) CreateDataChannel() error { - ordered := true - maxPacketLifeTime := uint16(10000) - dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{ - Ordered: &ordered, - MaxPacketLifeTime: &maxPacketLifeTime, - }) - if err != nil { - return err - } - - s.dataChannel = dataChannel - s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow()) - s.dataChannel.SetBufferedAmountLowThreshold(bufferThreshold) - s.dataChannel.OnOpen(s.onOpenHandler()) - s.dataChannel.OnClose(s.onCloseHandler()) - - return nil -} diff --git a/pkg/session/sender/io.go b/pkg/session/sender/io.go deleted file mode 100644 index d30e62e8..00000000 --- a/pkg/session/sender/io.go +++ /dev/null @@ -1,87 +0,0 @@ -package sender - -import ( - "fmt" - "io" - "os" - - log "github.com/sirupsen/logrus" -) - -func (s *Session) readFile(pathToFile string) error { - f, err := os.Open(pathToFile) - if err != nil { - return err - } - log.Infof("Starting to read data...") - s.readingStats.Start() - defer func() { - f.Close() - s.readingStats.Pause() - log.Infof("Stopped reading data...") - close(s.output) - }() - - for { - // Read file - s.dataBuff = s.dataBuff[:cap(s.dataBuff)] - n, err := f.Read(s.dataBuff) - if err != nil { - if err == io.EOF { - s.readingStats.Stop() - log.Debugf("Got EOF after %v bytes!\n", s.readingStats.Bytes()) - return nil - } - log.Errorf("Read Error: %v\n", err) - return err - } - s.dataBuff = s.dataBuff[:n] - s.readingStats.AddBytes(uint64(n)) - - s.output <- outputMsg{ - n: n, - // Make a copy of the buffer - buff: append([]byte(nil), s.dataBuff...), - } - } - return nil -} - -func (s *Session) onBufferedAmountLow() func() { - return func() { - data := <-s.output - if data.n != 0 { - s.msgToBeSent = append(s.msgToBeSent, data) - } else if len(s.msgToBeSent) == 0 && s.dataChannel.BufferedAmount() == 0 { - s.sess.NetworkStats.Stop() - s.close(false) - return - } - - currentSpeed := s.sess.NetworkStats.Bandwidth() - fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) - - for len(s.msgToBeSent) != 0 { - cur := s.msgToBeSent[0] - - if err := s.dataChannel.Send(cur.buff); err != nil { - log.Errorf("Error, cannot send to client: %v\n", err) - return - } - s.sess.NetworkStats.AddBytes(uint64(cur.n)) - s.msgToBeSent = s.msgToBeSent[1:] - } - } -} - -func (s *Session) writeToNetwork() { - // Set callback, as transfer may be paused - fmt.Println("\nwriting") - s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow()) - fmt.Println("\ndone") - <-s.stopSending - fmt.Println("\nstopped sending") - s.dataChannel.OnBufferedAmountLow(nil) - log.Infof("Pausing network I/O... (remaining at least %v packets)\n", len(s.output)) - s.sess.NetworkStats.Pause() -} diff --git a/pkg/session/sender/sender.go b/pkg/session/sender/sender.go index 04113a5f..9b046cf8 100644 --- a/pkg/session/sender/sender.go +++ b/pkg/session/sender/sender.go @@ -1,19 +1,33 @@ package sender import ( + "fmt" + "io" + "os" "sync" + colorable "github.com/mattn/go-colorable" "github.com/pion/webrtc/v2" internalSess "github.com/schollz/croc/v5/internal/session" "github.com/schollz/croc/v5/pkg/session/common" "github.com/schollz/croc/v5/pkg/stats" + "github.com/sirupsen/logrus" ) const ( // Must be <= 16384 - senderBuffSize = 16384 + senderBuffSize = 16384 + bufferThreshold = 512 * 1024 // 512kB ) +var log = logrus.New() + +func init() { + log.SetFormatter(&logrus.TextFormatter{ForceColors: true}) + log.SetOutput(colorable.NewColorableStdout()) + log.SetLevel(logrus.DebugLevel) +} + type outputMsg struct { n int buff []byte @@ -81,3 +95,198 @@ func (s *Session) TransferFile(pathToFile string) { s.readFile(pathToFile) s.sess.OnCompletion() } + +// SDPProvider returns the underlying SDPProvider +func (s *Session) SDPProvider() io.Reader { + return s.sess.SDPProvider() +} + +// // Initialize creates the connection, the datachannel and creates the offer +// func (s *Session) Initialize() error { +// if s.initialized { +// return nil +// } + +// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.createDataChannel(); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.sess.CreateOffer(); err != nil { +// log.Errorln(err) +// return err +// } + +// s.initialized = true +// return nil +// } + +// // Start the connection and the file transfer +// func (s *Session) Start() error { +// if err := s.Initialize(); err != nil { +// return err +// } +// go s.readFile() +// if err := s.sess.ReadSDP(); err != nil { +// log.Errorln(err) +// return err +// } +// <-s.sess.Done +// s.sess.OnCompletion() +// return nil +// } + +func (s *Session) CreateDataChannel() error { + ordered := true + maxPacketLifeTime := uint16(10000) + dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{ + Ordered: &ordered, + MaxPacketLifeTime: &maxPacketLifeTime, + }) + if err != nil { + return err + } + + s.dataChannel = dataChannel + s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow()) + s.dataChannel.SetBufferedAmountLowThreshold(bufferThreshold) + s.dataChannel.OnOpen(s.onOpenHandler()) + s.dataChannel.OnClose(s.onCloseHandler()) + + return nil +} + +func (s *Session) readFile(pathToFile string) error { + f, err := os.Open(pathToFile) + if err != nil { + return err + } + log.Infof("Starting to read data...") + s.readingStats.Start() + defer func() { + f.Close() + s.readingStats.Pause() + log.Infof("Stopped reading data...") + close(s.output) + }() + + for { + // Read file + s.dataBuff = s.dataBuff[:cap(s.dataBuff)] + n, err := f.Read(s.dataBuff) + if err != nil { + if err == io.EOF { + s.readingStats.Stop() + log.Debugf("Got EOF after %v bytes!\n", s.readingStats.Bytes()) + return nil + } + log.Errorf("Read Error: %v\n", err) + return err + } + s.dataBuff = s.dataBuff[:n] + s.readingStats.AddBytes(uint64(n)) + + s.output <- outputMsg{ + n: n, + // Make a copy of the buffer + buff: append([]byte(nil), s.dataBuff...), + } + } + return nil +} + +func (s *Session) onBufferedAmountLow() func() { + return func() { + data := <-s.output + if data.n != 0 { + s.msgToBeSent = append(s.msgToBeSent, data) + } else if len(s.msgToBeSent) == 0 && s.dataChannel.BufferedAmount() == 0 { + s.sess.NetworkStats.Stop() + s.close(false) + return + } + + currentSpeed := s.sess.NetworkStats.Bandwidth() + fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) + + for len(s.msgToBeSent) != 0 { + cur := s.msgToBeSent[0] + + if err := s.dataChannel.Send(cur.buff); err != nil { + log.Errorf("Error, cannot send to client: %v\n", err) + return + } + s.sess.NetworkStats.AddBytes(uint64(cur.n)) + s.msgToBeSent = s.msgToBeSent[1:] + } + } +} + +func (s *Session) writeToNetwork() { + // Set callback, as transfer may be paused + fmt.Println("\nwriting") + s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow()) + fmt.Println("\ndone") + <-s.stopSending + fmt.Println("\nstopped sending") + s.dataChannel.OnBufferedAmountLow(nil) + log.Infof("Pausing network I/O... (remaining at least %v packets)\n", len(s.output)) + s.sess.NetworkStats.Pause() +} + +func (s *Session) StopSending() { + s.stopSending <- struct{}{} +} + +func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) { + return func(connectionState webrtc.ICEConnectionState) { + log.Infof("ICE Connection State has changed: %s\n", connectionState.String()) + if connectionState == webrtc.ICEConnectionStateDisconnected { + s.StopSending() + } + } +} + +func (s *Session) onOpenHandler() func() { + return func() { + s.sess.NetworkStats.Start() + + log.Infof("Starting to send data...") + defer log.Infof("Stopped sending data...") + + s.writeToNetwork() + } +} + +func (s *Session) onCloseHandler() func() { + return func() { + s.close(true) + } +} + +func (s *Session) close(calledFromCloseHandler bool) { + if !calledFromCloseHandler { + s.dataChannel.Close() + } + + // Sometime, onCloseHandler is not invoked, so it's a work-around + s.doneCheckLock.Lock() + if s.doneCheck { + s.doneCheckLock.Unlock() + return + } + s.doneCheck = true + s.doneCheckLock.Unlock() + s.dumpStats() + close(s.sess.Done) +} + +func (s *Session) dumpStats() { + fmt.Printf(` +Disk : %s +Network: %s +`, s.readingStats.String(), s.sess.NetworkStats.String()) +} diff --git a/pkg/session/sender/state.go b/pkg/session/sender/state.go deleted file mode 100644 index cac51287..00000000 --- a/pkg/session/sender/state.go +++ /dev/null @@ -1,62 +0,0 @@ -package sender - -import ( - "fmt" - - "github.com/pion/webrtc/v2" - log "github.com/sirupsen/logrus" -) - -func (s *Session) StopSending() { - s.stopSending <- struct{}{} -} - -func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) { - return func(connectionState webrtc.ICEConnectionState) { - log.Infof("ICE Connection State has changed: %s\n", connectionState.String()) - if connectionState == webrtc.ICEConnectionStateDisconnected { - s.StopSending() - } - } -} - -func (s *Session) onOpenHandler() func() { - return func() { - s.sess.NetworkStats.Start() - - log.Infof("Starting to send data...") - defer log.Infof("Stopped sending data...") - - s.writeToNetwork() - } -} - -func (s *Session) onCloseHandler() func() { - return func() { - s.close(true) - } -} - -func (s *Session) close(calledFromCloseHandler bool) { - if !calledFromCloseHandler { - s.dataChannel.Close() - } - - // Sometime, onCloseHandler is not invoked, so it's a work-around - s.doneCheckLock.Lock() - if s.doneCheck { - s.doneCheckLock.Unlock() - return - } - s.doneCheck = true - s.doneCheckLock.Unlock() - s.dumpStats() - close(s.sess.Done) -} - -func (s *Session) dumpStats() { - fmt.Printf(` -Disk : %s -Network: %s -`, s.readingStats.String(), s.sess.NetworkStats.String()) -} diff --git a/src/croc/croc.go b/src/croc/croc.go index d2f99a34..62b76a03 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -27,11 +27,11 @@ import ( "github.com/sirupsen/logrus" ) -var log = logrus.New() - const BufferSize = 4096 * 10 const Channels = 1 +var log = logrus.New() + func init() { log.SetFormatter(&logrus.TextFormatter{ForceColors: true}) log.SetOutput(colorable.NewColorableStdout())