1
1
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

make module

This commit is contained in:
Zack Scholl 2019-04-03 21:03:07 -07:00
parent ed6533878d
commit 527825b53d
3 changed files with 608 additions and 528 deletions

View file

@ -4,7 +4,7 @@
src="https://user-images.githubusercontent.com/6550035/46709024-9b23ad00-cbf6-11e8-9fb2-ca8b20b7dbec.jpg" src="https://user-images.githubusercontent.com/6550035/46709024-9b23ad00-cbf6-11e8-9fb2-ca8b20b7dbec.jpg"
width="408px" border="0" alt="croc"> width="408px" border="0" alt="croc">
<br> <br>
<a href="https://github.com/schollz/croc/releases/latest"><img src="https://img.shields.io/badge/version-4.1.5-brightgreen.svg?style=flat-square" alt="Version"></a> <a href="https://github.com/schollz/croc/releases/latest"><img src="https://img.shields.io/badge/version-5.0.0-brightgreen.svg?style=flat-square" alt="Version"></a>
<img src="https://img.shields.io/badge/coverage-77%25-brightgreen.svg?style=flat-square" alt="Code coverage"> <img src="https://img.shields.io/badge/coverage-77%25-brightgreen.svg?style=flat-square" alt="Code coverage">
<a href="https://travis-ci.org/schollz/croc"><img <a href="https://travis-ci.org/schollz/croc"><img
src="https://img.shields.io/travis/schollz/croc.svg?style=flat-square" alt="Build src="https://img.shields.io/travis/schollz/croc.svg?style=flat-square" alt="Build
@ -150,6 +150,21 @@ which will generate the file that you can edit.
Any changes you make to the configuration file will be applied *before* the command-line flags, if any. Any changes you make to the configuration file will be applied *before* the command-line flags, if any.
## Notes
How to determine if p2p is working:
```
watch -n1 netstat -au
```
Look for the port transmitting data
```
sudo tcpdump -i wlp3s0 udp port XX -vv -n
```
## License ## License
MIT MIT

534
main.go
View file

