0
0
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

dont erase file

This commit is contained in:
Zack Scholl 2019-04-10 07:25:29 -07:00
parent 1bff6e28b1
commit ccd8ca3db3
35 changed files with 9 additions and 1985 deletions

View file

@ -1,8 +1,8 @@
package bench
import (
"github.com/schollz/croc/v5/pkg/session/bench"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/src/webrtc/pkg/session/bench"
"github.com/schollz/croc/v5/src/webrtc/pkg/session/common"
log "github.com/sirupsen/logrus"
"gopkg.in/urfave/cli.v1"
)

View file

@ -6,7 +6,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/schollz/croc/v5/pkg/session/receiver"
"github.com/schollz/croc/v5/src/webrtc/pkg/session/receiver"
"gopkg.in/urfave/cli.v1"
)

View file

@ -4,8 +4,8 @@ import (
"fmt"
"os"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/session/sender"
"github.com/schollz/croc/v5/src/webrtc/pkg/session/common"
"github.com/schollz/croc/v5/src/webrtc/pkg/session/sender"
log "github.com/sirupsen/logrus"
"gopkg.in/urfave/cli.v1"
)

View file

@ -1,47 +0,0 @@
package buffer
import (
"bytes"
"sync"
)
// Buffer is a threadsafe buffer
type Buffer struct {
b bytes.Buffer
m sync.Mutex
}
// Read in a thread-safe way
func (b *Buffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
// ReadString in a thread-safe way
func (b *Buffer) ReadString(delim byte) (line string, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.ReadString(delim)
}
// Write in a thread-safe way
func (b *Buffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
// WriteString in a thread-safe way
func (b *Buffer) WriteString(s string) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.WriteString(s)
}
// String in a thread-safe way
func (b *Buffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

View file

@ -1,126 +0,0 @@
package session
import (
"io"
"log"
"os"
"github.com/pion/webrtc/v2"
"github.com/schollz/croc/v5/pkg/stats"
"github.com/schollz/croc/v5/pkg/utils"
)
// SDPProvider returns the SDP input
func (s *Session) SDPProvider() io.Reader {
return s.sdpInput
}
// CompletionHandler to be called when transfer is done
type CompletionHandler func()
// Session contains common elements to perform send/receive
type Session struct {
Done chan struct{}
NetworkStats *stats.Stats
sdpInput io.Reader
sdpOutput io.Writer
peerConnection *webrtc.PeerConnection
onCompletion CompletionHandler
}
// New creates a new Session
func New(sdpInput io.Reader, sdpOutput io.Writer) Session {
log.Println("making new channel")
if sdpInput == nil {
sdpInput = os.Stdin
}
if sdpOutput == nil {
sdpOutput = os.Stdout
}
return Session{
sdpInput: sdpInput,
sdpOutput: sdpOutput,
Done: make(chan struct{}),
NetworkStats: stats.New(),
}
}
// CreateConnection prepares a WebRTC connection
func (s *Session) CreateConnection(onConnectionStateChange func(connectionState webrtc.ICEConnectionState)) error {
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
// Create a new RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(config)
if err != nil {
return err
}
s.peerConnection = peerConnection
peerConnection.OnICEConnectionStateChange(onConnectionStateChange)
return nil
}
// SetSDP sets the SDP
func (s *Session) SetSDP(encoded string) error {
var sdp webrtc.SessionDescription
err := utils.Decode(encoded, &sdp)
if err != nil {
return err
}
return s.peerConnection.SetRemoteDescription(sdp)
}
// CreateDataChannel that will be used to send data
func (s *Session) CreateDataChannel(c *webrtc.DataChannelInit) (*webrtc.DataChannel, error) {
return s.peerConnection.CreateDataChannel("data", c)
}
// OnDataChannel sets an OnDataChannel handler
func (s *Session) OnDataChannel(handler func(d *webrtc.DataChannel)) {
s.peerConnection.OnDataChannel(handler)
}
// CreateAnswer set the local description and print the answer SDP
func (s *Session) CreateAnswer() (string, error) {
// Create an answer
answer, err := s.peerConnection.CreateAnswer(nil)
if err != nil {
return "", err
}
return s.createSessionDescription(answer)
}
// CreateOffer set the local description and print the offer SDP
func (s *Session) CreateOffer() (string, error) {
// Create an offer
answer, err := s.peerConnection.CreateOffer(nil)
if err != nil {
return "", err
}
return s.createSessionDescription(answer)
}
// createSessionDescription set the local description and print the SDP
func (s *Session) createSessionDescription(desc webrtc.SessionDescription) (string, error) {
// Sets the LocalDescription, and starts our UDP listeners
if err := s.peerConnection.SetLocalDescription(desc); err != nil {
return "", err
}
desc.SDP = utils.StripSDP(desc.SDP)
// Output the SDP in base64 so we can paste it in browser
return utils.Encode(desc)
}
// OnCompletion is called when session ends
func (s *Session) OnCompletion() {
if s.onCompletion != nil {
s.onCompletion()
}
}

View file

@ -1,24 +0,0 @@
package session
import (
"bufio"
"bytes"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_New(t *testing.T) {
assert := assert.New(t)
input := bufio.NewReader(&bytes.Buffer{})
output := bufio.NewWriter(&bytes.Buffer{})
sess := New(nil, nil)
assert.Equal(os.Stdin, sess.sdpInput)
assert.Equal(os.Stdout, sess.sdpOutput)
sess = New(input, output)
assert.Equal(input, sess.sdpInput)
assert.Equal(output, sess.sdpOutput)
}

View file

@ -35,7 +35,7 @@ func main() {
// PathToFile: "../wskeystore/README.md",
// PathToFile: "./src/croc/croc.go",
// PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"},
PathToFiles: []string{"croc.exe","croc2.exe"},
PathToFiles: []string{"croc3.exe", "croc2.exe"},
KeepPathInRemote: false,
})
} else {

View file

@ -1,61 +0,0 @@
package bench
import (
"sync"
"time"
internalSess "github.com/schollz/croc/v5/internal/session"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/stats"
)
const (
bufferThresholdDefault = 64 * 1024 // 64kB
testDurationDefault = 20 * time.Second
testDurationErrorDefault = (testDurationDefault * 10) / 7
)
// Session is a benchmark session
type Session struct {
sess internalSess.Session
master bool
wg sync.WaitGroup
// Settings
bufferThreshold uint64
testDuration time.Duration
testDurationError time.Duration
startPhase2 chan struct{}
uploadNetworkStats *stats.Stats
downloadDone chan bool
downloadNetworkStats *stats.Stats
}
// New creates a new sender session
func new(s internalSess.Session, isMaster bool) *Session {
return &Session{
sess: s,
master: isMaster,
bufferThreshold: bufferThresholdDefault,
testDuration: testDurationDefault,
testDurationError: testDurationErrorDefault,
startPhase2: make(chan struct{}),
downloadDone: make(chan bool),
uploadNetworkStats: stats.New(),
downloadNetworkStats: stats.New(),
}
}
// Config contains custom configuration for a session
type Config struct {
common.Configuration
Master bool // Will create the SDP offer ?
}
// NewWith createa a new benchmark Session with custom configuration
func NewWith(c Config) *Session {
return new(internalSess.New(c.SDPProvider, c.SDPOutput), c.Master)
}

View file

@ -1,86 +0,0 @@
package bench
import (
"testing"
"time"
"github.com/schollz/croc/v5/internal/buffer"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/utils"
"github.com/stretchr/testify/assert"
)
func Test_New(t *testing.T) {
assert := assert.New(t)
sess := NewWith(Config{
Master: false,
})
assert.NotNil(sess)
assert.Equal(false, sess.master)
}
func Test_Bench(t *testing.T) {
assert := assert.New(t)
sessionSDPProvider := &buffer.Buffer{}
sessionSDPOutput := &buffer.Buffer{}
sessionMasterSDPProvider := &buffer.Buffer{}
sessionMasterSDPOutput := &buffer.Buffer{}
testDuration := 2 * time.Second
sess := NewWith(Config{
Configuration: common.Configuration{
SDPProvider: sessionSDPProvider,
SDPOutput: sessionSDPOutput,
},
Master: false,
})
assert.NotNil(sess)
sess.testDuration = testDuration
sess.testDurationError = (testDuration * 10) / 8
sessMaster := NewWith(Config{
Configuration: common.Configuration{
SDPProvider: sessionMasterSDPProvider,
SDPOutput: sessionMasterSDPOutput,
},
Master: true,
})
assert.NotNil(sessMaster)
sessMaster.testDuration = testDuration
sessMaster.testDurationError = (testDuration * 10) / 8
masterDone := make(chan struct{})
go func() {
defer close(masterDone)
err := sessMaster.Start()
assert.Nil(err)
}()
sdp, err := utils.MustReadStream(sessionMasterSDPOutput)
assert.Nil(err)
sdp += "\n"
n, err := sessionSDPProvider.WriteString(sdp)
assert.Nil(err)
assert.Equal(len(sdp), n)
slaveDone := make(chan struct{})
go func() {
defer close(slaveDone)
err := sess.Start()
assert.Nil(err)
}()
// Get SDP from slave and send it to the master
sdp, err = utils.MustReadStream(sessionSDPOutput)
assert.Nil(err)
n, err = sessionMasterSDPProvider.WriteString(sdp)
assert.Nil(err)
assert.Equal(len(sdp), n)
<-masterDone
<-slaveDone
}

View file

@ -1,24 +0,0 @@
package bench
const (
// Used as upload channel for master (and download channel for non-master)
// 43981 -> 0xABCD
dataChannel1ID = uint16(43981)
// Used as download channel for master (and upload channel for non-master)
// 61185 -> 0xef01
dataChannel2ID = uint16(61185)
)
func (s *Session) uploadChannelID() uint16 {
if s.master {
return dataChannel1ID
}
return dataChannel2ID
}
func (s *Session) downloadChannelID() uint16 {
if s.master {
return dataChannel2ID
}
return dataChannel1ID
}

View file

@ -1,28 +0,0 @@
package bench
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_IDs(t *testing.T) {
assert := assert.New(t)
sess := NewWith(Config{
Master: false,
})
assert.NotNil(sess)
assert.Equal(false, sess.master)
sessMaster := NewWith(Config{
Master: true,
})
assert.NotNil(sessMaster)
assert.Equal(true, sessMaster.master)
assert.Equal(sessMaster.downloadChannelID(), sess.uploadChannelID())
assert.Equal(sessMaster.uploadChannelID(), sess.downloadChannelID())
assert.NotEqual(sessMaster.downloadChannelID(), sess.downloadChannelID())
}

View file

@ -1,62 +0,0 @@
package bench
import (
"fmt"
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
// Start initializes the connection and the benchmark
func (s *Session) Start() error {
if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil {
log.Errorln(err)
return err
}
s.sess.OnDataChannel(s.onNewDataChannel())
if err := s.createUploadDataChannel(); err != nil {
log.Errorln(err)
return err
}
s.wg.Add(2) // Download + Upload
if s.master {
if err := s.createMasterSession(); err != nil {
return err
}
} else {
if err := s.createSlaveSession(); err != nil {
return err
}
}
// Wait for benchmarks to be done
s.wg.Wait()
fmt.Printf("Upload: %s\n", s.uploadNetworkStats.String())
fmt.Printf("Download: %s\n", s.downloadNetworkStats.String())
s.sess.OnCompletion()
return nil
}
func (s *Session) initDataChannel(channelID *uint16) (*webrtc.DataChannel, error) {
ordered := true
maxPacketLifeTime := uint16(10000)
return s.sess.CreateDataChannel(&webrtc.DataChannelInit{
Ordered: &ordered,
MaxPacketLifeTime: &maxPacketLifeTime,
ID: channelID,
})
}
func (s *Session) createUploadDataChannel() error {
channelID := s.uploadChannelID()
dataChannel, err := s.initDataChannel(&channelID)
if err != nil {
return err
}
dataChannel.OnOpen(s.onOpenUploadHandler(dataChannel))
return nil
}

View file

@ -1,59 +0,0 @@
package bench
import (
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
// Useful for unit tests
func (s *Session) onNewDataChannelHelper(name string, channelID uint16, d *webrtc.DataChannel) {
log.Tracef("New DataChannel %s (id: %x)\n", name, channelID)
switch channelID {
case s.downloadChannelID():
log.Traceln("Created Download data channel")
d.OnClose(s.onCloseHandlerDownload())
go s.onOpenHandlerDownload(d)()
case s.uploadChannelID():
log.Traceln("Created Upload data channel")
default:
log.Warningln("Created unknown data channel")
}
}
func (s *Session) onNewDataChannel() func(d *webrtc.DataChannel) {
return func(d *webrtc.DataChannel) {
if d == nil || d.ID() == nil {
return
}
s.onNewDataChannelHelper(d.Label(), *d.ID(), d)
}
}
func (s *Session) createMasterSession() error {
if err := s.sess.CreateOffer(); err != nil {
log.Errorln(err)
return err
}
if err := s.sess.ReadSDP(); err != nil {
log.Errorln(err)
return err
}
return nil
}
func (s *Session) createSlaveSession() error {
if err := s.sess.ReadSDP(); err != nil {
log.Errorln(err)
return err
}
if err := s.sess.CreateAnswer(); err != nil {
log.Errorln(err)
return err
}
return nil
}

View file

@ -1,28 +0,0 @@
package bench
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_OnNewDataChannel(t *testing.T) {
assert := assert.New(t)
testDuration := 2 * time.Second
sess := NewWith(Config{
Master: false,
})
assert.NotNil(sess)
sess.testDuration = testDuration
sess.testDurationError = (testDuration * 10) / 8
sess.onNewDataChannel()(nil)
testID := sess.uploadChannelID()
sess.onNewDataChannelHelper("", testID, nil)
testID = sess.uploadChannelID() | sess.downloadChannelID()
sess.onNewDataChannelHelper("", testID, nil)
}

View file

@ -1,12 +0,0 @@
package bench
import (
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) {
return func(connectionState webrtc.ICEConnectionState) {
log.Infof("ICE Connection State has changed: %s\n", connectionState.String())
}
}

View file

@ -1,59 +0,0 @@
package bench
import (
"fmt"
"time"
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
func (s *Session) onOpenHandlerDownload(dc *webrtc.DataChannel) func() {
// If master, wait for the upload to complete
// If not master, close the channel so the upload can start
return func() {
if s.master {
<-s.startPhase2
}
log.Debugf("Starting to download data...")
defer log.Debugf("Stopped downloading data...")
s.downloadNetworkStats.Start()
// Useful for unit tests
if dc != nil {
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
fmt.Printf("Downloading at %.2f MB/s\r", s.downloadNetworkStats.Bandwidth())
s.downloadNetworkStats.AddBytes(uint64(len(msg.Data)))
})
} else {
log.Warningln("No DataChannel provided")
}
timeoutErr := time.After(s.testDurationError)
fmt.Printf("Downloading random datas ... (%d s)\n", int(s.testDuration.Seconds()))
select {
case <-s.downloadDone:
case <-timeoutErr:
log.Error("Time'd out")
}
log.Traceln("Done downloading")
if !s.master {
close(s.startPhase2)
}
fmt.Printf("\n")
s.downloadNetworkStats.Stop()
s.wg.Done()
}
}
func (s *Session) onCloseHandlerDownload() func() {
return func() {
close(s.downloadDone)
}
}

View file

@ -1,75 +0,0 @@
package bench
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/webrtc/v2"
log "github.com/sirupsen/logrus"
)
func (s *Session) onOpenUploadHandler(dc *webrtc.DataChannel) func() {
return func() {
if !s.master {
<-s.startPhase2
}
log.Debugln("Starting to upload data...")
defer log.Debugln("Stopped uploading data...")
lenToken := uint64(4096)
token := make([]byte, lenToken)
if _, err := rand.Read(token); err != nil {
log.Fatalln("Err: ", err)
}
s.uploadNetworkStats.Start()
// Useful for unit tests
if dc != nil {
dc.SetBufferedAmountLowThreshold(s.bufferThreshold)
dc.OnBufferedAmountLow(func() {
if err := dc.Send(token); err == nil {
fmt.Printf("Uploading at %.2f MB/s\r", s.uploadNetworkStats.Bandwidth())
s.uploadNetworkStats.AddBytes(lenToken)
}
})
} else {
log.Warningln("No DataChannel provided")
}
fmt.Printf("Uploading random datas ... (%d s)\n", int(s.testDuration.Seconds()))
timeout := time.After(s.testDuration)
timeoutErr := time.After(s.testDurationError)
if dc != nil {
// Ignore potential error
_ = dc.Send(token)
}
SENDING_LOOP:
for {
select {
case <-timeoutErr:
log.Error("Time'd out")
break SENDING_LOOP
case <-timeout:
log.Traceln("Done uploading")
break SENDING_LOOP
}
}
fmt.Printf("\n")
s.uploadNetworkStats.Stop()
if dc != nil {
dc.Close()
}
if s.master {
close(s.startPhase2)
}
s.wg.Done()
}
}

View file

@ -1,38 +0,0 @@
package bench
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_TimeoutDownload(t *testing.T) {
assert := assert.New(t)
sess := NewWith(Config{
Master: false,
})
assert.NotNil(sess)
assert.Equal(false, sess.master)
sess.testDurationError = 2 * time.Millisecond
sess.wg.Add(1)
sess.onOpenHandlerDownload(nil)()
}
func Test_TimeoutUpload(t *testing.T) {
assert := assert.New(t)
sess := NewWith(Config{
Master: true,
})
assert.NotNil(sess)
assert.Equal(true, sess.master)
sess.testDurationError = 2 * time.Millisecond
sess.wg.Add(1)
sess.onOpenUploadHandler(nil)()
}

View file

@ -1,14 +0,0 @@
package common
import (
"io"
"github.com/schollz/croc/v5/internal/session"
)
// Configuration common to both Sender and Receiver session
type Configuration struct {
SDPProvider io.Reader // The SDP reader
SDPOutput io.Writer // The SDP writer
OnCompletion session.CompletionHandler // Handler to call on session completion
}

View file

@ -1,156 +0,0 @@
package receiver
import (
"fmt"
"io"
"os"
"github.com/pion/webrtc/v2"
internalSess "github.com/schollz/croc/v5/internal/session"
"github.com/schollz/croc/v5/pkg/session/common"
log "github.com/sirupsen/logrus"
)
// Session is a receiver session
type Session struct {
sess internalSess.Session
msgChannel chan webrtc.DataChannelMessage
initialized bool
}
func new(s internalSess.Session) *Session {
return &Session{
sess: s,
msgChannel: make(chan webrtc.DataChannelMessage, 4096*2),
initialized: false,
}
}
// New creates a new receiver session
func New() *Session {
return new(internalSess.New(nil, nil))
}
// Config contains custom configuration for a session
type Config struct {
common.Configuration
Stream io.Writer // The Stream to write to
}
// NewWith createa a new receiver Session with custom configuration
func NewWith(c Config) *Session {
return new(internalSess.New(c.SDPProvider, c.SDPOutput))
}
func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) {
return func(connectionState webrtc.ICEConnectionState) {
log.Infof("ICE Connection State has changed: %s\n", connectionState.String())
}
}
func (s *Session) onMessage() func(msg webrtc.DataChannelMessage) {
return func(msg webrtc.DataChannelMessage) {
// Store each message in the message channel
s.msgChannel <- msg
}
}
func (s *Session) onClose() func() {
return func() {
close(s.sess.Done)
}
}
// // Initialize creates the connection, the datachannel and creates the offer
// func (s *Session) Initialize() error {
// if s.initialized {
// return nil
// }
// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil {
// log.Errorln(err)
// return err
// }
// s.createDataHandler()
// if err := s.sess.ReadSDP(); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.sess.CreateAnswer(); err != nil {
// log.Errorln(err)
// return err
// }
// s.initialized = true
// return nil
// }
// // Start initializes the connection and the file transfer
// func (s *Session) Start() error {
// if err := s.Initialize(); err != nil {
// return err
// }
// // Handle data
// s.receiveData()
// s.sess.OnCompletion()
// return nil
// }
func (s *Session) CreateDataHandler() {
s.sess.OnDataChannel(func(d *webrtc.DataChannel) {
log.Debugf("New DataChannel %s %d\n", d.Label(), d.ID())
s.sess.NetworkStats.Start()
d.OnMessage(s.onMessage())
d.OnClose(s.onClose())
})
}
func (s *Session) ReceiveData(pathToFile string) {
s.receiveData(pathToFile)
s.sess.OnCompletion()
}
func (s *Session) receiveData(pathToFile string) error {
log.Infoln("Starting to receive data...")
f, err := os.OpenFile(pathToFile, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
return err
}
defer func() {
log.Infoln("Stopped receiving data...")
f.Close()
}()
// Consume the message channel, until done
// Does not stop on error
for {
select {
case <-s.sess.Done:
s.sess.NetworkStats.Stop()
fmt.Printf("\nNetwork: %s\n", s.sess.NetworkStats.String())
return nil
case msg := <-s.msgChannel:
n, err := f.Write(msg.Data)
if err != nil {
return err
} else {
currentSpeed := s.sess.NetworkStats.Bandwidth()
fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed)
s.sess.NetworkStats.AddBytes(uint64(n))
}
}
}
return nil
}
func (s *Session) CreateConnection() (err error) {
return s.sess.CreateConnection(s.onConnectionStateChange())
}
func (s *Session) SetSDP(sdp string) error {
return s.sess.SetSDP(sdp)
}
func (s *Session) CreateAnswer() (string, error) {
return s.sess.CreateAnswer()
}

