diff --git a/internal/session/getters.go b/internal/session/getters.go deleted file mode 100644 index 435e5371..00000000 --- a/internal/session/getters.go +++ /dev/null @@ -1,8 +0,0 @@ -package session - -import "io" - -// SDPProvider returns the SDP input -func (s *Session) SDPProvider() io.Reader { - return s.sdpInput -} diff --git a/internal/session/session.go b/internal/session/session.go index 0c2899a3..cf849d3f 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -10,6 +10,11 @@ import ( "github.com/schollz/croc/v5/pkg/utils" ) +// SDPProvider returns the SDP input +func (s *Session) SDPProvider() io.Reader { + return s.sdpInput +} + // CompletionHandler to be called when transfer is done type CompletionHandler func() diff --git a/pkg/session/receiver/init.go b/pkg/session/receiver/init.go deleted file mode 100644 index 545a1312..00000000 --- a/pkg/session/receiver/init.go +++ /dev/null @@ -1,103 +0,0 @@ -package receiver - -import ( - "fmt" - "os" - - "github.com/pion/webrtc/v2" - log "github.com/sirupsen/logrus" -) - -// // Initialize creates the connection, the datachannel and creates the offer -// func (s *Session) Initialize() error { -// if s.initialized { -// return nil -// } -// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { -// log.Errorln(err) -// return err -// } -// s.createDataHandler() -// if err := s.sess.ReadSDP(); err != nil { -// log.Errorln(err) -// return err -// } -// if err := s.sess.CreateAnswer(); err != nil { -// log.Errorln(err) -// return err -// } - -// s.initialized = true -// return nil -// } - -// // Start initializes the connection and the file transfer -// func (s *Session) Start() error { -// if err := s.Initialize(); err != nil { -// return err -// } - -// // Handle data -// s.receiveData() -// s.sess.OnCompletion() -// return nil -// } - -func (s *Session) CreateDataHandler() { - s.sess.OnDataChannel(func(d *webrtc.DataChannel) { - log.Debugf("New DataChannel %s %d\n", d.Label(), d.ID()) - s.sess.NetworkStats.Start() - d.OnMessage(s.onMessage()) - d.OnClose(s.onClose()) - }) -} - -func (s *Session) ReceiveData(pathToFile string) { - s.receiveData(pathToFile) - s.sess.OnCompletion() -} - -func (s *Session) receiveData(pathToFile string) error { - log.Infoln("Starting to receive data...") - f, err := os.OpenFile(pathToFile, os.O_RDWR|os.O_CREATE, 0755) - if err != nil { - return err - } - defer func() { - log.Infoln("Stopped receiving data...") - f.Close() - }() - // Consume the message channel, until done - // Does not stop on error - for { - select { - case <-s.sess.Done: - s.sess.NetworkStats.Stop() - fmt.Printf("\nNetwork: %s\n", s.sess.NetworkStats.String()) - return nil - case msg := <-s.msgChannel: - n, err := f.Write(msg.Data) - - if err != nil { - return err - } else { - currentSpeed := s.sess.NetworkStats.Bandwidth() - fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) - s.sess.NetworkStats.AddBytes(uint64(n)) - } - } - } - return nil -} - -func (s *Session) CreateConnection() (err error) { - return s.sess.CreateConnection(s.onConnectionStateChange()) -} - -func (s *Session) SetSDP(sdp string) error { - return s.sess.SetSDP(sdp) -} - -func (s *Session) CreateAnswer() (string, error) { - return s.sess.CreateAnswer() -} diff --git a/pkg/session/receiver/receiver.go b/pkg/session/receiver/receiver.go index 093ed250..e1b04717 100644 --- a/pkg/session/receiver/receiver.go +++ b/pkg/session/receiver/receiver.go @@ -1,11 +1,14 @@ package receiver import ( + "fmt" "io" + "os" "github.com/pion/webrtc/v2" internalSess "github.com/schollz/croc/v5/internal/session" "github.com/schollz/croc/v5/pkg/session/common" + log "github.com/sirupsen/logrus" ) // Session is a receiver session @@ -38,3 +41,116 @@ type Config struct { func NewWith(c Config) *Session { return new(internalSess.New(c.SDPProvider, c.SDPOutput)) } + +func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) { + return func(connectionState webrtc.ICEConnectionState) { + log.Infof("ICE Connection State has changed: %s\n", connectionState.String()) + } +} + +func (s *Session) onMessage() func(msg webrtc.DataChannelMessage) { + return func(msg webrtc.DataChannelMessage) { + // Store each message in the message channel + s.msgChannel <- msg + } +} + +func (s *Session) onClose() func() { + return func() { + close(s.sess.Done) + } +} + +// // Initialize creates the connection, the datachannel and creates the offer +// func (s *Session) Initialize() error { +// if s.initialized { +// return nil +// } +// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { +// log.Errorln(err) +// return err +// } +// s.createDataHandler() +// if err := s.sess.ReadSDP(); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.sess.CreateAnswer(); err != nil { +// log.Errorln(err) +// return err +// } + +// s.initialized = true +// return nil +// } + +// // Start initializes the connection and the file transfer +// func (s *Session) Start() error { +// if err := s.Initialize(); err != nil { +// return err +// } + +// // Handle data +// s.receiveData() +// s.sess.OnCompletion() +// return nil +// } + +func (s *Session) CreateDataHandler() { + s.sess.OnDataChannel(func(d *webrtc.DataChannel) { + log.Debugf("New DataChannel %s %d\n", d.Label(), d.ID()) + s.sess.NetworkStats.Start() + d.OnMessage(s.onMessage()) + d.OnClose(s.onClose()) + }) +} + +func (s *Session) ReceiveData(pathToFile string) { + s.receiveData(pathToFile) + s.sess.OnCompletion() +} + +func (s *Session) receiveData(pathToFile string) error { + log.Infoln("Starting to receive data...") + f, err := os.OpenFile(pathToFile, os.O_RDWR|os.O_CREATE, 0755) + if err != nil { + return err + } + defer func() { + log.Infoln("Stopped receiving data...") + f.Close() + }() + // Consume the message channel, until done + // Does not stop on error + for { + select { + case <-s.sess.Done: + s.sess.NetworkStats.Stop() + fmt.Printf("\nNetwork: %s\n", s.sess.NetworkStats.String()) + return nil + case msg := <-s.msgChannel: + n, err := f.Write(msg.Data) + + if err != nil { + return err + } else { + currentSpeed := s.sess.NetworkStats.Bandwidth() + fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) + s.sess.NetworkStats.AddBytes(uint64(n)) + } + } + } + return nil +} + +func (s *Session) CreateConnection() (err error) { + return s.sess.CreateConnection(s.onConnectionStateChange()) +} + +func (s *Session) SetSDP(sdp string) error { + return s.sess.SetSDP(sdp) +} + +func (s *Session) CreateAnswer() (string, error) { + return s.sess.CreateAnswer() +} diff --git a/pkg/session/receiver/state.go b/pkg/session/receiver/state.go deleted file mode 100644 index 99257f63..00000000 --- a/pkg/session/receiver/state.go +++ /dev/null @@ -1,25 +0,0 @@ -package receiver - -import ( - "github.com/pion/webrtc/v2" - log "github.com/sirupsen/logrus" -) - -func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) { - return func(connectionState webrtc.ICEConnectionState) { - log.Infof("ICE Connection State has changed: %s\n", connectionState.String()) - } -} - -func (s *Session) onMessage() func(msg webrtc.DataChannelMessage) { - return func(msg webrtc.DataChannelMessage) { - // Store each message in the message channel - s.msgChannel <- msg - } -} - -func (s *Session) onClose() func() { - return func() { - close(s.sess.Done) - } -}