0
0
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

use detached channels

This commit is contained in:
Zack Scholl 2019-04-05 13:15:06 -07:00
parent 800ee4a0d3
commit 3d9cac01e6

View file

@ -568,8 +568,15 @@ func (c *Client) dataChannelReceive(num int) (err error) {
}, },
} }
// Create a SettingEngine and enable Detach
s := webrtc.SettingEngine{}
s.DetachDataChannels()
// Create an API object with the engine
api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
c.peerConnection[num], err = webrtc.NewPeerConnection(config) c.peerConnection[num], err = api.NewPeerConnection(config)
if err != nil { if err != nil {
return return
} }
@ -586,46 +593,60 @@ func (c *Client) dataChannelReceive(num int) (err error) {
// Register channel opening handling // Register channel opening handling
d.OnOpen(func() { d.OnOpen(func() {
c.log.Debugf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID()) c.log.Debugf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label(), d.ID())
})
startTime := false // Detach the data channel
timer := time.Now() raw, dErr := d.Detach()
piecesToDo := make(map[int64]bool) if dErr != nil {
for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += BufferSize { panic(dErr)
piecesToDo[i] = true }
}
d.OnMessage(func(msg webrtc.DataChannelMessage) { startTime := false
if bytes.Equal([]byte("done"), msg.Data) { timer := time.Now()
c.log.Debug(time.Since(timer))
c.log.Debug("telling transfer is over") // Handle reading from the data channel
err = c.redisdb.Publish(c.nameOutChannel, Message{ go func(d io.Reader) {
Type: "close-sender", for {
Num: num, buffer := make([]byte, BufferSize*2)
}.String()).Err() n, err := d.Read(buffer)
if err != nil { if err != nil {
panic(err) fmt.Println("Datachannel closed; Exit the readloop:", err)
return
}
if bytes.Equal([]byte("done"), buffer[:n]) {
c.log.Debug(time.Since(timer))
c.log.Debug("telling transfer is over")
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "close-sender",
Num: num,
}.String()).Err()
if err != nil {
panic(err)
}
return
}
if !startTime {
startTime = true
timer = time.Now()
}
var chunk Chunk
errM := json.Unmarshal(buffer[:n], &chunk)
if errM != nil {
panic(errM)
}
var nBytes int
c.mutex.Lock()
nBytes, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
// c.log.Debugf("wrote %d bytes to %d (%d)", n, chunk.Location,num)
c.mutex.Unlock()
if err != nil {
panic(err)
}
c.bar.Add(nBytes)
} }
return }(raw)
}
if !startTime {
startTime = true
timer = time.Now()
}
var chunk Chunk
errM := json.Unmarshal(msg.Data, &chunk)
if errM != nil {
panic(errM)
}
var n int
c.mutex.Lock()
n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
// c.log.Debugf("wrote %d bytes to %d (%d)", n, chunk.Location,num)
c.mutex.Unlock()
if err != nil {
panic(err)
}
c.bar.Add(n)
}) })
}) })
@ -646,8 +667,15 @@ func (c *Client) dataChannelSend(num int) (err error) {
}, },
} }
// Create a SettingEngine and enable Detach
s := webrtc.SettingEngine{}
s.DetachDataChannels()
// Create an API object with the engine
api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
c.peerConnection[num], err = webrtc.NewPeerConnection(config) c.peerConnection[num], err = api.NewPeerConnection(config)
if err != nil { if err != nil {
return return
} }
@ -666,54 +694,62 @@ func (c *Client) dataChannelSend(num int) (err error) {
c.dataChannel[num].OnOpen(func() { c.dataChannel[num].OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", c.dataChannel[num].Label(), c.dataChannel[num].ID()) fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", c.dataChannel[num].Label(), c.dataChannel[num].ID())
time.Sleep(100 * time.Microsecond) // Detach the data channel
raw, dErr := c.dataChannel[num].Detach()
pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name) if dErr != nil {
c.log.Debugf("sending '%s'", pathToFile) panic(dErr)
file, err := os.Open(pathToFile)
if err != nil {
c.log.Debug(err)
return
} }
defer file.Close()
buffer := make([]byte, BufferSize) go func(d io.Writer) {
var location int64 pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
chunkNum := 0.0 c.log.Debugf("sending '%s'", pathToFile)
for {
bytesread, err := file.Read(buffer) file, err := os.Open(pathToFile)
if err != nil { if err != nil {
if err != io.EOF { c.log.Debug(err)
c.log.Debug(err) return
}
break
} }
defer file.Close()
if math.Mod(chunkNum, float64(Channels)) == float64(num) { buffer := make([]byte, BufferSize)
mSend := Chunk{ var location int64
Bytes: buffer[:bytesread], chunkNum := 0.0
Location: location, for {
} bytesread, err := file.Read(buffer)
dataToSend, _ := json.Marshal(mSend)
c.bar.Add(bytesread)
err = c.dataChannel[num].Send(dataToSend)
if err != nil { if err != nil {
c.log.Debug("Could not send on data channel", err.Error()) if err != io.EOF {
continue c.log.Debug(err)
}
break
} }
time.Sleep(100 * time.Microsecond)
}
location += int64(bytesread) if math.Mod(chunkNum, float64(Channels)) == float64(num) {
chunkNum += 1.0 mSend := Chunk{
} Bytes: buffer[:bytesread],
c.log.Debug("sending done signal") Location: location,
err = c.dataChannel[num].Send([]byte("done")) }
if err != nil { dataToSend, _ := json.Marshal(mSend)
c.log.Debug(err) c.bar.Add(bytesread)
} _, err = d.Write(dataToSend)
time.Sleep(1 * time.Second) if err != nil {
c.log.Debug("Could not send on data channel", err.Error())
continue
}
time.Sleep(100 * time.Microsecond)
}
location += int64(bytesread)
chunkNum += 1.0
}
c.log.Debug("sending done signal")
_, err = d.Write([]byte("done"))
if err != nil {
c.log.Debug(err)
}
time.Sleep(1 * time.Second)
}(raw)
}) })
// Register text message handling // Register text message handling