From 3d9cac01e6600282850baa5b73be77ab6202f423 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Fri, 5 Apr 2019 13:15:06 -0700 Subject: [PATCH] use detached channels --- src/croc/croc.go | 194 ++++++++++++++++++++++++++++------------------- 1 file changed, 115 insertions(+), 79 deletions(-) diff --git a/src/croc/croc.go b/src/croc/croc.go index 8b067393..95e99ed4 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -568,8 +568,15 @@ func (c *Client) dataChannelReceive(num int) (err error) { }, } + // Create a SettingEngine and enable Detach + s := webrtc.SettingEngine{} + s.DetachDataChannels() + + // Create an API object with the engine + api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) + // Create a new RTCPeerConnection - c.peerConnection[num], err = webrtc.NewPeerConnection(config) + c.peerConnection[num], err = api.NewPeerConnection(config) if err != nil { return } @@ -586,46 +593,60 @@ func (c *Client) dataChannelReceive(num int) (err error) { // Register channel opening handling d.OnOpen(func() { c.log.Debugf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID()) - }) - startTime := false - timer := time.Now() - piecesToDo := make(map[int64]bool) - for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += BufferSize { - piecesToDo[i] = true - } - d.OnMessage(func(msg webrtc.DataChannelMessage) { - if bytes.Equal([]byte("done"), msg.Data) { - c.log.Debug(time.Since(timer)) - c.log.Debug("telling transfer is over") - err = c.redisdb.Publish(c.nameOutChannel, Message{ - Type: "close-sender", - Num: num, - }.String()).Err() - if err != nil { - panic(err) + // Detach the data channel + raw, dErr := d.Detach() + if dErr != nil { + panic(dErr) + } + + startTime := false + timer := time.Now() + + // Handle reading from the data channel + go func(d io.Reader) { + for { + buffer := make([]byte, BufferSize*2) + n, err := d.Read(buffer) + if err != nil { + fmt.Println("Datachannel closed; Exit the readloop:", err) + return + } + if bytes.Equal([]byte("done"), buffer[:n]) { + c.log.Debug(time.Since(timer)) + c.log.Debug("telling transfer is over") + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "close-sender", + Num: num, + }.String()).Err() + if err != nil { + panic(err) + } + return + } + + if !startTime { + startTime = true + timer = time.Now() + } + var chunk Chunk + errM := json.Unmarshal(buffer[:n], &chunk) + if errM != nil { + panic(errM) + } + var nBytes int + c.mutex.Lock() + nBytes, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) + // c.log.Debugf("wrote %d bytes to %d (%d)", n, chunk.Location,num) + c.mutex.Unlock() + if err != nil { + panic(err) + } + c.bar.Add(nBytes) + } - return - } + }(raw) - if !startTime { - startTime = true - timer = time.Now() - } - var chunk Chunk - errM := json.Unmarshal(msg.Data, &chunk) - if errM != nil { - panic(errM) - } - var n int - c.mutex.Lock() - n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) - // c.log.Debugf("wrote %d bytes to %d (%d)", n, chunk.Location,num) - c.mutex.Unlock() - if err != nil { - panic(err) - } - c.bar.Add(n) }) }) @@ -646,8 +667,15 @@ func (c *Client) dataChannelSend(num int) (err error) { }, } + // Create a SettingEngine and enable Detach + s := webrtc.SettingEngine{} + s.DetachDataChannels() + + // Create an API object with the engine + api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) + // Create a new RTCPeerConnection - c.peerConnection[num], err = webrtc.NewPeerConnection(config) + c.peerConnection[num], err = api.NewPeerConnection(config) if err != nil { return } @@ -666,54 +694,62 @@ func (c *Client) dataChannelSend(num int) (err error) { c.dataChannel[num].OnOpen(func() { fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", c.dataChannel[num].Label(), c.dataChannel[num].ID()) - time.Sleep(100 * time.Microsecond) - - pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name) - c.log.Debugf("sending '%s'", pathToFile) - - file, err := os.Open(pathToFile) - if err != nil { - c.log.Debug(err) - return + // Detach the data channel + raw, dErr := c.dataChannel[num].Detach() + if dErr != nil { + panic(dErr) } - defer file.Close() - buffer := make([]byte, BufferSize) - var location int64 - chunkNum := 0.0 - for { - bytesread, err := file.Read(buffer) + go func(d io.Writer) { + pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name) + c.log.Debugf("sending '%s'", pathToFile) + + file, err := os.Open(pathToFile) if err != nil { - if err != io.EOF { - c.log.Debug(err) - } - break + c.log.Debug(err) + return } + defer file.Close() - if math.Mod(chunkNum, float64(Channels)) == float64(num) { - mSend := Chunk{ - Bytes: buffer[:bytesread], - Location: location, - } - dataToSend, _ := json.Marshal(mSend) - c.bar.Add(bytesread) - err = c.dataChannel[num].Send(dataToSend) + buffer := make([]byte, BufferSize) + var location int64 + chunkNum := 0.0 + for { + bytesread, err := file.Read(buffer) if err != nil { - c.log.Debug("Could not send on data channel", err.Error()) - continue + if err != io.EOF { + c.log.Debug(err) + } + break } - time.Sleep(100 * time.Microsecond) - } - location += int64(bytesread) - chunkNum += 1.0 - } - c.log.Debug("sending done signal") - err = c.dataChannel[num].Send([]byte("done")) - if err != nil { - c.log.Debug(err) - } - time.Sleep(1 * time.Second) + if math.Mod(chunkNum, float64(Channels)) == float64(num) { + mSend := Chunk{ + Bytes: buffer[:bytesread], + Location: location, + } + dataToSend, _ := json.Marshal(mSend) + c.bar.Add(bytesread) + _, err = d.Write(dataToSend) + if err != nil { + c.log.Debug("Could not send on data channel", err.Error()) + continue + } + time.Sleep(100 * time.Microsecond) + } + + location += int64(bytesread) + chunkNum += 1.0 + } + c.log.Debug("sending done signal") + _, err = d.Write([]byte("done")) + if err != nil { + c.log.Debug(err) + } + time.Sleep(1 * time.Second) + + }(raw) + }) // Register text message handling