View file

@ -1,19 +0,0 @@
package receiver
import (
"bufio"
"bytes"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_New(t *testing.T) {
assert := assert.New(t)
output := bufio.NewWriter(&bytes.Buffer{})
sess := New(output)
assert.NotNil(sess)
assert.Equal(output, sess.stream)
}

View file

@ -1,292 +0,0 @@
package sender
import (
"fmt"
"io"
"os"
"sync"
colorable "github.com/mattn/go-colorable"
"github.com/pion/webrtc/v2"
internalSess "github.com/schollz/croc/v5/internal/session"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/stats"
"github.com/sirupsen/logrus"
)
const (
// Must be <= 16384
senderBuffSize = 16384
bufferThreshold = 512 * 1024 // 512kB
)
var log = logrus.New()
func init() {
log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout())
log.SetLevel(logrus.DebugLevel)
}
type outputMsg struct {
n int
buff []byte
}
// Session is a sender session
type Session struct {
sess internalSess.Session
initialized bool
dataChannel *webrtc.DataChannel
dataBuff []byte
msgToBeSent []outputMsg
stopSending chan struct{}
output chan outputMsg
doneCheckLock sync.Mutex
doneCheck bool
// Stats/infos
readingStats *stats.Stats
}
// New creates a new sender session
func new(s internalSess.Session) *Session {
return &Session{
sess: s,
initialized: false,
dataBuff: make([]byte, senderBuffSize),
stopSending: make(chan struct{}, 1),
output: make(chan outputMsg, senderBuffSize*10),
doneCheck: false,
readingStats: stats.New(),
}
}
// New creates a new receiver session
func New() *Session {
return new(internalSess.New(nil, nil))
}
// Config contains custom configuration for a session
type Config struct {
common.Configuration
}
// NewWith createa a new sender Session with custom configuration
func NewWith(c Config) *Session {
return new(internalSess.New(c.SDPProvider, c.SDPOutput))
}
func (s *Session) CreateConnection() (err error) {
return s.sess.CreateConnection(s.onConnectionStateChange())
}
func (s *Session) CreateOffer() (string, error) {
return s.sess.CreateOffer()
}
func (s *Session) SetSDP(sdp string) error {
return s.sess.SetSDP(sdp)
}
func (s *Session) TransferFile(pathToFile string) {
s.readFile(pathToFile)
s.sess.OnCompletion()
}
// SDPProvider returns the underlying SDPProvider
func (s *Session) SDPProvider() io.Reader {
return s.sess.SDPProvider()
}
// // Initialize creates the connection, the datachannel and creates the offer
// func (s *Session) Initialize() error {
// if s.initialized {
// return nil
// }
// if err := s.sess.CreateConnection(s.onConnectionStateChange()); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.createDataChannel(); err != nil {
// log.Errorln(err)
// return err
// }
// if err := s.sess.CreateOffer(); err != nil {
// log.Errorln(err)
// return err
// }
// s.initialized = true
// return nil
// }
// // Start the connection and the file transfer
// func (s *Session) Start() error {
// if err := s.Initialize(); err != nil {
// return err
// }
// go s.readFile()
// if err := s.sess.ReadSDP(); err != nil {
// log.Errorln(err)
// return err
// }
// <-s.sess.Done
// s.sess.OnCompletion()
// return nil
// }
func (s *Session) CreateDataChannel() error {
ordered := true
maxPacketLifeTime := uint16(10000)
dataChannel, err := s.sess.CreateDataChannel(&webrtc.DataChannelInit{
Ordered: &ordered,
MaxPacketLifeTime: &maxPacketLifeTime,
})
if err != nil {
return err
}
s.dataChannel = dataChannel
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
s.dataChannel.SetBufferedAmountLowThreshold(bufferThreshold)
s.dataChannel.OnOpen(s.onOpenHandler())
s.dataChannel.OnClose(s.onCloseHandler())
return nil
}
func (s *Session) readFile(pathToFile string) error {
f, err := os.Open(pathToFile)
if err != nil {
return err
}
log.Infof("Starting to read data...")
s.readingStats.Start()
defer func() {
f.Close()
s.readingStats.Pause()
log.Infof("Stopped reading data...")
close(s.output)
}()
for {
// Read file
s.dataBuff = s.dataBuff[:cap(s.dataBuff)]
n, err := f.Read(s.dataBuff)
if err != nil {
if err == io.EOF {
s.readingStats.Stop()
log.Debugf("Got EOF after %v bytes!\n", s.readingStats.Bytes())
return nil
}
log.Errorf("Read Error: %v\n", err)
return err
}
s.dataBuff = s.dataBuff[:n]
s.readingStats.AddBytes(uint64(n))
s.output <- outputMsg{
n: n,
// Make a copy of the buffer
buff: append([]byte(nil), s.dataBuff...),
}
}
return nil
}
func (s *Session) onBufferedAmountLow() func() {
return func() {
data := <-s.output
if data.n != 0 {
s.msgToBeSent = append(s.msgToBeSent, data)
} else if len(s.msgToBeSent) == 0 && s.dataChannel.BufferedAmount() == 0 {
s.sess.NetworkStats.Stop()
s.close(false)
return
}
currentSpeed := s.sess.NetworkStats.Bandwidth()
fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed)
for len(s.msgToBeSent) != 0 {
cur := s.msgToBeSent[0]
if err := s.dataChannel.Send(cur.buff); err != nil {
log.Errorf("Error, cannot send to client: %v\n", err)
return
}
s.sess.NetworkStats.AddBytes(uint64(cur.n))
s.msgToBeSent = s.msgToBeSent[1:]
}
}
}
func (s *Session) writeToNetwork() {
// Set callback, as transfer may be paused
fmt.Println("\nwriting")
s.dataChannel.OnBufferedAmountLow(s.onBufferedAmountLow())
fmt.Println("\ndone")
<-s.stopSending
fmt.Println("\nstopped sending")
s.dataChannel.OnBufferedAmountLow(nil)
log.Infof("Pausing network I/O... (remaining at least %v packets)\n", len(s.output))
s.sess.NetworkStats.Pause()
}
func (s *Session) StopSending() {
s.stopSending <- struct{}{}
}
func (s *Session) onConnectionStateChange() func(connectionState webrtc.ICEConnectionState) {
return func(connectionState webrtc.ICEConnectionState) {
log.Infof("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateDisconnected {
s.StopSending()
}
}
}
func (s *Session) onOpenHandler() func() {
return func() {
s.sess.NetworkStats.Start()
log.Infof("Starting to send data...")
defer log.Infof("Stopped sending data...")
s.writeToNetwork()
}
}
func (s *Session) onCloseHandler() func() {
return func() {
s.close(true)
}
}
func (s *Session) close(calledFromCloseHandler bool) {
if !calledFromCloseHandler {
s.dataChannel.Close()
}
// Sometime, onCloseHandler is not invoked, so it's a work-around
s.doneCheckLock.Lock()
if s.doneCheck {
s.doneCheckLock.Unlock()
return
}
s.doneCheck = true
s.doneCheckLock.Unlock()
s.dumpStats()
close(s.sess.Done)
}
func (s *Session) dumpStats() {
fmt.Printf(`
Disk : %s
Network: %s
`, s.readingStats.String(), s.sess.NetworkStats.String())
}

