From 8fd312d876f586ba5e8a9d5e3b12f39f00a127cc Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 10 Apr 2019 11:26:10 -0700 Subject: [PATCH] add compression and encryption --- main.go | 5 +-- src/compress/compress_test.go | 4 +-- src/croc/croc.go | 1 - src/crypt/crypt.go | 36 +++++++++---------- src/utils/ip_test.go | 4 +-- src/webrtc/internal/session/session.go | 2 -- src/webrtc/pkg/session/bench/init.go | 4 +-- .../pkg/session/bench/state_download.go | 6 ++-- src/webrtc/pkg/session/bench/state_upload.go | 6 ++-- src/webrtc/pkg/session/receiver/receiver.go | 17 ++++++--- src/webrtc/pkg/session/sender/sender.go | 21 ++++++----- src/webrtc/pkg/session/session_test.go | 4 +-- src/webrtc/pkg/utils/utils.go | 26 -------------- 13 files changed, 59 insertions(+), 77 deletions(-) diff --git a/main.go b/main.go index 9e1b4374..18a48c86 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,7 @@ func main() { // if err != nil { // break // } - // fmt.Println(bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread])) + // log.Debugln(bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread])) // } var sender bool flag.BoolVar(&sender, "sender", false, "sender") @@ -35,7 +35,8 @@ func main() { // PathToFile: "../wskeystore/README.md", // PathToFile: "./src/croc/croc.go", // PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"}, - PathToFiles: []string{"croc2.exe", "croc3.exe"}, + // PathToFiles: []string{"croc2.exe"}, //,"croc2.exe", "croc3.exe"}, + PathToFiles: []string{"README.md"}, //,"croc2.exe", "croc3.exe"}, KeepPathInRemote: false, }) } else { diff --git a/src/compress/compress_test.go b/src/compress/compress_test.go index 563883df..f7ba7722 100644 --- a/src/compress/compress_test.go +++ b/src/compress/compress_test.go @@ -68,11 +68,11 @@ func BenchmarkCompressLevelNineBinary(b *testing.B) { func TestCompress(t *testing.T) { compressedB := CompressWithOption(fable, 9) dataRateSavings := 100 * (1.0 - float64(len(compressedB))/float64(len(fable))) - fmt.Printf("Level 9: %2.0f%% percent space savings\n", dataRateSavings) + log.Debugf("Level 9: %2.0f%% percent space savings\n", dataRateSavings) assert.True(t, len(compressedB) < len(fable)) compressedB = CompressWithOption(fable, -2) dataRateSavings = 100 * (1.0 - float64(len(compressedB))/float64(len(fable))) - fmt.Printf("Level -2: %2.0f%% percent space savings\n", dataRateSavings) + log.Debugf("Level -2: %2.0f%% percent space savings\n", dataRateSavings) assert.True(t, len(compressedB) < len(fable)) } diff --git a/src/croc/croc.go b/src/croc/croc.go index d3b61ba3..68e29ff6 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -394,7 +394,6 @@ func (c *Client) processMessage(m Message) (err error) { // start receiving data pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name) c.recvSess.ReceiveData(pathToFile, c.FilesToTransfer[c.FilesToTransferCurrentNum].Size) - fmt.Println("\ndone receiving") err = c.redisdb.Publish(c.nameOutChannel, Message{ Type: "close-sender", }.String()).Err() diff --git a/src/crypt/crypt.go b/src/crypt/crypt.go index 7160ad53..e267e0fd 100644 --- a/src/crypt/crypt.go +++ b/src/crypt/crypt.go @@ -5,9 +5,7 @@ import ( "crypto/cipher" "crypto/rand" "crypto/sha256" - "encoding/base64" - "errors" - "strings" + "encoding/json" "golang.org/x/crypto/pbkdf2" ) @@ -20,28 +18,28 @@ type Encryption struct { } func (e Encryption) Bytes() []byte { - return []byte(base64.StdEncoding.EncodeToString(e.Encrypted) + "-" + base64.StdEncoding.EncodeToString(e.Salt) + "-" + base64.StdEncoding.EncodeToString(e.IV)) + b, _ := json.Marshal(e) + return b } func FromBytes(b []byte) (enc Encryption, err error) { - enc = Encryption{} - items := strings.Split(string(b), "-") - if len(items) != 3 { - err = errors.New("not valid") - return - } - enc.Encrypted, err = base64.StdEncoding.DecodeString(items[0]) - if err != nil { - return - } - enc.Salt, err = base64.StdEncoding.DecodeString(items[1]) - if err != nil { - return - } - enc.IV, err = base64.StdEncoding.DecodeString(items[2]) + err = json.Unmarshal(b, &enc) return } +func DecryptFromBytes(b []byte, passphrase []byte, dontencrypt ...bool) (decrypted []byte, err error) { + enc, err := FromBytes(b) + if err != nil { + return + } + return enc.Decrypt(passphrase, dontencrypt...) +} + +func EncryptToBytes(plaintext []byte, passphrase []byte, dontencrypt ...bool) []byte { + enc := Encrypt(plaintext, passphrase, dontencrypt...) + return enc.Bytes() +} + // Encrypt will generate an encryption func Encrypt(plaintext []byte, passphrase []byte, dontencrypt ...bool) Encryption { if len(dontencrypt) > 0 && dontencrypt[0] { diff --git a/src/utils/ip_test.go b/src/utils/ip_test.go index ae3fcf67..d81b2874 100644 --- a/src/utils/ip_test.go +++ b/src/utils/ip_test.go @@ -6,6 +6,6 @@ import ( ) func TestGetIP(t *testing.T) { - fmt.Println(PublicIP()) - fmt.Println(LocalIP()) + log.Debugln(PublicIP()) + log.Debugln(LocalIP()) } diff --git a/src/webrtc/internal/session/session.go b/src/webrtc/internal/session/session.go index 19e846e4..082dd16a 100644 --- a/src/webrtc/internal/session/session.go +++ b/src/webrtc/internal/session/session.go @@ -2,7 +2,6 @@ package session import ( "io" - "log" "os" "github.com/pion/webrtc/v2" @@ -30,7 +29,6 @@ type Session struct { // New creates a new Session func New(sdpInput io.Reader, sdpOutput io.Writer) Session { - log.Println("making new channel") if sdpInput == nil { sdpInput = os.Stdin } diff --git a/src/webrtc/pkg/session/bench/init.go b/src/webrtc/pkg/session/bench/init.go index 0fd7c3fe..8a315bed 100644 --- a/src/webrtc/pkg/session/bench/init.go +++ b/src/webrtc/pkg/session/bench/init.go @@ -33,8 +33,8 @@ func (s *Session) Start() error { // Wait for benchmarks to be done s.wg.Wait() - fmt.Printf("Upload: %s\n", s.uploadNetworkStats.String()) - fmt.Printf("Download: %s\n", s.downloadNetworkStats.String()) + log.Debugf("Upload: %s\n", s.uploadNetworkStats.String()) + log.Debugf("Download: %s\n", s.downloadNetworkStats.String()) s.sess.OnCompletion() return nil } diff --git a/src/webrtc/pkg/session/bench/state_download.go b/src/webrtc/pkg/session/bench/state_download.go index 4ddd5491..23426729 100644 --- a/src/webrtc/pkg/session/bench/state_download.go +++ b/src/webrtc/pkg/session/bench/state_download.go @@ -24,7 +24,7 @@ func (s *Session) onOpenHandlerDownload(dc *webrtc.DataChannel) func() { // Useful for unit tests if dc != nil { dc.OnMessage(func(msg webrtc.DataChannelMessage) { - fmt.Printf("Downloading at %.2f MB/s\r", s.downloadNetworkStats.Bandwidth()) + log.Debugf("Downloading at %.2f MB/s\r", s.downloadNetworkStats.Bandwidth()) s.downloadNetworkStats.AddBytes(uint64(len(msg.Data))) }) } else { @@ -32,7 +32,7 @@ func (s *Session) onOpenHandlerDownload(dc *webrtc.DataChannel) func() { } timeoutErr := time.After(s.testDurationError) - fmt.Printf("Downloading random datas ... (%d s)\n", int(s.testDuration.Seconds())) + log.Debugf("Downloading random datas ... (%d s)\n", int(s.testDuration.Seconds())) select { case <-s.downloadDone: @@ -46,7 +46,7 @@ func (s *Session) onOpenHandlerDownload(dc *webrtc.DataChannel) func() { close(s.startPhase2) } - fmt.Printf("\n") + log.Debugf("\n") s.downloadNetworkStats.Stop() s.wg.Done() } diff --git a/src/webrtc/pkg/session/bench/state_upload.go b/src/webrtc/pkg/session/bench/state_upload.go index 58960a0b..73b82026 100644 --- a/src/webrtc/pkg/session/bench/state_upload.go +++ b/src/webrtc/pkg/session/bench/state_upload.go @@ -31,7 +31,7 @@ func (s *Session) onOpenUploadHandler(dc *webrtc.DataChannel) func() { dc.SetBufferedAmountLowThreshold(s.bufferThreshold) dc.OnBufferedAmountLow(func() { if err := dc.Send(token); err == nil { - fmt.Printf("Uploading at %.2f MB/s\r", s.uploadNetworkStats.Bandwidth()) + log.Debugf("Uploading at %.2f MB/s\r", s.uploadNetworkStats.Bandwidth()) s.uploadNetworkStats.AddBytes(lenToken) } }) @@ -39,7 +39,7 @@ func (s *Session) onOpenUploadHandler(dc *webrtc.DataChannel) func() { log.Warningln("No DataChannel provided") } - fmt.Printf("Uploading random datas ... (%d s)\n", int(s.testDuration.Seconds())) + log.Debugf("Uploading random datas ... (%d s)\n", int(s.testDuration.Seconds())) timeout := time.After(s.testDuration) timeoutErr := time.After(s.testDurationError) @@ -59,7 +59,7 @@ func (s *Session) onOpenUploadHandler(dc *webrtc.DataChannel) func() { break SENDING_LOOP } } - fmt.Printf("\n") + log.Debugf("\n") s.uploadNetworkStats.Stop() if dc != nil { diff --git a/src/webrtc/pkg/session/receiver/receiver.go b/src/webrtc/pkg/session/receiver/receiver.go index 6adc59d6..a21213c0 100644 --- a/src/webrtc/pkg/session/receiver/receiver.go +++ b/src/webrtc/pkg/session/receiver/receiver.go @@ -2,13 +2,14 @@ package receiver import ( "encoding/binary" - "fmt" "io" "os" "time" "github.com/mattn/go-colorable" "github.com/pion/webrtc/v2" + "github.com/schollz/croc/v5/src/compress" + "github.com/schollz/croc/v5/src/crypt" internalSess "github.com/schollz/croc/v5/src/webrtc/internal/session" "github.com/schollz/croc/v5/src/webrtc/pkg/session/common" "github.com/schollz/progressbar/v2" @@ -164,11 +165,17 @@ func (s *Session) receiveData(pathToFile string, fileSize int64) error { select { case <-s.sess.Done: s.sess.NetworkStats.Stop() - fmt.Printf("\nNetwork: %s\n", s.sess.NetworkStats.String()) + log.Debugf("Network: %s", s.sess.NetworkStats.String()) return nil case msg := <-s.msgChannel: - pos := int64(binary.LittleEndian.Uint64(msg.Data[:8])) - n, err := f.WriteAt(msg.Data[8:], pos) + buff, errDecrypt := crypt.DecryptFromBytes(msg.Data, []byte{1, 2, 3, 4}) + if errDecrypt != nil { + log.Error(errDecrypt) + return errDecrypt + } + buff = compress.Decompress(buff) + pos := int64(binary.LittleEndian.Uint64(buff[:8])) + n, err := f.WriteAt(buff[8:], pos) if err != nil { return err } else { @@ -184,7 +191,7 @@ func (s *Session) receiveData(pathToFile string, fileSize int64) error { } bar.Add(n) // currentSpeed := s.sess.NetworkStats.Bandwidth() - // fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) + // log.Debugf("Transferring at %.2f MB/s\r", currentSpeed) // s.sess.NetworkStats.AddBytes(uint64(n)) } } diff --git a/src/webrtc/pkg/session/sender/sender.go b/src/webrtc/pkg/session/sender/sender.go index 9a9bbda4..79bf9874 100644 --- a/src/webrtc/pkg/session/sender/sender.go +++ b/src/webrtc/pkg/session/sender/sender.go @@ -2,7 +2,6 @@ package sender import ( "encoding/binary" - "fmt" "io" "os" "sync" @@ -10,6 +9,8 @@ import ( colorable "github.com/mattn/go-colorable" "github.com/pion/webrtc/v2" + "github.com/schollz/croc/v5/src/compress" + "github.com/schollz/croc/v5/src/crypt" internalSess "github.com/schollz/croc/v5/src/webrtc/internal/session" "github.com/schollz/croc/v5/src/webrtc/pkg/session/common" "github.com/schollz/croc/v5/src/webrtc/pkg/stats" @@ -19,7 +20,9 @@ import ( const ( // Must be <= 16384 - senderBuffSize = 16376 + // 8 bytes for position + // 3000 bytes for encryption / compression overhead + senderBuffSize = 13376 bufferThreshold = 512 * 1024 // 512kB ) @@ -205,13 +208,18 @@ func (s *Session) readFile(pathToFile string) error { } s.dataBuff = s.dataBuff[:n] s.readingStats.AddBytes(uint64(n)) + posByte := make([]byte, 8) binary.LittleEndian.PutUint64(posByte, pos) + buff := append([]byte(nil), posByte...) + buff = append(buff, s.dataBuff...) + buff = compress.Compress(buff) + buff = crypt.EncryptToBytes(buff, []byte{1, 2, 3, 4}) s.output <- outputMsg{ n: n, // Make a copy of the buffer - buff: append(buff, s.dataBuff...), + buff: buff, } pos += uint64(n) } @@ -230,7 +238,7 @@ func (s *Session) onBufferedAmountLow() func() { } // currentSpeed := s.sess.NetworkStats.Bandwidth() - // fmt.Printf("Transferring at %.2f MB/s\r", currentSpeed) + // log.Debugf("Transferring at %.2f MB/s\r", currentSpeed) for len(s.msgToBeSent) != 0 { cur := s.msgToBeSent[0] @@ -303,8 +311,5 @@ func (s *Session) close(calledFromCloseHandler bool) { } func (s *Session) dumpStats() { - fmt.Printf(` -Disk : %s -Network: %s -`, s.readingStats.String(), s.sess.NetworkStats.String()) + log.Debugf(`Disk : %s, Network: %s`, s.readingStats.String(), s.sess.NetworkStats.String()) } diff --git a/src/webrtc/pkg/session/session_test.go b/src/webrtc/pkg/session/session_test.go index dfc0da32..87eeef28 100644 --- a/src/webrtc/pkg/session/session_test.go +++ b/src/webrtc/pkg/session/session_test.go @@ -69,7 +69,7 @@ func Test_TransferSmallMessage(t *testing.T) { // Get SDP from sender and send it to the client sdp, err := utils.MustReadStream(senderSDPOutput) assert.Nil(err) - fmt.Printf("READ SDP -> %s\n", sdp) + log.Debugf("READ SDP -> %s\n", sdp) sdp += "\n" n, err = clientSDPProvider.WriteString(sdp) assert.Nil(err) @@ -89,7 +89,7 @@ func Test_TransferSmallMessage(t *testing.T) { assert.Nil(err) assert.Equal(len(sdp), n) - fmt.Println("Waiting for everyone to be done...") + log.Debugln("Waiting for everyone to be done...") <-senderDone <-clientDone diff --git a/src/webrtc/pkg/utils/utils.go b/src/webrtc/pkg/utils/utils.go index ee5037d1..39f42aea 100644 --- a/src/webrtc/pkg/utils/utils.go +++ b/src/webrtc/pkg/utils/utils.go @@ -1,40 +1,14 @@ package utils import ( - "bufio" "bytes" "compress/gzip" "encoding/base64" "encoding/json" - "fmt" - "io" "io/ioutil" "strings" ) -// MustReadStream blocks until input is received from the stream -func MustReadStream(stream io.Reader) (string, error) { - r := bufio.NewReader(stream) - - var in string - for { - var err error - in, err = r.ReadString('\n') - if err != io.EOF { - if err != nil { - return "", err - } - } - in = strings.TrimSpace(in) - if len(in) > 0 { - break - } - } - - fmt.Println("") - return in, nil -} - // StripSDP remove useless elements from an SDP func StripSDP(originalSDP string) string { finalSDP := strings.Replace(originalSDP, "a=group:BUNDLE audio video data", "a=group:BUNDLE data", -1)