@ -1,544 +1,24 @@
package main package main
import ( import (
"bytes"
"crypto/elliptic"
"encoding/json"
"flag" "flag"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/go-redis/redis" "github.com/schollz/croc/v5/src/croc"
"github.com/pions/webrtc"
"github.com/pions/webrtc/examples/util"
"github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice"
"github.com/schollz/pake"
"github.com/schollz/progressbar/v2"
) )
type Client struct {
redisdb *redis.Client
SharedSecret string
IsSender bool
Pake *pake.Pake
// steps involved in forming relationship
Step1ChannelSecured bool
Step2FileInfoTransfered bool
Step3RecipientReady bool
Step4SendingData bool
f *os.File
FileInfo FileInfo
// channel data
incomingMessageChannel <-chan *redis.Message
nameOutChannel string
nameInChannel string
peerConnection *webrtc.RTCPeerConnection
dataChannel *webrtc.RTCDataChannel
quit chan bool
}
type Message struct {
Type string
Message string
Bytes []byte
}
type Chunk struct {
Bytes []byte
Location int64
}
type FileInfo struct {
Name string
Size int64
ModTime time.Time
IsDir bool
SentName string
IsCompressed bool
IsEncrypted bool
}
func (m Message) String() string {
b, _ := json.Marshal(m)
return string(b)
}
func New(sender bool, sharedSecret string) (c *Client, err error) {
c = new(Client)
c.redisdb = redis.NewClient(&redis.Options{
Addr: "198.199.67.130:6372",
Password: "",
DB: 4,
WriteTimeout: 1 * time.Hour,
ReadTimeout: 1 * time.Hour,
})
_, err = c.redisdb.Ping().Result()
if err != nil {
return
}
c.IsSender = sender
c.SharedSecret = sharedSecret
c.SharedSecret = sharedSecret
if sender {
c.nameOutChannel = c.SharedSecret + "2"
c.nameInChannel = c.SharedSecret + "1"
} else {
c.nameOutChannel = c.SharedSecret + "1"
c.nameInChannel = c.SharedSecret + "2"
}
pubsub := c.redisdb.Subscribe(c.nameInChannel)
_, err = pubsub.Receive()
if err != nil {
return
}
c.incomingMessageChannel = pubsub.Channel()
if c.IsSender {
c.Pake, err = pake.Init([]byte{1, 2, 3}, 1, elliptic.P521(), 1*time.Microsecond)
} else {
c.Pake, err = pake.Init([]byte{1, 2, 3}, 0, elliptic.P521(), 1*time.Microsecond)
}
if err != nil {
return
}
return
}
func (c *Client) Transfer(fname string) (err error) {
c.quit = make(chan bool)
// quit with c.quit <- true
if !c.IsSender {
// kick it off
fmt.Println("sending first pake")
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "pake",
Bytes: c.Pake.Bytes(),
}.String()).Err()
if err != nil {
return
}
}
for {
select {
case <-c.quit:
return
case msg := <-c.incomingMessageChannel:
var m Message
err = json.Unmarshal([]byte(msg.Payload), &m)
if err != nil {
return
}
err = c.processMessage(m)
if err != nil {
return
}
default:
time.Sleep(1 * time.Millisecond)
}
if c.IsSender && c.Step1ChannelSecured && !c.Step2FileInfoTransfered {
var fstats os.FileInfo
fstats, err = os.Stat("test.txt")
if err != nil {
return
}
c.FileInfo = FileInfo{
Name: fstats.Name(),
Size: fstats.Size(),
ModTime: fstats.ModTime(),
IsDir: fstats.IsDir(),
}
b, _ := json.Marshal(c.FileInfo)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "fileinfo",
Bytes: b,
}.String()).Err()
if err != nil {
return
}
c.Step2FileInfoTransfered = true
}
if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientReady {
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "recipientready",
}.String()).Err()
if err != nil {
return
}
c.Step3RecipientReady = true
// start receiving data
go func() {
err = c.dataChannelReceive()
if err != nil {
panic(err)
}
}()
}
if c.IsSender && c.Step3RecipientReady && !c.Step4SendingData {
fmt.Println("start sending data!")
c.Step4SendingData = true
go func() {
err = c.dataChannelSend()
if err != nil {
panic(err)
}
}()
}
}
return
}
func (c *Client) sendOverRedis() (err error) {
go func() {
bar := progressbar.NewOptions(
int(c.FileInfo.Size),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes(int(c.FileInfo.Size)),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
c.f, err = os.Open(c.FileInfo.Name)
if err != nil {
panic(err)
}
location := int64(0)
for {
buf := make([]byte, 4096*128)
n, errRead := c.f.Read(buf)
bar.Add(n)
chunk := Chunk{
Bytes: buf[:n],
Location: location,
}
chunkB, _ := json.Marshal(chunk)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "chunk",
Bytes: chunkB,
}.String()).Err()
if err != nil {
panic(err)
}
location += int64(n)
if errRead == io.EOF {
break
}
if errRead != nil {
panic(errRead)
}
}
}()
return
}
func (c *Client) processMessage(m Message) (err error) {
switch m.Type {
case "pake":
notVerified := !c.Pake.IsVerified()
err = c.Pake.Update(m.Bytes)
if err != nil {
return
}
if (notVerified && c.Pake.IsVerified() && !c.IsSender) || !c.Pake.IsVerified() {
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "pake",
Bytes: c.Pake.Bytes(),
}.String()).Err()
}
if c.Pake.IsVerified() {
fmt.Println(c.Pake.SessionKey())
c.Step1ChannelSecured = true
}
case "fileinfo":
err = json.Unmarshal(m.Bytes, &c.FileInfo)
if err != nil {
return
}
fmt.Println(c.FileInfo)
c.f, err = os.Create("d.txt")
if err != nil {
return
}
err = c.f.Truncate(c.FileInfo.Size)
if err != nil {
return
}
c.Step2FileInfoTransfered = true
case "recipientready":
c.Step3RecipientReady = true
case "chunk":
var chunk Chunk
err = json.Unmarshal(m.Bytes, &chunk)
if err != nil {
return
}
_, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
fmt.Println("writing chunk", chunk.Location)
case "datachannel-offer":
offer := util.Decode(m.Message)
fmt.Println("got offer:", m.Message)
// Set the remote SessionDescription
err = c.peerConnection.SetRemoteDescription(offer)
if err != nil {
return
}
// Sets the LocalDescription, and starts our UDP listeners
var answer webrtc.RTCSessionDescription
answer, err = c.peerConnection.CreateAnswer(nil)
if err != nil {
return
}
// Output the answer in base64 so we can paste it in browser
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-answer",
Message: util.Encode(answer),
}.String()).Err()
case "datachannel-answer":
answer := util.Decode(m.Message)
// Apply the answer as the remote description
err = c.peerConnection.SetRemoteDescription(answer)
}
return
}
func (c *Client) dataChannelReceive() (err error) {
// Prepare the configuration
config := webrtc.RTCConfiguration{
IceServers: []webrtc.RTCIceServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
// Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config)
if err != nil {
return
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
// Register data channel creation handling
c.peerConnection.OnDataChannel(func(d *webrtc.RTCDataChannel) {
fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID)
sendBytes := make(chan []byte, 1024)
// Register channel opening handling
d.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label, d.ID)
for {
data := <-sendBytes
err := d.Send(datachannel.PayloadBinary{Data: data})
if err != nil {
log.Println(err)
}
}
})
startTime := false
timer := time.Now()
var mutex = &sync.Mutex{}
piecesToDo := make(map[int64]bool)
for i := int64(0); i < c.FileInfo.Size; i += 4096 {
piecesToDo[i] = true
}
// Register message handling
d.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) {
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
if bytes.Equal(p.Data, []byte("done")) {
c.f.Close()
fmt.Println(time.Since(timer))
}
case *datachannel.PayloadBinary:
if !startTime {
startTime = true
timer = time.Now()
}
var chunk Chunk
errM := json.Unmarshal(p.Data, &chunk)
if errM != nil {
panic(errM)
}
var n int
mutex.Lock()
n, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
mutex.Unlock()
if err != nil {
panic(err)
}
log.Printf("wrote %d bytes to %d", n, chunk.Location)
mutex.Lock()
piecesToDo[chunk.Location] = false
mutex.Unlock()
go func() {
numToDo := 0
thingsToDo := make([]int64, len(piecesToDo))
mutex.Lock()
for k := range piecesToDo {
if piecesToDo[k] {
thingsToDo[numToDo] = k
numToDo++
}
}
mutex.Unlock()
thingsToDo = thingsToDo[:numToDo]
fmt.Println("num to do: ", len(thingsToDo))
if len(thingsToDo) < 10 {
fmt.Println(thingsToDo)
}
}()
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
})
})
// Block forever
return
}
func (c *Client) dataChannelSend() (err error) {
recievedBytes := make(chan []byte, 1024)
// Everything below is the pion-WebRTC API! Thanks for using it ❤️.
// Prepare the configuration
config := webrtc.RTCConfiguration{
IceServers: []webrtc.RTCIceServer{
{
URLs: []string{"stun:stun1.l.google.com:19305"},
},
},
}
// Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config)
if err != nil {
return
}
// Create a datachannel with label 'data'
c.dataChannel, err = c.peerConnection.CreateDataChannel("data", nil)
if err != nil {
return
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
// Register channel opening handling
c.dataChannel.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open\n", c.dataChannel.Label, c.dataChannel.ID)
time.Sleep(100 * time.Microsecond)
fmt.Println("sending file")
const BufferSize = 4096
file, err := os.Open("test.txt")
if err != nil {
fmt.Println(err)
return
}
defer file.Close()
buffer := make([]byte, BufferSize)
var location int64
for {
bytesread, err := file.Read(buffer)
if err != nil {
if err != io.EOF {
fmt.Println(err)
}
break
}
mSend := Chunk{
Bytes: buffer[:bytesread],
Location: location,
}
dataToSend, _ := json.Marshal(mSend)
log.Printf("sending %d bytes at %d", bytesread, location)
err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend})
if err != nil {
log.Println("Could not send on data channel", err.Error())
continue
}
location += int64(bytesread)
time.Sleep(100 * time.Microsecond)
}
log.Println("sending done signal")
err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")})
if err != nil {
log.Println(err)
}
})
// Register the OnMessage to handle incoming messages
c.dataChannel.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) {
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), c.dataChannel.Label, string(p.Data))
case *datachannel.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), c.dataChannel.Label, p.Data)
recievedBytes <- p.Data
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), c.dataChannel.Label)
}
})
// Create an offer to send to the browser
offer, err := c.peerConnection.CreateOffer(nil)
if err != nil {
return
}
// Output the offer in base64 so we can paste it in browser
fmt.Println("sending offer")
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-offer",
Message: util.Encode(offer),
}.String()).Err()
if err != nil {
return
}
return
}
func main() { func main() {
var sender bool var sender bool
flag.BoolVar(&sender, "sender", false, "sender") flag.BoolVar(&sender, "sender", false, "sender")
flag.Parse() flag.Parse()
c, err := New(sender, "foo") c, err := croc.New(sender, "foo")
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = c.Transfer("test.txt") if sender {
err = c.Send("test.txt")
} else {
err = c.Receive()
}
if err != nil { if err != nil {
panic(err) panic(err)
} }

585
src/croc/croc.go Normal file
View file

@ -0,0 +1,585 @@
package croc
import (
"bytes"
"crypto/elliptic"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/go-redis/redis"
"github.com/mattn/go-colorable"
"github.com/pions/webrtc"
"github.com/pions/webrtc/examples/util"
"github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice"
"github.com/schollz/pake"
"github.com/schollz/progressbar/v2"
"github.com/sirupsen/logrus"
)
var log = logrus.New()
func init() {
log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout())
log.SetLevel(logrus.DebugLevel)
}
type Client struct {
// basic setup
redisdb *redis.Client
log *logrus.Entry
IsSender bool
SharedSecret string
Pake *pake.Pake
Filename string
// steps involved in forming relationship
Step1ChannelSecured bool
Step2FileInfoTransfered bool
Step3RecipientReady bool
Step4SendingData bool
// send / receive information
f *os.File
FileInfo FileInfo
chunksToSend []int64
// channel data
incomingMessageChannel <-chan *redis.Message
nameOutChannel string
nameInChannel string
// webrtc connections
peerConnection *webrtc.RTCPeerConnection
dataChannel *webrtc.RTCDataChannel
quit chan bool
}
type Message struct {
Type string `json:"t,omitempty"`
Message string `json:"m,omitempty"`
Bytes []byte `json:"b,omitempty"`
}
type Chunk struct {
Bytes []byte `json:"b,omitempty"`
Location int64 `json:"l,omitempty"`
}
type FileInfo struct {
Name string `json:"n,omitempty"`
Size int64 `json:"s,omitempty"`
ModTime time.Time `json:"m,omitempty"`
IsDir bool `json:"d,omitempty"`
IsCompressed bool `json:"c,omitempty"`
IsEncrypted bool `json:"e,omitempty"`
}
func (m Message) String() string {
b, _ := json.Marshal(m)
return string(b)
}
// New establishes a new connection for transfering files between two instances.
func New(sender bool, sharedSecret string) (c *Client, err error) {
c = new(Client)
// setup basic info
c.IsSender = sender
c.SharedSecret = sharedSecret
c.SharedSecret = sharedSecret
if sender {
c.nameOutChannel = c.SharedSecret + "2"
c.nameInChannel = c.SharedSecret + "1"
} else {
c.nameOutChannel = c.SharedSecret + "1"
c.nameInChannel = c.SharedSecret + "2"
}
// initialize redis for communication in establishing channel
c.redisdb = redis.NewClient(&redis.Options{
Addr: "198.199.67.130:6372",
Password: "",
DB: 4,
WriteTimeout: 1 * time.Hour,
ReadTimeout: 1 * time.Hour,
})
_, err = c.redisdb.Ping().Result()
if err != nil {
return
}
// setup channel for listening
pubsub := c.redisdb.Subscribe(c.nameInChannel)
_, err = pubsub.Receive()
if err != nil {
return
}
c.incomingMessageChannel = pubsub.Channel()
// initialize pake
if c.IsSender {
c.Pake, err = pake.Init([]byte{1, 2, 3}, 1, elliptic.P521(), 1*time.Microsecond)
} else {
c.Pake, err = pake.Init([]byte{1, 2, 3}, 0, elliptic.P521(), 1*time.Microsecond)
}
if err != nil {
return
}
// initialize logger
c.log = log.WithFields(logrus.Fields{
"is": "sender",
})
if !c.IsSender {
c.log = log.WithFields(logrus.Fields{
"is": "recipient",
})
}
return
}
// Send will send the specified file
func (c *Client) Send(fname string) (err error) {
return c.transfer(fname)
}
// Receive will receive a file
func (c *Client) Receive() (err error) {
return c.transfer()
}
func (c *Client) transfer(fnameOption ...string) (err error) {
if len(fnameOption) > 0 {
c.Filename = fnameOption[0]
}
// create channel for quitting
// quit with c.quit <- true
c.quit = make(chan bool)
// if recipient, initialize with sending pake information
c.log.Debug("ready")
if !c.IsSender && !c.Step1ChannelSecured {
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "pake",
Bytes: c.Pake.Bytes(),
}.String()).Err()
if err != nil {
return
}
}
// listen for incoming messages and process them
for {
select {
case <-c.quit:
return
case msg := <-c.incomingMessageChannel:
var m Message
err = json.Unmarshal([]byte(msg.Payload), &m)
if err != nil {
return
}
err = c.processMessage(m)
if err != nil {
return
}
default:
time.Sleep(1 * time.Millisecond)
}
}
return
}
func (c *Client) sendOverRedis() (err error) {
go func() {
bar := progressbar.NewOptions(
int(c.FileInfo.Size),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes(int(c.FileInfo.Size)),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
c.f, err = os.Open(c.FileInfo.Name)
if err != nil {
panic(err)
}
location := int64(0)
for {
buf := make([]byte, 4096*128)
n, errRead := c.f.Read(buf)
bar.Add(n)
chunk := Chunk{
Bytes: buf[:n],
Location: location,
}
chunkB, _ := json.Marshal(chunk)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "chunk",
Bytes: chunkB,
}.String()).Err()
if err != nil {
panic(err)
}
location += int64(n)
if errRead == io.EOF {
break
}
if errRead != nil {
panic(errRead)
}
}
}()
return
}
func (c *Client) processMessage(m Message) (err error) {
switch m.Type {
case "pake":
notVerified := !c.Pake.IsVerified()
err = c.Pake.Update(m.Bytes)
if err != nil {
return
}
if (notVerified && c.Pake.IsVerified() && !c.IsSender) || !c.Pake.IsVerified() {
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "pake",
Bytes: c.Pake.Bytes(),
}.String()).Err()
}
if c.Pake.IsVerified() {
c.log.Debug(c.Pake.SessionKey())
c.Step1ChannelSecured = true
}
case "fileinfo":
err = json.Unmarshal(m.Bytes, &c.FileInfo)
if err != nil {
return
}
c.log.Debug(c.FileInfo)
c.f, err = os.Create("d.txt")
if err != nil {
return
}
err = c.f.Truncate(c.FileInfo.Size)
if err != nil {
return
}
c.Step2FileInfoTransfered = true
case "recipientready":
c.Step3RecipientReady = true
case "chunk":
var chunk Chunk
err = json.Unmarshal(m.Bytes, &chunk)
if err != nil {
return
}
_, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
c.log.Debug("writing chunk", chunk.Location)
case "datachannel-offer":
offer := util.Decode(m.Message)
c.log.Debug("got offer:", m.Message)
// Set the remote SessionDescription
err = c.peerConnection.SetRemoteDescription(offer)
if err != nil {
return
}
// Sets the LocalDescription, and starts our UDP listeners
var answer webrtc.RTCSessionDescription
answer, err = c.peerConnection.CreateAnswer(nil)
if err != nil {
return
}
// Output the answer in base64 so we can paste it in browser
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-answer",
Message: util.Encode(answer),
}.String()).Err()
case "datachannel-answer":
answer := util.Decode(m.Message)
// Apply the answer as the remote description
err = c.peerConnection.SetRemoteDescription(answer)
}
if err != nil {
return
}
err = c.updateState()
return
}
func (c *Client) updateState() (err error) {
if c.IsSender && c.Step1ChannelSecured && !c.Step2FileInfoTransfered {
var fstats os.FileInfo
fstats, err = os.Stat(c.Filename)
if err != nil {
return
}
c.FileInfo = FileInfo{
Name: fstats.Name(),
Size: fstats.Size(),
ModTime: fstats.ModTime(),
IsDir: fstats.IsDir(),
}
b, _ := json.Marshal(c.FileInfo)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "fileinfo",
Bytes: b,
}.String()).Err()
if err != nil {
return
}
c.Step2FileInfoTransfered = true
}
if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientReady {
// TODO: recipient requests the chunk locations (if empty, then should receive all chunks)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "recipientready",
}.String()).Err()
if err != nil {
return
}
c.Step3RecipientReady = true
// start receiving data
go func() {
err = c.dataChannelReceive()
if err != nil {
panic(err)
}
}()
}
if c.IsSender && c.Step3RecipientReady && !c.Step4SendingData {
c.log.Debug("start sending data!")
c.Step4SendingData = true
go func() {
err = c.dataChannelSend()
if err != nil {
panic(err)
}
}()
}
return
}
func (c *Client) dataChannelReceive() (err error) {
// Prepare the configuration
config := webrtc.RTCConfiguration{
IceServers: []webrtc.RTCIceServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
// Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config)
if err != nil {
return
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
// Register data channel creation handling
c.peerConnection.OnDataChannel(func(d *webrtc.RTCDataChannel) {
fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID)
sendBytes := make(chan []byte, 1024)
// Register channel opening handling
d.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label, d.ID)
for {
data := <-sendBytes
err := d.Send(datachannel.PayloadBinary{Data: data})
if err != nil {
c.log.Debug(err)
}
}
})
startTime := false
timer := time.Now()
var mutex = &sync.Mutex{}
piecesToDo := make(map[int64]bool)
for i := int64(0); i < c.FileInfo.Size; i += 4096 {
piecesToDo[i] = true
}
// Register message handling
d.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) {
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
if bytes.Equal(p.Data, []byte("done")) {
c.f.Close()
c.log.Debug(time.Since(timer))
}
case *datachannel.PayloadBinary:
if !startTime {
startTime = true
timer = time.Now()
}
var chunk Chunk
errM := json.Unmarshal(p.Data, &chunk)
if errM != nil {
panic(errM)
}
var n int
mutex.Lock()
n, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
mutex.Unlock()
if err != nil {
panic(err)
}
c.log.Debugf("wrote %d bytes to %d", n, chunk.Location)
mutex.Lock()
piecesToDo[chunk.Location] = false
mutex.Unlock()
go func() {
numToDo := 0
thingsToDo := make([]int64, len(piecesToDo))
mutex.Lock()
for k := range piecesToDo {
if piecesToDo[k] {
thingsToDo[numToDo] = k
numToDo++
}
}
mutex.Unlock()
thingsToDo = thingsToDo[:numToDo]
c.log.Debug("num to do: ", len(thingsToDo))
if len(thingsToDo) < 10 {
c.log.Debug(thingsToDo)
}
}()
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
})
})
// Block forever
return
}
func (c *Client) dataChannelSend() (err error) {
recievedBytes := make(chan []byte, 1024)
// Everything below is the pion-WebRTC API! Thanks for using it ❤️.
// Prepare the configuration
config := webrtc.RTCConfiguration{
IceServers: []webrtc.RTCIceServer{
{
URLs: []string{"stun:stun1.l.google.com:19305"},
},
},
}
// Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config)
if err != nil {
return
}
// Create a datachannel with label 'data'
c.dataChannel, err = c.peerConnection.CreateDataChannel("data", nil)
if err != nil {
return
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
// Register channel opening handling
c.dataChannel.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open\n", c.dataChannel.Label, c.dataChannel.ID)
time.Sleep(100 * time.Microsecond)
c.log.Debug("sending file")
const BufferSize = 4096
file, err := os.Open("test.txt")
if err != nil {
c.log.Debug(err)
return
}
defer file.Close()
buffer := make([]byte, BufferSize)
var location int64
for {
bytesread, err := file.Read(buffer)
if err != nil {
if err != io.EOF {
c.log.Debug(err)
}
break
}
mSend := Chunk{
Bytes: buffer[:bytesread],
Location: location,
}
dataToSend, _ := json.Marshal(mSend)
c.log.Debugf("sending %d bytes at %d", bytesread, location)
err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend})
if err != nil {
c.log.Debug("Could not send on data channel", err.Error())
continue
}
location += int64(bytesread)
time.Sleep(100 * time.Microsecond)
}
c.log.Debug("sending done signal")
err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")})
if err != nil {
c.log.Debug(err)
}
})
// Register the OnMessage to handle incoming messages
c.dataChannel.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) {
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), c.dataChannel.Label, string(p.Data))
case *datachannel.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), c.dataChannel.Label, p.Data)
recievedBytes <- p.Data
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), c.dataChannel.Label)
}
})
// Create an offer to send to the browser
offer, err := c.peerConnection.CreateOffer(nil)
if err != nil {
return
}
// Output the offer in base64 so we can paste it in browser
c.log.Debug("sending offer")
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-offer",
Message: util.Encode(offer),
}.String()).Err()
if err != nil {
return
}
return
}