View file

@ -1,19 +0,0 @@
package sender
import (
"bufio"
"bytes"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_New(t *testing.T) {
assert := assert.New(t)
input := bufio.NewReader(&bytes.Buffer{})
sess := New(input)
assert.NotNil(sess)
assert.Equal(input, sess.stream)
}

View file

@ -1,7 +0,0 @@
package session
// Session defines a common interface for sender and receiver sessions
type Session interface {
// Start a connection and starts the file transfer
Start() error
}

View file

@ -1,99 +0,0 @@
package session
import (
"bytes"
"fmt"
"testing"
"github.com/schollz/croc/v5/internal/buffer"
"github.com/schollz/croc/v5/pkg/session/common"
"github.com/schollz/croc/v5/pkg/session/receiver"
"github.com/schollz/croc/v5/pkg/session/sender"
"github.com/schollz/croc/v5/pkg/utils"
"github.com/stretchr/testify/assert"
)
// Tests
func Test_CreateReceiverSession(t *testing.T) {
assert := assert.New(t)
stream := &bytes.Buffer{}
sess := receiver.NewWith(receiver.Config{
Stream: stream,
})
assert.NotNil(sess)
}
func Test_TransferSmallMessage(t *testing.T) {
assert := assert.New(t)
// Create client receiver
clientStream := &buffer.Buffer{}
clientSDPProvider := &buffer.Buffer{}
clientSDPOutput := &buffer.Buffer{}
clientConfig := receiver.Config{
Stream: clientStream,
Configuration: common.Configuration{
SDPProvider: clientSDPProvider,
SDPOutput: clientSDPOutput,
},
}
clientSession := receiver.NewWith(clientConfig)
assert.NotNil(clientSession)
// Create sender
senderStream := &buffer.Buffer{}
senderSDPProvider := &buffer.Buffer{}
senderSDPOutput := &buffer.Buffer{}
n, err := senderStream.WriteString("Hello World!\n")
assert.Nil(err)
assert.Equal(13, n) // Len "Hello World\n"
senderConfig := sender.Config{
Stream: senderStream,
Configuration: common.Configuration{
SDPProvider: senderSDPProvider,
SDPOutput: senderSDPOutput,
},
}
senderSession := sender.NewWith(senderConfig)
assert.NotNil(senderSession)
senderDone := make(chan struct{})
go func() {
defer close(senderDone)
err := senderSession.Start()
assert.Nil(err)
}()
// Get SDP from sender and send it to the client
sdp, err := utils.MustReadStream(senderSDPOutput)
assert.Nil(err)
fmt.Printf("READ SDP -> %s\n", sdp)
sdp += "\n"
n, err = clientSDPProvider.WriteString(sdp)
assert.Nil(err)
assert.Equal(len(sdp), n)
clientDone := make(chan struct{})
go func() {
defer close(clientDone)
err := clientSession.Start()
assert.Nil(err)
}()
// Get SDP from client and send it to the sender
sdp, err = utils.MustReadStream(clientSDPOutput)
assert.Nil(err)
n, err = senderSDPProvider.WriteString(sdp)
assert.Nil(err)
assert.Equal(len(sdp), n)
fmt.Println("Waiting for everyone to be done...")
<-senderDone
<-clientDone
msg, err := clientStream.ReadString('\n')
assert.Nil(err)
assert.Equal("Hello World!\n", msg)
}

View file

@ -1,17 +0,0 @@
package stats
// Bytes returns the stored number of bytes
func (s *Stats) Bytes() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.nbBytes
}
// AddBytes increase the nbBytes counter
func (s *Stats) AddBytes(c uint64) {
s.lock.Lock()
defer s.lock.Unlock()
s.nbBytes += c
}

