From 082c5e80dd3a6bec54957068d793cf1f6c478f25 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Thu, 4 Apr 2019 13:45:18 -0700 Subject: [PATCH] use latest api --- src/croc/croc.go | 198 ++++++++++++++++++++++++----------------------- 1 file changed, 101 insertions(+), 97 deletions(-) diff --git a/src/croc/croc.go b/src/croc/croc.go index bc5ced5c..6a1a53a7 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -3,6 +3,7 @@ package croc import ( "bytes" "crypto/elliptic" + "encoding/base64" "encoding/json" "fmt" "io" @@ -17,9 +18,6 @@ import ( "github.com/go-redis/redis" "github.com/mattn/go-colorable" "github.com/pions/webrtc" - "github.com/pions/webrtc/examples/util" - "github.com/pions/webrtc/pkg/datachannel" - "github.com/pions/webrtc/pkg/ice" "github.com/schollz/croc/v5/src/utils" "github.com/schollz/pake" "github.com/schollz/progressbar/v2" @@ -65,8 +63,8 @@ type Client struct { nameInChannel string // webrtc connections - peerConnection *webrtc.RTCPeerConnection - dataChannel *webrtc.RTCDataChannel + peerConnection *webrtc.PeerConnection + dataChannel *webrtc.DataChannel quit chan bool } @@ -360,7 +358,11 @@ func (c *Client) processMessage(m Message) (err error) { _, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) c.log.Debug("writing chunk", chunk.Location) case "datachannel-offer": - offer := util.Decode(m.Message) + offer := webrtc.SessionDescription{} + err = Decode(m.Message, &offer) + if err != nil { + return + } c.log.Debug("got offer:", m.Message) // Set the remote SessionDescription err = c.peerConnection.SetRemoteDescription(offer) @@ -369,7 +371,7 @@ func (c *Client) processMessage(m Message) (err error) { } // Sets the LocalDescription, and starts our UDP listeners - var answer webrtc.RTCSessionDescription + var answer webrtc.SessionDescription answer, err = c.peerConnection.CreateAnswer(nil) if err != nil { return @@ -378,10 +380,15 @@ func (c *Client) processMessage(m Message) (err error) { // Output the answer in base64 so we can paste it in browser err = c.redisdb.Publish(c.nameOutChannel, Message{ Type: "datachannel-answer", - Message: util.Encode(answer), + Message: Encode(answer), }.String()).Err() case "datachannel-answer": - answer := util.Decode(m.Message) + var answer webrtc.SessionDescription + + err = Decode(m.Message, &answer) + if err != nil { + return + } // Apply the answer as the remote description err = c.peerConnection.SetRemoteDescription(answer) case "close-sender": @@ -522,8 +529,8 @@ func (c *Client) updateState() (err error) { func (c *Client) dataChannelReceive() (err error) { // Prepare the configuration - config := webrtc.RTCConfiguration{ - IceServers: []webrtc.RTCIceServer{ + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, @@ -531,87 +538,70 @@ func (c *Client) dataChannelReceive() (err error) { } // Create a new RTCPeerConnection - c.peerConnection, err = webrtc.New(config) + c.peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected - c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { + c.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) }) // Register data channel creation handling - c.peerConnection.OnDataChannel(func(d *webrtc.RTCDataChannel) { + c.peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID) - sendBytes := make(chan []byte, 1024) - // Register channel opening handling - d.OnOpen(func() { - log.Debugf("Data channel '%s'-'%d' open", d.Label, d.ID) - for { - data := <-sendBytes - err := d.Send(datachannel.PayloadBinary{Data: data}) - if err != nil { - c.log.Debug(err) - } - } - }) + }) - startTime := false - timer := time.Now() - var mutex = &sync.Mutex{} - piecesToDo := make(map[int64]bool) - for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 { - piecesToDo[i] = true + sendBytes := make(chan []byte, 1024) + // Register channel opening handling + c.dataChannel.OnOpen(func() { + log.Debugf("Data channel '%s'-'%d' open", c.dataChannel.Label, c.dataChannel.ID) + for { + data := <-sendBytes + err := c.dataChannel.Send(data) + if err != nil { + c.log.Debug(err) + } } - // Register message handling - bar := progressbar.NewOptions64( - c.FilesToTransfer[c.FilesToTransferCurrentNum].Size, - progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes64(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size), - progressbar.OptionSetWriter(os.Stderr), - progressbar.OptionThrottle(1/60*time.Second), - ) - d.OnMessage(func(payload datachannel.Payload) { + }) - switch p := payload.(type) { - case *datachannel.PayloadString: - fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data)) - if bytes.Equal(p.Data, []byte("done")) { - c.CurrentFile.Close() - c.log.Debug(time.Since(timer)) - c.log.Debug("telling transfer is over") - err = c.redisdb.Publish(c.nameOutChannel, Message{ - Type: "close-sender", - }.String()).Err() - if err != nil { - panic(err) - } - } - case *datachannel.PayloadBinary: - if !startTime { - startTime = true - timer = time.Now() - } - var chunk Chunk - errM := json.Unmarshal(p.Data, &chunk) - if errM != nil { - panic(errM) - } - var n int - mutex.Lock() - n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) - mutex.Unlock() - if err != nil { - panic(err) - } - // c.log.Debugf("wrote %d bytes to %d", n, chunk.Location) - bar.Add(n) - default: - fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label) - } - }) + startTime := false + timer := time.Now() + var mutex = &sync.Mutex{} + piecesToDo := make(map[int64]bool) + for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 { + piecesToDo[i] = true + } + // Register message handling + bar := progressbar.NewOptions64( + c.FilesToTransfer[c.FilesToTransferCurrentNum].Size, + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes64(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size), + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionThrottle(1/60*time.Second), + ) + c.dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) { + + if !startTime { + startTime = true + timer = time.Now() + } + var chunk Chunk + errM := json.Unmarshal(msg.Data, &chunk) + if errM != nil { + panic(errM) + } + var n int + mutex.Lock() + n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) + mutex.Unlock() + if err != nil { + panic(err) + } + // c.log.Debugf("wrote %d bytes to %d", n, chunk.Location) + bar.Add(n) }) // Block forever @@ -619,20 +609,19 @@ func (c *Client) dataChannelReceive() (err error) { } func (c *Client) dataChannelSend() (err error) { - recievedBytes := make(chan []byte, 1024) // Everything below is the pion-WebRTC API! Thanks for using it ❤️. // Prepare the configuration - config := webrtc.RTCConfiguration{ - IceServers: []webrtc.RTCIceServer{ + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ { - URLs: []string{"stun:stun1.l.google.com:19305"}, + URLs: []string{"stun:stun.l.google.com:19302"}, }, }, } // Create a new RTCPeerConnection - c.peerConnection, err = webrtc.New(config) + c.peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } @@ -645,7 +634,7 @@ func (c *Client) dataChannelSend() (err error) { // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected - c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { + c.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) }) @@ -691,7 +680,7 @@ func (c *Client) dataChannelSend() (err error) { dataToSend, _ := json.Marshal(mSend) bar.Add(bytesread) - err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend}) + err = c.dataChannel.Send(dataToSend) if err != nil { c.log.Debug("Could not send on data channel", err.Error()) continue @@ -710,7 +699,7 @@ func (c *Client) dataChannelSend() (err error) { if c.dataChannel == nil { return } - err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")}) + err = c.dataChannel.Send([]byte("done")) if err != nil { c.log.Debug(err) } @@ -720,16 +709,8 @@ func (c *Client) dataChannelSend() (err error) { }) // Register the OnMessage to handle incoming messages - c.dataChannel.OnMessage(func(payload datachannel.Payload) { - switch p := payload.(type) { - case *datachannel.PayloadString: - fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), c.dataChannel.Label, string(p.Data)) - case *datachannel.PayloadBinary: - fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), c.dataChannel.Label, p.Data) - recievedBytes <- p.Data - default: - fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), c.dataChannel.Label) - } + c.dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) { + fmt.Printf("Message from DataChannel '%s': '%s'\n", c.dataChannel.Label(), string(msg.Data)) }) // Create an offer to send to the browser @@ -742,7 +723,7 @@ func (c *Client) dataChannelSend() (err error) { c.log.Debug("sending offer") err = c.redisdb.Publish(c.nameOutChannel, Message{ Type: "datachannel-offer", - Message: util.Encode(offer), + Message: Encode(offer), }.String()).Err() if err != nil { return @@ -788,3 +769,26 @@ func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) { } return } + +// Encode encodes the input in base64 +// It can optionally zip the input before encoding +func Encode(obj interface{}) string { + b, err := json.Marshal(obj) + if err != nil { + panic(err) + } + + return base64.StdEncoding.EncodeToString(b) +} + +// Decode decodes the input from base64 +// It can optionally unzip the input after decoding +func Decode(in string, obj interface{}) (err error) { + b, err := base64.StdEncoding.DecodeString(in) + if err != nil { + return + } + + err = json.Unmarshal(b, obj) + return +}