diff --git a/README.md b/README.md index 2f6203dd..5c78e903 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ src="https://user-images.githubusercontent.com/6550035/46709024-9b23ad00-cbf6-11e8-9fb2-ca8b20b7dbec.jpg" width="408px" border="0" alt="croc">
-Version +Version Code coverage Build
@@ -150,6 +150,21 @@ which will generate the file that you can edit.
 Any changes you make to the configuration file will be applied *before* the command-line flags, if any.
 
 
+## Notes
+
+How to determine if p2p is working:
+
+
+```
+watch -n1 netstat -au
+```
+
+Look for the port transmitting data
+
+```
+sudo tcpdump -i wlp3s0 udp port XX -vv -n
+```
+
 ## License
 
 MIT
diff --git a/main.go b/main.go
index dcbea1f3..7a79d251 100644
--- a/main.go
+++ b/main.go
@@ -1,544 +1,24 @@
 package main
 
 import (
-	0 { + c.Filename = fnameOption[0] + } + + // create channel for quitting + // quit with c.quit <- true + c.quit = make(chan bool) + + // if recipient, initialize with sending pake information + c.log.Debug("ready") + if !c.IsSender && !c.Step1ChannelSecured { + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "pake", + Bytes: c.Pake.Bytes(), + }.String()).Err() + if err != nil { + return + } + } + + // listen for incoming messages and process them + for { + select { + case <-c.quit: + return + case msg := <-c.incomingMessageChannel: + var m Message + err = json.Unmarshal([]byte(msg.Payload), &m) + if err != nil { + return + } + err = c.processMessage(m) + if err != nil { + return + } + default: + time.Sleep(1 * time.Millisecond) + } + } + return +} + +func (c *Client) sendOverRedis() (err error) { + go func() { + bar := progressbar.NewOptions( + int(c.FileInfo.Size), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes(int(c.FileInfo.Size)), + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionThrottle(1/60*time.Second), + ) + c.f, err = os.Open(c.FileInfo.Name) + if err != nil { + panic(err) + } + location := int64(0) + for { + buf := make([]byte, 4096*128) + n, errRead := c.f.Read(buf) + bar.Add(n) + chunk := Chunk{ + Bytes: buf[:n], + Location: location, + } + chunkB, _ := json.Marshal(chunk) + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "chunk", + Bytes: chunkB, + }.String()).Err() + if err != nil { + panic(err) + } + location += int64(n) + if errRead == io.EOF { + break + } + if errRead != nil { + panic(errRead) + } + } + }() + return +} + +func (c *Client) processMessage(m Message) (err error) { + switch m.Type { + case "pake": + notVerified := !c.Pake.IsVerified() + err = c.Pake.Update(m.Bytes) + if err != nil { + return + } + if (notVerified && c.Pake.IsVerified() && !c.IsSender) || !c.Pake.IsVerified() { + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "pake", + Bytes: c.Pake.Bytes(), + }.String()).Err() + } + if c.Pake.IsVerified() { + c.log.Debug(c.Pake.SessionKey()) + c.Step1ChannelSecured = true + } + case "fileinfo": + err = json.Unmarshal(m.Bytes, &c.FileInfo) + if err != nil { + return + } + c.log.Debug(c.FileInfo) + c.f, err = os.Create("d.txt") + if err != nil { + return + } + err = c.f.Truncate(c.FileInfo.Size) + if err != nil { + return + } + c.Step2FileInfoTransfered = true + case "recipientready": + c.Step3RecipientReady = true + case "chunk": + var chunk Chunk + err = json.Unmarshal(m.Bytes, &chunk) + if err != nil { + return + } + _, err = c.f.WriteAt(chunk.Bytes, chunk.Location) + c.log.Debug("writing chunk", chunk.Location) + case "datachannel-offer": + offer := util.Decode(m.Message) + c.log.Debug("got offer:", m.Message) + // Set the remote SessionDescription + err = c.peerConnection.SetRemoteDescription(offer) + if err != nil { + return + } + + // Sets the LocalDescription, and starts our UDP listeners + var answer webrtc.RTCSessionDescription + answer, err = c.peerConnection.CreateAnswer(nil) + if err != nil { + return + } + + // Output the answer in base64 so we can paste it in browser + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "datachannel-answer", + Message: util.Encode(answer), + }.String()).Err() + case "datachannel-answer": + answer := util.Decode(m.Message) + // Apply the answer as the remote description + err = c.peerConnection.SetRemoteDescription(answer) + } + if err != nil { + return + } + err = c.updateState() + + return +} + +func (c *Client) updateState() (err error) { + if c.IsSender && c.Step1ChannelSecured && !c.Step2FileInfoTransfered { + var fstats os.FileInfo + fstats, err = os.Stat(c.Filename) + if err != nil { + return + } + c.FileInfo = FileInfo{ + Name: fstats.Name(), + Size: fstats.Size(), + ModTime: fstats.ModTime(), + IsDir: fstats.IsDir(), + } + b, _ := json.Marshal(c.FileInfo) + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "fileinfo", + Bytes: b, + }.String()).Err() + if err != nil { + return + } + c.Step2FileInfoTransfered = true + } + if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientReady { + // TODO: recipient requests the chunk locations (if empty, then should receive all chunks) + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "recipientready", + }.String()).Err() + if err != nil { + return + } + c.Step3RecipientReady = true + // start receiving data + go func() { + err = c.dataChannelReceive() + if err != nil { + panic(err) + } + }() + } + if c.IsSender && c.Step3RecipientReady && !c.Step4SendingData { + c.log.Debug("start sending data!") + c.Step4SendingData = true + go func() { + err = c.dataChannelSend() + if err != nil { + panic(err) + } + }() + } + return +} + +func (c *Client) dataChannelReceive() (err error) { + // Prepare the configuration + config := webrtc.RTCConfiguration{ + IceServers: []webrtc.RTCIceServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + } + + // Create a new RTCPeerConnection + c.peerConnection, err = webrtc.New(config) + if err != nil { + return + } + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { + fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) + }) + + // Register data channel creation handling + c.peerConnection.OnDataChannel(func(d *webrtc.RTCDataChannel) { + fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID) + sendBytes := make(chan []byte, 1024) + // Register channel opening handling + d.OnOpen(func() { + fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label, d.ID) + for { + data := <-sendBytes + err := d.Send(datachannel.PayloadBinary{Data: data}) + if err != nil { + c.log.Debug(err) + } + } + }) + + startTime := false + timer := time.Now() + var mutex = &sync.Mutex{} + piecesToDo := make(map[int64]bool) + for i := int64(0); i < c.FileInfo.Size; i += 4096 { + piecesToDo[i] = true + } + // Register message handling + d.OnMessage(func(payload datachannel.Payload) { + + switch p := payload.(type) { + case *datachannel.PayloadString: + fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data)) + if bytes.Equal(p.Data, []byte("done")) { + c.f.Close() + c.log.Debug(time.Since(timer)) + } + case *datachannel.PayloadBinary: + if !startTime { + startTime = true + timer = time.Now() + } + var chunk Chunk + errM := json.Unmarshal(p.Data, &chunk) + if errM != nil { + panic(errM) + } + var n int + mutex.Lock() + n, err = c.f.WriteAt(chunk.Bytes, chunk.Location) + mutex.Unlock() + if err != nil { + panic(err) + } + c.log.Debugf("wrote %d bytes to %d", n, chunk.Location) + mutex.Lock() + piecesToDo[chunk.Location] = false + mutex.Unlock() + go func() { + numToDo := 0 + thingsToDo := make([]int64, len(piecesToDo)) + mutex.Lock() + for k := range piecesToDo { + if piecesToDo[k] { + thingsToDo[numToDo] = k + numToDo++ + } + } + mutex.Unlock() + thingsToDo = thingsToDo[:numToDo] + c.log.Debug("num to do: ", len(thingsToDo)) + if len(thingsToDo) < 10 { + c.log.Debug(thingsToDo) + } + }() + default: + fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label) + } + }) + }) + + // Block forever + return +} + +func (c *Client) dataChannelSend() (err error) { + recievedBytes := make(chan []byte, 1024) + // Everything below is the pion-WebRTC API! Thanks for using it ❤️. + + // Prepare the configuration + config := webrtc.RTCConfiguration{ + IceServers: []webrtc.RTCIceServer{ + { + URLs: []string{"stun:stun1.l.google.com:19305"}, + }, + }, + } + + // Create a new RTCPeerConnection + c.peerConnection, err = webrtc.New(config) + if err != nil { + return + } + + // Create a datachannel with label 'data' + c.dataChannel, err = c.peerConnection.CreateDataChannel("data", nil) + if err != nil { + return + } + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { + fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) + }) + + // Register channel opening handling + c.dataChannel.OnOpen(func() { + fmt.Printf("Data channel '%s'-'%d' open\n", c.dataChannel.Label, c.dataChannel.ID) + time.Sleep(100 * time.Microsecond) + + c.log.Debug("sending file") + const BufferSize = 4096 + file, err := os.Open("test.txt") + if err != nil { + c.log.Debug(err) + return + } + defer file.Close() + + buffer := make([]byte, BufferSize) + var location int64 + for { + bytesread, err := file.Read(buffer) + if err != nil { + if err != io.EOF { + c.log.Debug(err) + } + break + } + + mSend := Chunk{ + Bytes: buffer[:bytesread], + Location: location, + } + dataToSend, _ := json.Marshal(mSend) + + c.log.Debugf("sending %d bytes at %d", bytesread, location) + err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend}) + if err != nil { + c.log.Debug("Could not send on data channel", err.Error()) + continue + } + location += int64(bytesread) + time.Sleep(100 * time.Microsecond) + } + c.log.Debug("sending done signal") + err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")}) + if err != nil { + c.log.Debug(err) + } + }) + + // Register the OnMessage to handle incoming messages + c.dataChannel.OnMessage(func(payload datachannel.Payload) { + switch p := payload.(type) { + case *datachannel.PayloadString: + fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), c.dataChannel.Label, string(p.Data)) + case *datachannel.PayloadBinary: + fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), c.dataChannel.Label, p.Data) + recievedBytes <- p.Data + default: + fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), c.dataChannel.Label) + } + }) + + // Create an offer to send to the browser + offer, err := c.peerConnection.CreateOffer(nil) + if err != nil { + return + } + + // Output the offer in base64 so we can paste it in browser + c.log.Debug("sending offer") + err = c.redisdb.Publish(c.nameOutChannel, Message{ + Type: "datachannel-offer", + Message: util.Encode(offer), + }.String()).Err() + if err != nil { + return + } + + return +}