View file

@ -1,40 +0,0 @@
package stats
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_Bytes(t *testing.T) {
assert := assert.New(t)
tests := []struct {
before uint64
add uint64
after uint64
}{
{
before: 0,
add: 0,
after: 0,
},
{
before: 0,
add: 1,
after: 1,
},
{
before: 1,
add: 10,
after: 11,
},
}
s := New()
for _, cur := range tests {
assert.Equal(cur.before, s.Bytes())
s.AddBytes(cur.add)
assert.Equal(cur.after, s.Bytes())
}
}

View file

@ -1,55 +0,0 @@
package stats
import "time"
// Start stores the "start" timestamp
func (s *Stats) Start() {
s.lock.Lock()
defer s.lock.Unlock()
if s.timeStart.IsZero() {
s.timeStart = time.Now()
} else if !s.timePause.IsZero() {
s.timePaused += time.Since(s.timePause)
// Reset
s.timePause = time.Time{}
}
}
// Pause stores an interruption timestamp
func (s *Stats) Pause() {
s.lock.RLock()
if s.timeStart.IsZero() || !s.timeStop.IsZero() {
// Can't stop if not started, or if stopped
s.lock.RUnlock()
return
}
s.lock.RUnlock()
s.lock.Lock()
defer s.lock.Unlock()
if s.timePause.IsZero() {
s.timePause = time.Now()
}
}
// Stop stores the "stop" timestamp
func (s *Stats) Stop() {
s.lock.RLock()
if s.timeStart.IsZero() {
// Can't stop if not started
s.lock.RUnlock()
return
}
s.lock.RUnlock()
s.lock.Lock()
defer s.lock.Unlock()
if s.timeStop.IsZero() {
s.timeStop = time.Now()
}
}

