0
0
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

use latest api

This commit is contained in:
Zack Scholl 2019-04-04 13:45:18 -07:00
parent b6e3679c54
commit 082c5e80dd

View file

@ -3,6 +3,7 @@ package croc
import ( import (
"bytes" "bytes"
"crypto/elliptic" "crypto/elliptic"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -17,9 +18,6 @@ import (
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/mattn/go-colorable" "github.com/mattn/go-colorable"
"github.com/pions/webrtc" "github.com/pions/webrtc"
"github.com/pions/webrtc/examples/util"
"github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice"
"github.com/schollz/croc/v5/src/utils" "github.com/schollz/croc/v5/src/utils"
"github.com/schollz/pake" "github.com/schollz/pake"
"github.com/schollz/progressbar/v2" "github.com/schollz/progressbar/v2"
@ -65,8 +63,8 @@ type Client struct {
nameInChannel string nameInChannel string
// webrtc connections // webrtc connections
peerConnection *webrtc.RTCPeerConnection peerConnection *webrtc.PeerConnection
dataChannel *webrtc.RTCDataChannel dataChannel *webrtc.DataChannel
quit chan bool quit chan bool
} }
@ -360,7 +358,11 @@ func (c *Client) processMessage(m Message) (err error) {
_, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) _, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
c.log.Debug("writing chunk", chunk.Location) c.log.Debug("writing chunk", chunk.Location)
case "datachannel-offer": case "datachannel-offer":
offer := util.Decode(m.Message) offer := webrtc.SessionDescription{}
err = Decode(m.Message, &offer)
if err != nil {
return
}
c.log.Debug("got offer:", m.Message) c.log.Debug("got offer:", m.Message)
// Set the remote SessionDescription // Set the remote SessionDescription
err = c.peerConnection.SetRemoteDescription(offer) err = c.peerConnection.SetRemoteDescription(offer)
@ -369,7 +371,7 @@ func (c *Client) processMessage(m Message) (err error) {
} }
// Sets the LocalDescription, and starts our UDP listeners // Sets the LocalDescription, and starts our UDP listeners
var answer webrtc.RTCSessionDescription var answer webrtc.SessionDescription
answer, err = c.peerConnection.CreateAnswer(nil) answer, err = c.peerConnection.CreateAnswer(nil)
if err != nil { if err != nil {
return return
@ -378,10 +380,15 @@ func (c *Client) processMessage(m Message) (err error) {
// Output the answer in base64 so we can paste it in browser // Output the answer in base64 so we can paste it in browser
err = c.redisdb.Publish(c.nameOutChannel, Message{ err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-answer", Type: "datachannel-answer",
Message: util.Encode(answer), Message: Encode(answer),
}.String()).Err() }.String()).Err()
case "datachannel-answer": case "datachannel-answer":
answer := util.Decode(m.Message) var answer webrtc.SessionDescription
err = Decode(m.Message, &answer)
if err != nil {
return
}
// Apply the answer as the remote description // Apply the answer as the remote description
err = c.peerConnection.SetRemoteDescription(answer) err = c.peerConnection.SetRemoteDescription(answer)
case "close-sender": case "close-sender":
@ -522,8 +529,8 @@ func (c *Client) updateState() (err error) {
func (c *Client) dataChannelReceive() (err error) { func (c *Client) dataChannelReceive() (err error) {
// Prepare the configuration // Prepare the configuration
config := webrtc.RTCConfiguration{ config := webrtc.Configuration{
IceServers: []webrtc.RTCIceServer{ ICEServers: []webrtc.ICEServer{
{ {
URLs: []string{"stun:stun.l.google.com:19302"}, URLs: []string{"stun:stun.l.google.com:19302"},
}, },
@ -531,87 +538,70 @@ func (c *Client) dataChannelReceive() (err error) {
} }
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config) c.peerConnection, err = webrtc.NewPeerConnection(config)
if err != nil { if err != nil {
return return
} }
// Set the handler for ICE connection state // Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected // This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { c.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
}) })
// Register data channel creation handling // Register data channel creation handling
c.peerConnection.OnDataChannel(func(d *webrtc.RTCDataChannel) { c.peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID) fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID)
sendBytes := make(chan []byte, 1024) })
// Register channel opening handling
d.OnOpen(func() {
log.Debugf("Data channel '%s'-'%d' open", d.Label, d.ID)
for {
data := <-sendBytes
err := d.Send(datachannel.PayloadBinary{Data: data})
if err != nil {
c.log.Debug(err)
}
}
})
startTime := false sendBytes := make(chan []byte, 1024)
timer := time.Now() // Register channel opening handling
var mutex = &sync.Mutex{} c.dataChannel.OnOpen(func() {
piecesToDo := make(map[int64]bool) log.Debugf("Data channel '%s'-'%d' open", c.dataChannel.Label, c.dataChannel.ID)
for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 { for {
piecesToDo[i] = true data := <-sendBytes
err := c.dataChannel.Send(data)
if err != nil {
c.log.Debug(err)
}
} }
// Register message handling })
bar := progressbar.NewOptions64(
c.FilesToTransfer[c.FilesToTransferCurrentNum].Size,
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes64(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
d.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) { startTime := false
case *datachannel.PayloadString: timer := time.Now()
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data)) var mutex = &sync.Mutex{}
if bytes.Equal(p.Data, []byte("done")) { piecesToDo := make(map[int64]bool)
c.CurrentFile.Close() for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 {
c.log.Debug(time.Since(timer)) piecesToDo[i] = true
c.log.Debug("telling transfer is over") }
err = c.redisdb.Publish(c.nameOutChannel, Message{ // Register message handling
Type: "close-sender", bar := progressbar.NewOptions64(
}.String()).Err() c.FilesToTransfer[c.FilesToTransferCurrentNum].Size,
if err != nil { progressbar.OptionSetRenderBlankState(true),
panic(err) progressbar.OptionSetBytes64(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size),
} progressbar.OptionSetWriter(os.Stderr),
} progressbar.OptionThrottle(1/60*time.Second),
case *datachannel.PayloadBinary: )
if !startTime { c.dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
startTime = true
timer = time.Now() if !startTime {
} startTime = true
var chunk Chunk timer = time.Now()
errM := json.Unmarshal(p.Data, &chunk) }
if errM != nil { var chunk Chunk
panic(errM) errM := json.Unmarshal(msg.Data, &chunk)
} if errM != nil {
var n int panic(errM)
mutex.Lock() }
n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location) var n int
mutex.Unlock() mutex.Lock()
if err != nil { n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
panic(err) mutex.Unlock()
} if err != nil {
// c.log.Debugf("wrote %d bytes to %d", n, chunk.Location) panic(err)
bar.Add(n) }
default: // c.log.Debugf("wrote %d bytes to %d", n, chunk.Location)
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label) bar.Add(n)
}
})
}) })
// Block forever // Block forever
@ -619,20 +609,19 @@ func (c *Client) dataChannelReceive() (err error) {
} }
func (c *Client) dataChannelSend() (err error) { func (c *Client) dataChannelSend() (err error) {
recievedBytes := make(chan []byte, 1024)
// Everything below is the pion-WebRTC API! Thanks for using it ❤️. // Everything below is the pion-WebRTC API! Thanks for using it ❤️.
// Prepare the configuration // Prepare the configuration
config := webrtc.RTCConfiguration{ config := webrtc.Configuration{
IceServers: []webrtc.RTCIceServer{ ICEServers: []webrtc.ICEServer{
{ {
URLs: []string{"stun:stun1.l.google.com:19305"}, URLs: []string{"stun:stun.l.google.com:19302"},
}, },
}, },
} }
// Create a new RTCPeerConnection // Create a new RTCPeerConnection
c.peerConnection, err = webrtc.New(config) c.peerConnection, err = webrtc.NewPeerConnection(config)
if err != nil { if err != nil {
return return
} }
@ -645,7 +634,7 @@ func (c *Client) dataChannelSend() (err error) {
// Set the handler for ICE connection state // Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected // This will notify you when the peer has connected/disconnected
c.peerConnection.OnICEConnectionStateChange(func(connectionState ice.ConnectionState) { c.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
}) })
@ -691,7 +680,7 @@ func (c *Client) dataChannelSend() (err error) {
dataToSend, _ := json.Marshal(mSend) dataToSend, _ := json.Marshal(mSend)
bar.Add(bytesread) bar.Add(bytesread)
err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend}) err = c.dataChannel.Send(dataToSend)
if err != nil { if err != nil {
c.log.Debug("Could not send on data channel", err.Error()) c.log.Debug("Could not send on data channel", err.Error())
continue continue
@ -710,7 +699,7 @@ func (c *Client) dataChannelSend() (err error) {
if c.dataChannel == nil { if c.dataChannel == nil {
return return
} }
err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")}) err = c.dataChannel.Send([]byte("done"))
if err != nil { if err != nil {
c.log.Debug(err) c.log.Debug(err)
} }
@ -720,16 +709,8 @@ func (c *Client) dataChannelSend() (err error) {
}) })
// Register the OnMessage to handle incoming messages // Register the OnMessage to handle incoming messages
c.dataChannel.OnMessage(func(payload datachannel.Payload) { c.dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
switch p := payload.(type) { fmt.Printf("Message from DataChannel '%s': '%s'\n", c.dataChannel.Label(), string(msg.Data))
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), c.dataChannel.Label, string(p.Data))
case *datachannel.PayloadBinary:
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), c.dataChannel.Label, p.Data)
recievedBytes <- p.Data
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), c.dataChannel.Label)
}
}) })
// Create an offer to send to the browser // Create an offer to send to the browser
@ -742,7 +723,7 @@ func (c *Client) dataChannelSend() (err error) {
c.log.Debug("sending offer") c.log.Debug("sending offer")
err = c.redisdb.Publish(c.nameOutChannel, Message{ err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "datachannel-offer", Type: "datachannel-offer",
Message: util.Encode(offer), Message: Encode(offer),
}.String()).Err() }.String()).Err()
if err != nil { if err != nil {
return return
@ -788,3 +769,26 @@ func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) {
} }
return return
} }
// Encode encodes the input in base64
// It can optionally zip the input before encoding
func Encode(obj interface{}) string {
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
return base64.StdEncoding.EncodeToString(b)
}
// Decode decodes the input from base64
// It can optionally unzip the input after decoding
func Decode(in string, obj interface{}) (err error) {
b, err := base64.StdEncoding.DecodeString(in)
if err != nil {
return
}
err = json.Unmarshal(b, obj)
return
}