diff --git a/cmd/bench/cmd.go b/cmd/bench/cmd.go index 00eeca2b..93dddd35 100644 --- a/cmd/bench/cmd.go +++ b/cmd/bench/cmd.go @@ -1,8 +1,8 @@ package bench import ( - "github.com/schollz/croc/pkg/session/bench" - "github.com/schollz/croc/pkg/session/common" + "github.com/schollz/croc/v5/pkg/session/bench" + "github.com/schollz/croc/v5/pkg/session/common" log "github.com/sirupsen/logrus" "gopkg.in/urfave/cli.v1" ) diff --git a/cmd/receive/cmd.go b/cmd/receive/cmd.go index 15ac7883..63af6486 100644 --- a/cmd/receive/cmd.go +++ b/cmd/receive/cmd.go @@ -6,7 +6,7 @@ import ( log "github.com/sirupsen/logrus" - "github.com/schollz/croc/pkg/session/receiver" + "github.com/schollz/croc/v5/pkg/session/receiver" "gopkg.in/urfave/cli.v1" ) diff --git a/cmd/send/cmd.go b/cmd/send/cmd.go index 48e07ee7..b15ecd09 100644 --- a/cmd/send/cmd.go +++ b/cmd/send/cmd.go @@ -4,8 +4,8 @@ import ( "fmt" "os" - "github.com/schollz/croc/pkg/session/common" - "github.com/schollz/croc/pkg/session/sender" + "github.com/schollz/croc/v5/pkg/session/common" + "github.com/schollz/croc/v5/pkg/session/sender" log "github.com/sirupsen/logrus" "gopkg.in/urfave/cli.v1" ) diff --git a/go.mod b/go.mod index 4a39e839..0431868b 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/go-redis/redis v6.15.2+incompatible github.com/mattn/go-colorable v0.1.1 + github.com/pion/webrtc/v2 v2.0.2 github.com/pions/webrtc v1.2.1-0.20190404195536-1202dbaa06ad github.com/schollz/mnemonicode v1.0.1 github.com/schollz/pake v1.1.0 diff --git a/go.sum b/go.sum index 5573b231..fbbff06d 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,32 @@ github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pion/datachannel v1.3.0 h1:gxt/xGufDn8Yylk0uJB231xbGQVlFjVps+KdUAUl5Ls= +github.com/pion/datachannel v1.3.0/go.mod h1:lxFbZLIT+EBPmy5AiCv8M0CXkcuTL53A4cyagZiRrDo= +github.com/pion/dtls v1.3.0 h1:5jcC5bBzRcLfxmUH60zp/slIe/tjCLmz6AUZagPYmhA= +github.com/pion/dtls v1.3.0/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk= +github.com/pion/ice v0.2.1 h1:DhYn8s52H54SBbS5qu3XoGvTfseU47pe15yV3udNpww= +github.com/pion/ice v0.2.1/go.mod h1:igvbO76UeYthbSu0UsUTqjyWpFT3diUmM+x2vt4p4fw= +github.com/pion/logging v0.2.1-0.20190404202522-3c79a8accd0a/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/logging v0.2.1 h1:LwASkBKZ+2ysGJ+jLv1E/9H1ge0k1nTfi1X+5zirkDk= +github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/quic v0.1.1/go.mod h1:zEU51v7ru8Mp4AUBJvj6psrSth5eEFNnVQK5K48oV3k= +github.com/pion/rtcp v1.1.5 h1:UO4u+U3IYVzA1tWCNrR+hUo02tpOrn4elwZ9pQzBVKo= +github.com/pion/rtcp v1.1.5/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= +github.com/pion/rtp v1.1.1 h1:lag+9/lSOLBEYeYB/28KXm/ka1H++4wkmSj/WkttV6Y= +github.com/pion/rtp v1.1.1/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE= +github.com/pion/sctp v1.5.0 h1:VcixluIP/XBKL3wRRYIzpvbkFQFVs2yUWJo1NUivy7k= +github.com/pion/sctp v1.5.0/go.mod h1:btfZTRxsoVwp7PfvorgOKqkxV/BKHGGrNf1YUKnMGRQ= +github.com/pion/sdp/v2 v2.1.1 h1:i3fAyjiLuQseYNo0BtCOPfzp91Ppb7vasRGmUUTog28= +github.com/pion/sdp/v2 v2.1.1/go.mod h1:idSlWxhfWQDtTy9J05cgxpHBu/POwXN2VDRGYxT/EjU= +github.com/pion/srtp v1.2.1 h1:t31SdcMM22MI1Slu591uhX/aVrvNSPpO0XnR62v9x7k= +github.com/pion/srtp v1.2.1/go.mod h1:clAbcxURqAYE9KrsByaBCPK7vUC553yKJ99oHnso5YY= +github.com/pion/stun v0.2.1 h1:rSKJ0ynYkRalRD8BifmkaGLeepCFuGTwG6FxPsrPK8o= +github.com/pion/stun v0.2.1/go.mod h1:TChCNKgwnFiFG/c9K+zqEdd6pO6tlODb9yN1W/zVfsE= +github.com/pion/transport v0.6.0 h1:WAoyJg/6OI8dhCVFl/0JHTMd1iu2iHgGUXevptMtJ3U= +github.com/pion/transport v0.6.0/go.mod h1:iWZ07doqOosSLMhZ+FXUTq+TamDoXSllxpbGcfkCmbE= +github.com/pion/webrtc/v2 v2.0.2 h1:tiDcCHGuZACM0hCv2oV9lLmFWw6OUmu5o/4RkDFNQ8c= +github.com/pion/webrtc/v2 v2.0.2/go.mod h1:k5JH7wA2/QjMTRb4/zxsC9psvHHVh/snXTmCrLuPRu0= github.com/pions/datachannel v1.2.1 h1:zbSwtPqVcjqUM81A3VTR7lGfeAnQgxJhheXSfBk16SM= github.com/pions/datachannel v1.2.1/go.mod h1:Vd79tKnghclBJLCYNqN0xQzw3u3P2N/7QHYoLCLh2+Q= github.com/pions/dtls v1.0.2 h1:VOIp3whGooFWS4X0RBOXaykNORYr05yjs8mlNYqRc+4= @@ -71,6 +97,7 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/schollz/croc v3.0.6+incompatible h1:rCfc8MGgcGjNW2/qSoulPh8CRGH+Ej4i3RWYOwhX9pE= github.com/schollz/mnemonicode v1.0.1 h1:LiH5hwADZwjwnfXsaD4xgnMyTAtaKHN+e5AyjRU6WSU= github.com/schollz/mnemonicode v1.0.1/go.mod h1:cl4UAOhUV0mkdjMj/QYaUZbZZdF8BnOqoz8rHMzwboY= github.com/schollz/pake v1.1.0 h1:+tYqsPVkuirFpmeRePjYTUhIHHKLufdmd7QfuspaXCk= diff --git a/internal/session/session.go b/internal/session/session.go index d3fa92c5..9e2cd675 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -1,13 +1,12 @@ package session import ( - "fmt" "io" "os" - "github.com/schollz/croc/pkg/stats" - "github.com/schollz/croc/pkg/utils" "github.com/pion/webrtc/v2" + "github.com/schollz/croc/v5/pkg/stats" + "github.com/schollz/croc/v5/pkg/utils" ) // CompletionHandler to be called when transfer is done @@ -60,19 +59,12 @@ func (s *Session) CreateConnection(onConnectionStateChange func(connectionState return nil } -// ReadSDP from the SDP input stream -func (s *Session) ReadSDP() error { +// SetSDP sets the SDP +func (s *Session) SetSDP(encoded string) error { var sdp webrtc.SessionDescription - - fmt.Println("Please, paste the remote SDP:") - for { - encoded, err := utils.MustReadStream(s.sdpInput) - if err == nil { - if err := utils.Decode(encoded, &sdp); err == nil { - break - } - } - fmt.Println("Invalid SDP, try again...") + err := utils.Decode(encoded, &sdp) + if err != nil { + return err } return s.peerConnection.SetRemoteDescription(sdp) } @@ -88,41 +80,35 @@ func (s *Session) OnDataChannel(handler func(d *webrtc.DataChannel)) { } // CreateAnswer set the local description and print the answer SDP -func (s *Session) CreateAnswer() error { +func (s *Session) CreateAnswer() (string, error) { // Create an answer answer, err := s.peerConnection.CreateAnswer(nil) if err != nil { - return err + return "", err } return s.createSessionDescription(answer) } // CreateOffer set the local description and print the offer SDP -func (s *Session) CreateOffer() error { +func (s *Session) CreateOffer() (string, error) { // Create an offer answer, err := s.peerConnection.CreateOffer(nil) if err != nil { - return err + return "", err } return s.createSessionDescription(answer) } // createSessionDescription set the local description and print the SDP -func (s *Session) createSessionDescription(desc webrtc.SessionDescription) error { +func (s *Session) createSessionDescription(desc webrtc.SessionDescription) (string, error) { // Sets the LocalDescription, and starts our UDP listeners if err := s.peerConnection.SetLocalDescription(desc); err != nil { - return err + return "", err } desc.SDP = utils.StripSDP(desc.SDP) // Output the SDP in base64 so we can paste it in browser - resp, err := utils.Encode(desc) - if err != nil { - return err - } - fmt.Println("Send this SDP:") - fmt.Fprintf(s.sdpOutput, "%s\n", resp) - return nil + return utils.Encode(desc) } // OnCompletion is called when session ends diff --git a/pkg/session/bench/benchmark.go b/pkg/session/bench/benchmark.go index ac8ff55c..56fcf294 100644 --- a/pkg/session/bench/benchmark.go +++ b/pkg/session/bench/benchmark.go @@ -4,9 +4,9 @@ import ( "sync" "time" - internalSess "github.com/schollz/croc/internal/session" - "github.com/schollz/croc/pkg/session/common" - "github.com/schollz/croc/pkg/stats" + internalSess "github.com/schollz/croc/v5/internal/session" + "github.com/schollz/croc/v5/pkg/session/common" + "github.com/schollz/croc/v5/pkg/stats" ) const ( diff --git a/pkg/session/bench/benchmark_test.go b/pkg/session/bench/benchmark_test.go index 5c027b1e..dd547f5a 100644 --- a/pkg/session/bench/benchmark_test.go +++ b/pkg/session/bench/benchmark_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/schollz/croc/internal/buffer" - "github.com/schollz/croc/pkg/session/common" - "github.com/schollz/croc/pkg/utils" + "github.com/schollz/croc/v5/internal/buffer" + "github.com/schollz/croc/v5/pkg/session/common" + "github.com/schollz/croc/v5/pkg/utils" "github.com/stretchr/testify/assert" ) diff --git a/pkg/session/common/config.go b/pkg/session/common/config.go index 73fa077b..b85b6bf4 100644 --- a/pkg/session/common/config.go +++ b/pkg/session/common/config.go @@ -3,7 +3,7 @@ package common import ( "io" - "github.com/schollz/croc/internal/session" + "github.com/schollz/croc/v5/internal/session" ) // Configuration common to both Sender and Receiver session diff --git a/pkg/session/receiver/init.go b/pkg/session/receiver/init.go index 12beb324..5732d72b 100644 --- a/pkg/session/receiver/init.go +++ b/pkg/session/receiver/init.go @@ -7,42 +7,42 @@ import ( log "github.com/sirupsen/logrus" ) -// Initialize creates the connection, the datachannel and creates the offer -func (s *Session) Initialize() error { - if s.initialized { - return nil - } - if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { - log.Errorln(err) - return err - } - s.createDataHandler() - if err := s.sess.ReadSDP(); err != nil { - log.Errorln(err) - return err - } - if err := s.sess.CreateAnswer(); err != nil { - log.Errorln(err) - return err - } +// // Initialize creates the connection, the datachannel and creates the offer +// func (s *Session) Initialize() error { +// if s.initialized { +// return nil +// } +// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { +// log.Errorln(err) +// return err +// } +// s.createDataHandler() +// if err := s.sess.ReadSDP(); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.sess.CreateAnswer(); err != nil { +// log.Errorln(err) +// return err +// } - s.initialized = true - return nil -} +// s.initialized = true +// return nil +// } -// Start initializes the connection and the file transfer -func (s *Session) Start() error { - if err := s.Initialize(); err != nil { - return err - } +// // Start initializes the connection and the file transfer +// func (s *Session) Start() error { +// if err := s.Initialize(); err != nil { +// return err +// } - // Handle data - s.receiveData() - s.sess.OnCompletion() - return nil -} +// // Handle data +// s.receiveData() +// s.sess.OnCompletion() +// return nil +// } -func (s *Session) createDataHandler() { +func (s *Session) CreateDataHandler() { s.sess.OnDataChannel(func(d *webrtc.DataChannel) { log.Debugf("New DataChannel %s %d\n", d.Label(), d.ID()) s.sess.NetworkStats.Start() @@ -76,3 +76,15 @@ func (s *Session) receiveData() { } } } + +func (s *Session) CreateConnection() (err error) { + return s.sess.CreateConnection(s.onConnectionStateChange()) +} + +func (s *Session) SetSDP(sdp string) error { + return s.sess.SetSDP(sdp) +} + +func (s *Session) CreateAnswer() (string, error) { + return s.sess.CreateAnswer() +} diff --git a/pkg/session/receiver/receiver.go b/pkg/session/receiver/receiver.go index f4232deb..064e9f89 100644 --- a/pkg/session/receiver/receiver.go +++ b/pkg/session/receiver/receiver.go @@ -3,8 +3,8 @@ package receiver import ( "io" - internalSess "github.com/schollz/croc/internal/session" - "github.com/schollz/croc/pkg/session/common" + internalSess "github.com/schollz/croc/v5/internal/session" + "github.com/schollz/croc/v5/pkg/session/common" "github.com/pion/webrtc/v2" ) diff --git a/pkg/session/sender/init.go b/pkg/session/sender/init.go index eeebc0d1..ca995e01 100644 --- a/pkg/session/sender/init.go +++ b/pkg/session/sender/init.go @@ -2,52 +2,51 @@ package sender import ( "github.com/pion/webrtc/v2" - log "github.com/sirupsen/logrus" ) const ( bufferThreshold = 512 * 1024 // 512kB ) -// Initialize creates the connection, the datachannel and creates the offer -func (s *Session) Initialize() error { - if s.initialized { - return nil - } +// // Initialize creates the connection, the datachannel and creates the offer +// func (s *Session) Initialize() error { +// if s.initialized { +// return nil +// } - if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { - log.Errorln(err) - return err - } - if err := s.createDataChannel(); err != nil { - log.Errorln(err) - return err - } - if err := s.sess.CreateOffer(); err != nil { - log.Errorln(err) - return err - } +// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.createDataChannel(); err != nil { +// log.Errorln(err) +// return err +// } +// if err := s.sess.CreateOffer(); err != nil { +// log.Errorln(err) +// return err +// } - s.initialized = true - return nil -} +// s.initialized = true +// return nil +// } -// Start the connection and the file transfer -func (s *Session) Start() error { - if err := s.Initialize(); err != nil { - return err - } - go s.readFile() - if err := s.sess.ReadSDP(); err != nil { - log.Errorln(err) - return err - } - <-s.sess.Done - s.sess.OnCompletion() - return nil -} +// // Start the connection and the file transfer +// func (s *Session) Start() error { +// if err := s.Initialize(); err != nil { +// return err +// } +// go s.readFile() +// if err := s.sess.ReadSDP(); err != nil { +// log.Errorln(err) +// return err +// } +// <-s.sess.Done +// s.sess.OnCompletion() +// return nil +// } -func (s *Session) createDataChannel() error { +func (s *Session) CreateDataChannel() error { ordered := true maxPacketLifeTime := uint16(10000) dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{ diff --git a/pkg/session/sender/sender.go b/pkg/session/sender/sender.go index 16e32772..9b4b70ab 100644 --- a/pkg/session/sender/sender.go +++ b/pkg/session/sender/sender.go @@ -5,9 +5,9 @@ import ( "sync" "github.com/pion/webrtc/v2" - internalSess "github.com/schollz/croc/internal/session" - "github.com/schollz/croc/pkg/session/common" - "github.com/schollz/croc/pkg/stats" + internalSess "github.com/schollz/croc/v5/internal/session" + "github.com/schollz/croc/v5/pkg/session/common" + "github.com/schollz/croc/v5/pkg/stats" ) const ( @@ -73,3 +73,15 @@ func NewWith(c Config) *Session { func (s *Session) SetStream(stream io.Reader) { s.stream = stream } + +func (s *Session) CreateConnection() (err error) { + return s.sess.CreateConnection(s.onConnectionStateChange()) +} + +func (s *Session) CreateOffer() (string, error) { + return s.sess.CreateOffer() +} + +func (s *Session) SetSDP(sdp string) error { + return s.sess.SetSDP(sdp) +} diff --git a/pkg/session/session_test.go b/pkg/session/session_test.go index 2632590a..fee6ad65 100644 --- a/pkg/session/session_test.go +++ b/pkg/session/session_test.go @@ -5,11 +5,11 @@ import ( "fmt" "testing" - "github.com/schollz/croc/internal/buffer" - "github.com/schollz/croc/pkg/session/common" - "github.com/schollz/croc/pkg/session/receiver" - "github.com/schollz/croc/pkg/session/sender" - "github.com/schollz/croc/pkg/utils" + "github.com/schollz/croc/v5/internal/buffer" + "github.com/schollz/croc/v5/pkg/session/common" + "github.com/schollz/croc/v5/pkg/session/receiver" + "github.com/schollz/croc/v5/pkg/session/sender" + "github.com/schollz/croc/v5/pkg/utils" "github.com/stretchr/testify/assert" ) diff --git a/src/croc/croc.go b/src/croc/croc.go index 369cf40b..95d51458 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -18,6 +18,8 @@ import ( "github.com/go-redis/redis" "github.com/mattn/go-colorable" "github.com/pions/webrtc" + recvSess "github.com/schollz/croc/v5/pkg/session/receiver" + sendSess "github.com/schollz/croc/v5/pkg/session/sender" "github.com/schollz/croc/v5/src/utils" "github.com/schollz/pake" "github.com/schollz/progressbar/v2" @@ -58,6 +60,9 @@ type Client struct { CurrentFile *os.File CurrentFileChunks []int64 + sendSess sendSess.Session + recvSess recvSess.Session + // channel data incomingMessageChannel <-chan *redis.Message nameOutChannel string @@ -364,41 +369,25 @@ func (c *Client) processMessage(m Message) (err error) { _, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) c.log.Debug("writing chunk", chunk.Location) case "datachannel-offer": - offer := webrtc.SessionDescription{} - err = Decode(m.Message, &offer) + err = c.recvSess.SetSDP(m.Message) if err != nil { return } - c.log.Debugf("got offer for %d:", m.Num) - // Set the remote SessionDescription - err = c.peerConnection[m.Num].SetRemoteDescription(offer) + var answer string + answer, err = c.recvSess.CreateAnswer() if err != nil { return } - - // Sets the LocalDescription, and starts our UDP listeners - var answer webrtc.SessionDescription - answer, err = c.peerConnection[m.Num].CreateAnswer(nil) - if err != nil { - return - } - // Output the answer in base64 so we can paste it in browser err = c.redisdb.Publish(c.nameOutChannel, Message{ Type: "datachannel-answer", - Message: Encode(answer), + Message: answer, Num: m.Num, }.String()).Err() case "datachannel-answer": c.log.Debug("got answer:", m.Message) - var answer webrtc.SessionDescription - - err = Decode(m.Message, &answer) - if err != nil { - return - } // Apply the answer as the remote description - err = c.peerConnection[m.Num].SetRemoteDescription(answer) + err = c.sendSess.SetSDP(m.Message) case "close-sender": c.peerConnection[m.Num].Close() c.peerConnection[m.Num] = nil @@ -559,221 +548,33 @@ func (c *Client) updateState() (err error) { } func (c *Client) dataChannelReceive(num int) (err error) { - // Prepare the configuration - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: []string{"stun:stun.l.google.com:19302"}, - }, - }, - } - - // Create a SettingEngine and enable Detach - s := webrtc.SettingEngine{} - s.DetachDataChannels() - - // Create an API object with the engine - api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) - - // Create a new RTCPeerConnection - c.peerConnection[num], err = api.NewPeerConnection(config) + err = c.recvSess.CreateConnection() if err != nil { return } - - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - c.peerConnection[num].OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) - }) - - // Register data channel creation handling - c.peerConnection[num].OnDataChannel(func(d *webrtc.DataChannel) { - - // Register channel opening handling - d.OnOpen(func() { - c.log.Debugf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID()) - - // Detach the data channel - raw, dErr := d.Detach() - if dErr != nil { - panic(dErr) - } - - startTime := false - timer := time.Now() - - // Handle reading from the data channel - go func(d io.Reader) { - for { - buffer := make([]byte, BufferSize*2) - n, err := d.Read(buffer) - if err != nil { - fmt.Println("Datachannel closed; Exit the readloop:", err) - return - } - if bytes.Equal([]byte("done"), buffer[:n]) { - c.log.Debug(time.Since(timer)) - c.log.Debug("telling transfer is over") - err = c.redisdb.Publish(c.nameOutChannel, Message{ - Type: "close-sender", - Num: num, - }.String()).Err() - if err != nil { - panic(err) - } - return - } - - if !startTime { - startTime = true - timer = time.Now() - } - var chunk Chunk - errM := json.Unmarshal(buffer[:n], &chunk) - if errM != nil { - panic(errM) - } - var nBytes int - c.mutex.Lock() - nBytes, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) - // c.log.Debugf("wrote %d bytes to %d (%d)", n, chunk.Location,num) - c.mutex.Unlock() - if err != nil { - panic(err) - } - c.bar.Add(nBytes) - - } - }(raw) - - }) - - }) - - // Block forever + c.recvSess.CreateDataHandler() return } func (c *Client) dataChannelSend(num int) (err error) { - // Everything below is the pion-WebRTC API! Thanks for using it ❤️. - - // Prepare the configuration - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: []string{"stun:stun.l.google.com:19302"}, - }, - }, + if err := c.sendSess.CreateConnection(); err != nil { + log.Error(err) + return err } - - // Create a SettingEngine and enable Detach - s := webrtc.SettingEngine{} - s.DetachDataChannels() - - // Create an API object with the engine - api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) - - // Create a new RTCPeerConnection - c.peerConnection[num], err = api.NewPeerConnection(config) + if err := c.sendSess.CreateDataChannel(); err != nil { + log.Error(err) + return err + } + offer, err := c.sendSess.CreateOffer() if err != nil { - return + log.Error(err) + return err } - // Create a datachannel with label 'data' - c.dataChannel[num], err = c.peerConnection[num].CreateDataChannel("data", nil) - if err != nil { - return - } - - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - c.peerConnection[num].OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) - }) - - c.dataChannel[num].OnOpen(func() { - fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", c.dataChannel[num].Label(), c.dataChannel[num].ID()) - // Detach the data channel - raw, dErr := c.dataChannel[num].Detach() - if dErr != nil { - panic(dErr) - } - - go func(d io.Writer) { - pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name) - c.log.Debugf("sending '%s'", pathToFile) - - file, err := os.Open(pathToFile) - if err != nil { - c.log.Debug(err) - return - } - defer file.Close() - - buffer := make([]byte, BufferSize) - var location int64 - chunkNum := 0.0 - for { - bytesread, err := file.Read(buffer) - if err != nil { - if err != io.EOF { - c.log.Debug(err) - } - break - } - - if math.Mod(chunkNum, float64(Channels)) == float64(num) { - mSend := Chunk{ - Bytes: buffer[:bytesread], - Location: location, - } - dataToSend, _ := json.Marshal(mSend) - c.bar.Add(bytesread) - _, err = d.Write(dataToSend) - if err != nil { - c.log.Debug("Could not send on data channel", err.Error()) - continue - } - time.Sleep(10000 * time.Microsecond) - } - - location += int64(bytesread) - chunkNum += 1.0 - } - c.log.Debug("sending done signal") - _, err = d.Write([]byte("done")) - if err != nil { - c.log.Debug(err) - } - time.Sleep(1 * time.Second) - - }(raw) - - }) - - // Register text message handling - c.dataChannel[num].OnMessage(func(msg webrtc.DataChannelMessage) { - fmt.Printf("Message from DataChannel '%s': '%s'\n", c.dataChannel[num].Label(), string(msg.Data)) - }) - // Create an offer to send to the browser - offer, err := c.peerConnection[num].CreateOffer(nil) - if err != nil { - return - } - - // Sets the LocalDescription, and starts our UDP listeners - err = c.peerConnection[num].SetLocalDescription(offer) - if err != nil { - return - } - - // Output the offer in base64 so we can paste it in browser - c.log.Debug("sending offer") + // sending offer err = c.redisdb.Publish(c.nameOutChannel, Message{ Type: "datachannel-offer", - Message: Encode(offer), - Num: num, + Message: offer, }.String()).Err() if err != nil { return