View file

@ -1,55 +0,0 @@
package stats
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_ControlFlow(t *testing.T) {
assert := assert.New(t)
s := New()
// Everything should be 0 at the beginning
assert.Equal(true, s.timeStart.IsZero())
assert.Equal(true, s.timeStop.IsZero())
assert.Equal(true, s.timePause.IsZero())
// Should not do anything
s.Stop()
assert.Equal(true, s.timeStop.IsZero())
// Should not do anything
s.Pause()
assert.Equal(true, s.timePause.IsZero())
// Should start
s.Start()
originalStart := s.timeStart
assert.Equal(false, s.timeStart.IsZero())
// Should pause
s.Pause()
assert.Equal(false, s.timePause.IsZero())
originalPause := s.timePause
// Should not modify
s.Pause()
assert.Equal(originalPause, s.timePause)
// Should release
assert.Equal(int64(0), s.timePaused.Nanoseconds())
s.Start()
assert.NotEqual(0, s.timePaused.Nanoseconds())
originalPausedDuration := s.timePaused
assert.Equal(true, s.timePause.IsZero())
assert.Equal(originalStart, s.timeStart)
s.Pause()
time.Sleep(10 * time.Nanosecond)
s.Start()
assert.Equal(true, s.timePaused > originalPausedDuration)
s.Stop()
assert.Equal(false, s.timeStop.IsZero())
}

View file

@ -1,26 +0,0 @@
package stats
import "time"
// Duration returns the 'stop - start' duration, if stopped
// Returns 0 if not started
// Returns time.Since(s.timeStart) if not stopped
func (s *Stats) Duration() time.Duration {
s.lock.RLock()
defer s.lock.RUnlock()
if s.timeStart.IsZero() {
return 0
} else if s.timeStop.IsZero() {
return time.Since(s.timeStart) - s.timePaused
}
return s.timeStop.Sub(s.timeStart) - s.timePaused
}
// Bandwidth returns the IO speed in MB/s
func (s *Stats) Bandwidth() float64 {
s.lock.RLock()
defer s.lock.RUnlock()
return (float64(s.nbBytes) / 1024 / 1024) / s.Duration().Seconds()
}

View file

@ -1,79 +0,0 @@
package stats
import (
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_Bandwidth(t *testing.T) {
assert := assert.New(t)
s := New()
now := time.Now()
tests := []struct {
startTime time.Time
stopTime time.Time
pauseDuration time.Duration
bytesCount uint64
expectedBandwidth float64
}{
{
startTime: time.Time{},
stopTime: time.Time{},
pauseDuration: 0,
bytesCount: 0,
expectedBandwidth: math.NaN(),
},
{
startTime: now,
stopTime: time.Time{},
pauseDuration: 0,
bytesCount: 0,
expectedBandwidth: 0,
},
{
startTime: now,
stopTime: now.Add(time.Duration(1 * 1000000000)),
pauseDuration: 0,
bytesCount: 1024 * 1024,
expectedBandwidth: 1,
},
{
startTime: now,
stopTime: now.Add(time.Duration(2 * 1000000000)),
pauseDuration: time.Duration(1 * 1000000000),
bytesCount: 1024 * 1024,
expectedBandwidth: 1,
},
}
for _, cur := range tests {
s.timeStart = cur.startTime
s.timeStop = cur.stopTime
s.timePaused = cur.pauseDuration
s.nbBytes = cur.bytesCount
if math.IsNaN(cur.expectedBandwidth) {
assert.Equal(true, math.IsNaN(s.Bandwidth()))
} else {
assert.Equal(cur.expectedBandwidth, s.Bandwidth())
}
}
}
func Test_Duration(t *testing.T) {
assert := assert.New(t)
s := New()
// Should be 0
assert.Equal(time.Duration(0), s.Duration())
// Should return time.Since()
s.Start()
durationTmp := s.Duration()
time.Sleep(10 * time.Nanosecond)
assert.Equal(true, s.Duration() > durationTmp)
}

View file

@ -1,31 +0,0 @@
package stats
import (
"fmt"
"sync"
"time"
)
// Stats provide a way to track statistics infos
type Stats struct {
lock *sync.RWMutex
nbBytes uint64
timeStart time.Time
timeStop time.Time
timePause time.Time
timePaused time.Duration
}
// New creates a new Stats
func New() *Stats {
return &Stats{
lock: &sync.RWMutex{},
}
}
func (s *Stats) String() string {
s.lock.RLock()
defer s.lock.RUnlock()
return fmt.Sprintf("%v bytes | %-v | %0.4f MB/s", s.Bytes(), s.Duration(), s.Bandwidth())
}

View file

@ -1,102 +0,0 @@
package utils
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strings"
)
// MustReadStream blocks until input is received from the stream
func MustReadStream(stream io.Reader) (string, error) {
r := bufio.NewReader(stream)
var in string
for {
var err error
in, err = r.ReadString('\n')
if err != io.EOF {
if err != nil {
return "", err
}
}
in = strings.TrimSpace(in)
if len(in) > 0 {
break
}
}
fmt.Println("")
return in, nil
}
// StripSDP remove useless elements from an SDP
func StripSDP(originalSDP string) string {
finalSDP := strings.Replace(originalSDP, "a=group:BUNDLE audio video data", "a=group:BUNDLE data", -1)
tmp := strings.Split(finalSDP, "m=audio")
beginningSdp := tmp[0]
var endSdp string
if len(tmp) > 1 {
tmp = strings.Split(tmp[1], "a=end-of-candidates")
endSdp = strings.Join(tmp[2:], "a=end-of-candidates")
} else {
endSdp = strings.Join(tmp[1:], "a=end-of-candidates")
}
finalSDP = beginningSdp + endSdp
finalSDP = strings.Replace(finalSDP, "\r\n\r\n", "\r\n", -1)
finalSDP = strings.Replace(finalSDP, "\n\n", "\n", -1)
return finalSDP
}
// Encode encodes the input in base64
// It can optionally zip the input before encoding
func Encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", err
}
var gzbuff bytes.Buffer
gz, err := gzip.NewWriterLevel(&gzbuff, gzip.BestCompression)
if err != nil {
return "", err
}
if _, err := gz.Write(b); err != nil {
return "", err
}
if err := gz.Flush(); err != nil {
return "", err
}
if err := gz.Close(); err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(gzbuff.Bytes()), nil
}
// Decode decodes the input from base64
// It can optionally unzip the input after decoding
func Decode(in string, obj interface{}) error {
b, err := base64.StdEncoding.DecodeString(in)
if err != nil {
return err
}
gz, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return err
}
defer gz.Close()
s, err := ioutil.ReadAll(gz)
if err != nil {
return err
}
return json.Unmarshal(s, obj)
}

View file

@ -1,223 +0,0 @@
package utils
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_ReadStream(t *testing.T) {
assert := assert.New(t)
stream := &bytes.Buffer{}
_, err := stream.WriteString("Hello\n")
assert.Nil(err)
str, err := MustReadStream(stream)
assert.Equal("Hello", str)
assert.Nil(err)
}
func Test_StripSDP(t *testing.T) {
assert := assert.New(t)
tests := []struct {
sdp string
expected string
}{
{
sdp: "",
expected: "",
},
{
sdp: `v=0
o=- 297292268 1552262038 IN IP4 0.0.0.0
s=-
t=0 0
a=fingerprint:sha-256 70:E0:B2:DA:F8:04:D6:0C:32:03:DF:CD:A8:70:EC:45:10:FF:66:6F:3D:72:B1:BA:4C:AF:FB:5E:BE:F9:CF:6A
a=group:BUNDLE audio video data
m=audio 9 UDP/TLS/RTP/SAVPF 111 9
c=IN IP4 0.0.0.0
a=setup:actpass
a=mid:audio
a=ice-ufrag:SNxNaqIiaNoDiCNM
a=ice-pwd:dSZlwOEOKEmBfNiXCtpmPTOVJlwUCaFX
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:111 opus/48000/2
a=fmtp:111 minptime=10;useinbandfec=1
a=rtpmap:9 G722/8000
a=recvonly
a=candidate:foundation 1 udp 3776 192.168.100.207 61879 typ host generation 0
a=candidate:foundation 2 udp 3776 192.168.100.207 61879 typ host generation 0
a=end-of-candidates
a=setup:actpass
m=video 9 UDP/TLS/RTP/SAVPF 96 100 98
c=IN IP4 0.0.0.0
a=setup:actpass
a=mid:video
a=ice-ufrag:SNxNaqIiaNoDiCNM
a=ice-pwd:dSZlwOEOKEmBfNiXCtpmPTOVJlwUCaFX
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:96 VP8/90000
a=rtpmap:100 H264/90000
a=fmtp:100 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f
a=rtpmap:98 VP9/90000
a=recvonly
a=candidate:foundation 1 udp 3776 192.168.100.207 61879 typ host generation 0
a=candidate:foundation 2 udp 3776 192.168.100.207 61879 typ host generation 0
a=end-of-candidates
a=setup:actpass
m=application 9 DTLS/SCTP 5000
c=IN IP4 0.0.0.0
a=setup:actpass
a=mid:data
a=sendrecv
a=sctpmap:5000 webrtc-datachannel 1024
a=ice-ufrag:SNxNaqIiaNoDiCNM
a=ice-pwd:dSZlwOEOKEmBfNiXCtpmPTOVJlwUCaFX
a=candidate:foundation 1 udp 3776 192.168.100.207 61879 typ host generation 0
a=candidate:foundation 2 udp 3776 192.168.100.207 61879 typ host generation 0
a=end-of-candidates
a=setup:actpass
`,
expected: `v=0
o=- 297292268 1552262038 IN IP4 0.0.0.0
s=-
t=0 0
a=fingerprint:sha-256 70:E0:B2:DA:F8:04:D6:0C:32:03:DF:CD:A8:70:EC:45:10:FF:66:6F:3D:72:B1:BA:4C:AF:FB:5E:BE:F9:CF:6A
a=group:BUNDLE data
a=setup:actpass
m=application 9 DTLS/SCTP 5000
c=IN IP4 0.0.0.0
a=setup:actpass
a=mid:data
a=sendrecv
a=sctpmap:5000 webrtc-datachannel 1024
a=ice-ufrag:SNxNaqIiaNoDiCNM
a=ice-pwd:dSZlwOEOKEmBfNiXCtpmPTOVJlwUCaFX
a=candidate:foundation 1 udp 3776 192.168.100.207 61879 typ host generation 0
a=candidate:foundation 2 udp 3776 192.168.100.207 61879 typ host generation 0
a=end-of-candidates
a=setup:actpass
`,
},
}
for _, cur := range tests {
assert.Equal(cur.expected, StripSDP(cur.sdp))
}
}
func Test_Encode(t *testing.T) {
assert := assert.New(t)
tests := []struct {
input interface{}
shouldErr bool
expected string
}{
// Invalid object
{
input: make(chan int),
shouldErr: true,
},
// Empty input
{
input: nil,
shouldErr: false,
expected: "H4sIAAAAAAAC/8orzckBAAAA//8BAAD//0/8yyUEAAAA",
},
// Not JSON
{
input: "ThisTestIsNotInB64",
shouldErr: false,
expected: "H4sIAAAAAAAC/1IKycgsDkktLvEs9ssv8cxzMjNRAgAAAP//AQAA//8+sWiWFAAAAA==",
},
// JSON
{
input: struct {
Name string `json:"name"`
}{
Name: "TestJson",
},
shouldErr: false,
expected: "H4sIAAAAAAAC/6pWykvMTVWyUgpJLS7xKs7PU6oFAAAA//8BAAD//3cqgZQTAAAA",
},
}
for _, cur := range tests {
res, err := Encode(cur.input)
if cur.shouldErr {
assert.NotNil(err)
} else {
assert.Nil(err)
assert.Equal(cur.expected, res)
}
}
}
func Test_Decode(t *testing.T) {
assert := assert.New(t)
tests := []struct {
input string
shouldErr bool
}{
// Empty string
{
input: "",
shouldErr: true,
},
// Not base64
{
input: "ThisTestIsNotInB64",
shouldErr: true,
},
// Not base64 JSON
{
input: "aGVsbG8gd29ybGQ=",
shouldErr: true,
},
// Base64 JSON
{
input: "H4sIAAAAAAAC/+xVTY/bNhD9KwOdK5ukqK8JdFh77XabNPXGXqcJcmFEystGogiKsuMU/e+FLG/qtEWBBRo0h4UgCTPz+OaRegP9FvijVQEGwnQH5YLvgk7aAIN9Qd65d6YtQojzmCRpmmZA45ixhFNO4eYl3Kw4kMnpGqBdEQ4vXxA4xaKotNkpZ502Hrt7EbI4gYjiLMKE4oIhYZgskWXIyfCcLZFdY5ZgxnEeYTRDHmGeYsyQ57icIWdI5ji/wnmClCBLMUowjjBLcXGFEUHOx8Y71/YWZ3cvr18sQPRSt7DXUrUghRcDpCnGbA5316vp5sV6+mqzmq6vtqslUEohH0Bl8fdNiqJTvrcoSq/3asw0WuKJbgx1qcK+cmKH5avl6zflzeGu8z9vd4df/qzbg8TXerH7cfnrQj9/u/a36+2b55ubulosDx9u5eb2p7cj2vnShk3/8SJynf6kHmLbCIuD5Nb23ZRnhJApOx9/48dSo431ulEFJc/6TmnzXhhZqbKgX7Dk8H3K2HSgOCs1l9sshZFaCq+wansjhdetAQq9tBClaQI0ZxOaZBMyoVkOcRQnCfijhfu287BTRrlxCfkXOvbf0p3VUUKApxM63CyfRAxinhJ6outcVX8EJ6R0f2npbOv8Gfk4+V+jnzIybKvwc9tutPFo63+ycZ7AoCPPHmvlE+X/ZuU8ge0qm+bkswsfPE4I/MASflkaHU4I1Gqv6lB0x6ZR3h1DUdftQcmCPrOi/KC8/nQ6zLBppRqSrq10rcJxmZYFZ4TQ6kshGWxX+WW3p3H45sdBWFvrckTmcD1MxHq+WUF8/oiPmYOHf8VQN9Kpcn+OytEgAycc1Hvny3DAlvfCGFUDJYx/jfF5Mtw3Z7jg9z8AAAD//wEAAP//RjpVQj8JAAA=",
shouldErr: false,
},
}
var obj interface{}
for _, cur := range tests {
err := Decode(cur.input, &obj)
if cur.shouldErr {
assert.NotNil(err)
} else {
assert.Nil(err)
}
}
}
func Test_EncodeDecode(t *testing.T) {
assert := assert.New(t)
input := struct {
Name string `json:"name"`
}{
Name: "TestJson",
}
encoded, err := Encode(input)
assert.Nil(err)
assert.Equal("H4sIAAAAAAAC/6pWykvMTVWyUgpJLS7xKs7PU6oFAAAA//8BAAD//3cqgZQTAAAA", encoded)
var obj struct {
Name string `json:"name"`
}
err = Decode(encoded, &obj)
assert.Nil(err)
assert.Equal(input, obj)
}

View file

@ -18,10 +18,10 @@ import (
"github.com/go-redis/redis"
"github.com/mattn/go-colorable"
"github.com/pions/webrtc"
common "github.com/schollz/croc/v5/pkg/session/common"
recvSess "github.com/schollz/croc/v5/pkg/session/receiver"
sendSess "github.com/schollz/croc/v5/pkg/session/sender"
"github.com/schollz/croc/v5/src/utils"
common "github.com/schollz/croc/v5/src/webrtc/pkg/session/common"
recvSess "github.com/schollz/croc/v5/src/webrtc/pkg/session/receiver"
sendSess "github.com/schollz/croc/v5/src/webrtc/pkg/session/sender"
"github.com/schollz/pake"
"github.com/schollz/progressbar/v2"
"github.com/sirupsen/logrus"
@ -524,15 +524,7 @@ func (c *Client) updateState() (err error) {
}
func (c *Client) dataChannelReceive() (err error) {
pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
os.MkdirAll(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, os.ModeDir)
c.CurrentFile, err = os.OpenFile(pathToFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
c.recvSess = recvSess.NewWith(recvSess.Config{})
err = c.recvSess.CreateConnection()
if err != nil {
return
@ -542,11 +534,6 @@ func (c *Client) dataChannelReceive() (err error) {
}
func (c *Client) dataChannelSend() (err error) {
pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
c.CurrentFile, err = os.Open(pathToFile)
if err != nil {
return
}
c.sendSess = sendSess.NewWith(sendSess.Config{
Configuration: common.Configuration{
OnCompletion: func() {