From 8734394bb8216e38c74c1b8bd84d5f48095fb438 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Sun, 14 Oct 2018 16:19:40 -0700 Subject: [PATCH 1/8] merge sender and recipient into croc --- src/croc/croc.go | 4 - src/croc/sending.go | 6 +- src/recipient/recipient.go | 527 ------------------------------------- src/sender/sender.go | 491 ---------------------------------- 4 files changed, 2 insertions(+), 1026 deletions(-) delete mode 100644 src/recipient/recipient.go delete mode 100644 src/sender/sender.go diff --git a/src/croc/croc.go b/src/croc/croc.go index 4bdace4b..2b9c8a57 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -5,9 +5,7 @@ import ( "time" "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/recipient" "github.com/schollz/croc/src/relay" - "github.com/schollz/croc/src/sender" "github.com/schollz/croc/src/zipper" ) @@ -78,8 +76,6 @@ func Init(debug bool) (c *Croc) { func SetDebugLevel(debugLevel string) { logger.SetLogLevel(debugLevel) - sender.DebugLevel = debugLevel - recipient.DebugLevel = debugLevel relay.DebugLevel = debugLevel zipper.DebugLevel = debugLevel } diff --git a/src/croc/sending.go b/src/croc/sending.go index 98f9632a..fbec7834 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -11,9 +11,7 @@ import ( log "github.com/cihub/seelog" "github.com/gorilla/websocket" - "github.com/schollz/croc/src/recipient" "github.com/schollz/croc/src/relay" - "github.com/schollz/croc/src/sender" "github.com/schollz/peerdiscovery" "github.com/schollz/utils" ) @@ -161,9 +159,9 @@ func (c *Croc) sendReceive(address, websocketPort string, tcpPorts []string, fna } if isSender { - go sender.Send(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) + go c.startSender(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) } else { - go recipient.Receive(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) + go c.startRecipient(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) } for { diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go deleted file mode 100644 index ed0a6427..00000000 --- a/src/recipient/recipient.go +++ /dev/null @@ -1,527 +0,0 @@ -package recipient - -import ( - "bufio" - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net" - "os" - "strconv" - "strings" - "sync" - "time" - - humanize "github.com/dustin/go-humanize" - - log "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/schollz/croc/src/comm" - "github.com/schollz/croc/src/compress" - "github.com/schollz/croc/src/crypt" - "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/models" - "github.com/schollz/croc/src/utils" - "github.com/schollz/croc/src/zipper" - "github.com/schollz/pake" - "github.com/schollz/progressbar/v2" - "github.com/schollz/spinner" - "github.com/tscholl2/siec" -) - -var DebugLevel string - -// Receive is the async operation to receive a file -func Receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { - logger.SetLogLevel(DebugLevel) - err := receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) - if err != nil { - if !strings.HasPrefix(err.Error(), "websocket: close 100") { - fmt.Fprintf(os.Stderr, "\n"+err.Error()) - } - } - done <- struct{}{} -} - -func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { - var fstats models.FileStats - var sessionKey []byte - var transferTime time.Duration - var hash256 []byte - var otherIP string - var progressFile string - var resumeFile bool - var tcpConnections []comm.Comm - dataChan := make(chan []byte, 1024*1024) - isConnectedIfUsingTCP := make(chan bool) - blocks := []string{} - - useWebsockets := true - switch forceSend { - case 0: - if !isLocal { - useWebsockets = false - } - case 1: - useWebsockets = true - case 2: - useWebsockets = false - } - - // start a spinner - spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) - spin.Writer = os.Stderr - spin.Suffix = " performing PAKE..." - spin.Start() - - // pick an elliptic curve - curve := siec.SIEC255() - // both parties should have a weak key - pw := []byte(codephrase) - - // initialize recipient Q ("1" indicates recipient) - Q, err := pake.Init(pw, 1, curve, 1*time.Millisecond) - if err != nil { - return - } - - step := 0 - for { - messageType, message, err := c.ReadMessage() - if err != nil { - return err - } - if messageType == websocket.PongMessage || messageType == websocket.PingMessage { - continue - } - if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { - return errors.New("\rinterrupted by other party") - } - - log.Debugf("got %d: %s", messageType, message) - switch step { - case 0: - // sender has initiated, sends their ip address - otherIP = string(message) - log.Debugf("sender IP: %s", otherIP) - - // recipient begins by sending address - ip := "" - if isLocal { - ip = utils.LocalIP() - } else { - ip, _ = utils.PublicIP() - } - c.WriteMessage(websocket.BinaryMessage, []byte(ip)) - case 1: - - // Q receives u - log.Debugf("[%d] Q computes k, sends H(k), v back to P", step) - if err := Q.Update(message); err != nil { - return err - } - - // Q has the session key now, but we will still check if its valid - sessionKey, err = Q.SessionKey() - if err != nil { - return err - } - log.Debugf("%x\n", sessionKey) - - // initialize TCP connections if using (possible, but unlikely, race condition) - go func() { - if !useWebsockets { - log.Debugf("connecting to server") - tcpConnections = make([]comm.Comm, len(tcpPorts)) - for i, tcpPort := range tcpPorts { - log.Debugf("connecting to %d", i) - tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) - if err != nil { - log.Error(err) - } - } - log.Debugf("fully connected") - } - isConnectedIfUsingTCP <- true - }() - - c.WriteMessage(websocket.BinaryMessage, Q.Bytes()) - case 2: - log.Debugf("[%d] Q recieves H(k) from P", step) - // check if everything is still kosher with our computed session key - if err := Q.Update(message); err != nil { - return err - } - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) - case 3: - spin.Stop() - - // unmarshal the file info - log.Debugf("[%d] recieve file info", step) - // do decryption on the file stats - enc, err := crypt.FromBytes(message) - if err != nil { - return err - } - decryptedFileData, err := enc.Decrypt(sessionKey) - if err != nil { - return err - } - err = json.Unmarshal(decryptedFileData, &fstats) - if err != nil { - return err - } - log.Debugf("got file stats: %+v", fstats) - - // determine if the file is resuming or not - progressFile = fmt.Sprintf("%s.progress", fstats.SentName) - overwritingOrReceiving := "Receiving" - if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { - overwritingOrReceiving = "Overwriting" - if utils.Exists(progressFile) { - overwritingOrReceiving = "Resume receiving" - resumeFile = true - } - } - - // send blocks - if resumeFile { - fileWithBlocks, _ := os.Open(progressFile) - scanner := bufio.NewScanner(fileWithBlocks) - for scanner.Scan() { - blocks = append(blocks, strings.TrimSpace(scanner.Text())) - } - fileWithBlocks.Close() - } - blocksBytes, _ := json.Marshal(blocks) - // encrypt the block data and send - encblockBytes := crypt.Encrypt(blocksBytes, sessionKey) - c.WriteMessage(websocket.BinaryMessage, encblockBytes.Bytes()) - - // prompt user about the file - fileOrFolder := "file" - if fstats.IsDir { - fileOrFolder = "folder" - } - fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n", - overwritingOrReceiving, - fileOrFolder, - humanize.Bytes(uint64(fstats.Size)), - fstats.Name, - ) - if !noPrompt { - if "y" != utils.GetInput("ok? (y/N): ") { - fmt.Fprintf(os.Stderr, "cancelling request") - c.WriteMessage(websocket.BinaryMessage, []byte("no")) - return nil - } - } - - // await file - // erase file if overwriting - if overwritingOrReceiving == "Overwriting" { - os.Remove(fstats.SentName) - } - var f *os.File - if utils.Exists(fstats.SentName) && resumeFile { - if !useWebsockets { - f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) - } else { - f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644) - } - if err != nil { - log.Error(err) - return err - } - } else { - f, err = os.Create(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - if !useWebsockets { - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err - } - } - } - - blockSize := 0 - if useWebsockets { - blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 - } else { - blockSize = models.TCP_BUFFER_SIZE / 2 - } - - // start the ui for pgoress - bytesWritten := 0 - fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) - bar := progressbar.NewOptions( - int(fstats.Size), - progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), - progressbar.OptionSetWriter(os.Stderr), - ) - bar.Add((len(blocks) * blockSize)) - finished := make(chan bool) - - go func(finished chan bool, dataChan chan []byte) (err error) { - // remove previous progress - var fProgress *os.File - var progressErr error - if resumeFile { - fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND|os.O_WRONLY, 0644) - bytesWritten = len(blocks) * blockSize - } else { - os.Remove(progressFile) - fProgress, progressErr = os.Create(progressFile) - } - if progressErr != nil { - panic(progressErr) - } - defer fProgress.Close() - - blocksWritten := 0.0 - blocksToWrite := float64(fstats.Size) - if useWebsockets { - blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) - } else { - blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) - } - for { - message := <-dataChan - // do decryption - var enc crypt.Encryption - err = json.Unmarshal(message, &enc) - if err != nil { - // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) - log.Error(err) - return err - } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) - if err != nil { - log.Error(err) - return err - } - - // get location if TCP - var locationToWrite int - if !useWebsockets { - pieces := bytes.SplitN(decrypted, []byte("-"), 2) - decrypted = pieces[1] - locationToWrite, _ = strconv.Atoi(string(pieces[0])) - } - - // do decompression - if fstats.IsCompressed && !fstats.IsDir { - decrypted = compress.Decompress(decrypted) - } - - var n int - if !useWebsockets { - if err != nil { - log.Error(err) - return err - } - n, err = f.WriteAt(decrypted, int64(locationToWrite)) - fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) - log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, locationToWrite, blocksWritten, blocksToWrite) - } else { - // write to file - n, err = f.Write(decrypted) - log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, bytesWritten, blocksWritten, blocksToWrite) - fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) - } - if err != nil { - log.Error(err) - return err - } - - // update the bytes written - bytesWritten += n - blocksWritten += 1.0 - // update the progress bar - bar.Add(n) - if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { - log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite) - break - } - } - finished <- true - return - }(finished, dataChan) - - log.Debug("telling sender i'm ready") - c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) - - startTime := time.Now() - if useWebsockets { - for { - var messageType int - // read from websockets - messageType, message, err = c.ReadMessage() - if messageType != websocket.BinaryMessage { - continue - } - if err != nil { - log.Error(err) - return err - } - if bytes.Equal(message, []byte("magic")) { - log.Debug("got magic") - break - } - dataChan <- message - // select { - // case dataChan <- message: - // default: - // log.Debug("blocked") - // // no message sent - // // block - // dataChan <- message - // } - } - } else { - _ = <-isConnectedIfUsingTCP - log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) - // using TCP - var wg sync.WaitGroup - wg.Add(len(tcpConnections)) - for i := range tcpConnections { - defer func(i int) { - log.Debugf("closing connection %d", i) - tcpConnections[i].Close() - }(i) - go func(wg *sync.WaitGroup, j int) { - defer wg.Done() - for { - log.Debugf("waiting to read on %d", j) - // read from TCP connection - message, _, _, err := tcpConnections[j].Read() - // log.Debugf("message: %s", message) - if err != nil { - panic(err) - } - if bytes.Equal(message, []byte("magic")) { - log.Debugf("%d got magic, leaving", j) - return - } - dataChan <- message - } - }(&wg, i) - } - wg.Wait() - } - - _ = <-finished - log.Debug("telling sender i'm done") - c.WriteMessage(websocket.BinaryMessage, []byte("done")) - // we are finished - transferTime = time.Since(startTime) - - // close file - err = f.Close() - if err != nil { - return err - } - - // finish bar - bar.Finish() - - // check hash - hash256, err = utils.HashFile(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - // tell the sender the hash so they can quit - c.WriteMessage(websocket.BinaryMessage, append([]byte("hash:"), hash256...)) - case 4: - // receive the hash from the sender so we can check it and quit - log.Debugf("got hash: %x", message) - if bytes.Equal(hash256, message) { - // open directory - if fstats.IsDir { - err = zipper.UnzipFile(fstats.SentName, ".") - if DebugLevel != "debug" { - os.Remove(fstats.SentName) - } - } else { - err = nil - } - if err == nil { - if useStdout && !fstats.IsDir { - var bFile []byte - bFile, err = ioutil.ReadFile(fstats.SentName) - if err != nil { - return err - } - os.Stdout.Write(bFile) - os.Remove(fstats.SentName) - } - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() - transferType := "MB/s" - if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() - transferType = "kB/s" - } - folderOrFile := "file" - if fstats.IsDir { - folderOrFile = "folder" - } - if useStdout { - fstats.Name = "stdout" - } - fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) - os.Remove(progressFile) - } - return err - } else { - if DebugLevel != "debug" { - log.Debug("removing corrupted file") - os.Remove(fstats.SentName) - } - return errors.New("file corrupted") - } - default: - return fmt.Errorf("unknown step") - } - step++ - } -} - -func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - log.Debugf("recipient connecting to %s", address) - connection, err := net.Dial("tcp", address) - if err != nil { - return - } - connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) - connection.SetDeadline(time.Now().Add(3 * time.Hour)) - connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) - - com = comm.New(connection) - log.Debug("waiting for server contact") - ok, err := com.Receive() - if err != nil { - return - } - log.Debugf("[%s] server says: %s", address, ok) - - err = com.Send(room) - if err != nil { - return - } - ok, err = com.Receive() - log.Debugf("[%s] server says: %s", address, ok) - if err != nil { - return - } - if ok != "recipient" { - err = errors.New(ok) - } - return -} diff --git a/src/sender/sender.go b/src/sender/sender.go deleted file mode 100644 index 7e10af8a..00000000 --- a/src/sender/sender.go +++ /dev/null @@ -1,491 +0,0 @@ -package sender - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net" - "os" - "path/filepath" - "strconv" - "strings" - "sync" - "time" - - log "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/pkg/errors" - "github.com/schollz/croc/src/comm" - "github.com/schollz/croc/src/compress" - "github.com/schollz/croc/src/crypt" - "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/models" - "github.com/schollz/croc/src/utils" - "github.com/schollz/croc/src/zipper" - "github.com/schollz/pake" - "github.com/schollz/progressbar/v2" - "github.com/schollz/spinner" - "github.com/tscholl2/siec" -) - -var DebugLevel string - -// Send is the async call to send data -func Send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { - logger.SetLogLevel(DebugLevel) - log.Debugf("sending %s", fname) - err := send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) - if err != nil { - if !strings.HasPrefix(err.Error(), "websocket: close 100") { - fmt.Fprintf(os.Stderr, "\n"+err.Error()) - } - } - - done <- struct{}{} -} - -func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { - var f *os.File - defer f.Close() // ignore the error if it wasn't opened :( - var fstats models.FileStats - var fileHash []byte - var otherIP string - var startTransfer time.Time - var tcpConnections []comm.Comm - blocksToSkip := make(map[int64]struct{}) - isConnectedIfUsingTCP := make(chan bool) - - type DataChan struct { - b []byte - currentPostition int64 - bytesRead int - err error - } - dataChan := make(chan DataChan, 1024*1024) - defer close(dataChan) - - useWebsockets := true - switch forceSend { - case 0: - if !isLocal { - useWebsockets = false - } - case 1: - useWebsockets = true - case 2: - useWebsockets = false - } - - fileReady := make(chan error) - - // normalize the file name - fname, err = filepath.Abs(fname) - if err != nil { - return err - } - _, filename := filepath.Split(fname) - - // get ready to generate session key - var sessionKey []byte - - // start a spinner - spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) - spin.Writer = os.Stderr - - // pick an elliptic curve - curve := siec.SIEC255() - // both parties should have a weak key - pw := []byte(codephrase) - // initialize sender P ("0" indicates sender) - P, err := pake.Init(pw, 0, curve, 1*time.Millisecond) - if err != nil { - return - } - - step := 0 - for { - messageType, message, errRead := c.ReadMessage() - if errRead != nil { - return errRead - } - if messageType == websocket.PongMessage || messageType == websocket.PingMessage { - continue - } - if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { - return errors.New("\rinterrupted by other party") - } - log.Debugf("got %d: %s", messageType, message) - switch step { - case 0: - // sender initiates communication - ip := "" - if isLocal { - ip = utils.LocalIP() - } else { - ip, _ = utils.PublicIP() - } - // send my IP address - c.WriteMessage(websocket.BinaryMessage, []byte(ip)) - case 1: - // first receive the IP address from the sender - otherIP = string(message) - log.Debugf("recipient IP: %s", otherIP) - - go func() { - // recipient might want file! start gathering information about file - fstat, err := os.Stat(fname) - if err != nil { - fileReady <- err - return - } - fstats = models.FileStats{ - Name: filename, - Size: fstat.Size(), - ModTime: fstat.ModTime(), - IsDir: fstat.IsDir(), - SentName: fstat.Name(), - IsCompressed: useCompression, - IsEncrypted: useEncryption, - } - if fstats.IsDir { - // zip the directory - fstats.SentName, err = zipper.ZipFile(fname, true) - if err != nil { - log.Error(err) - fileReady <- err - return - } - fname = fstats.SentName - - fstat, err := os.Stat(fname) - if err != nil { - fileReady <- err - return - } - // get new size - fstats.Size = fstat.Size() - } - - // open the file - f, err = os.Open(fname) - if err != nil { - fileReady <- err - return - } - fileReady <- nil - - }() - - // send pake data - log.Debugf("[%d] first, P sends u to Q", step) - c.WriteMessage(websocket.BinaryMessage, P.Bytes()) - // start PAKE spinnner - spin.Suffix = " performing PAKE..." - spin.Start() - case 2: - // P recieves H(k),v from Q - log.Debugf("[%d] P computes k, H(k), sends H(k) to Q", step) - if err := P.Update(message); err != nil { - return err - } - c.WriteMessage(websocket.BinaryMessage, P.Bytes()) - sessionKey, _ = P.SessionKey() - // check(err) - log.Debugf("%x\n", sessionKey) - - // wait for readiness - spin.Stop() - spin.Suffix = " waiting for recipient ok..." - spin.Start() - case 3: - log.Debugf("[%d] recipient declares readiness for file info", step) - if !bytes.HasPrefix(message, []byte("ready")) { - return errors.New("recipient refused file") - } - - // connect to TCP in background - tcpConnections = make([]comm.Comm, len(tcpPorts)) - go func() { - if !useWebsockets { - log.Debugf("connecting to server") - for i, tcpPort := range tcpPorts { - log.Debugf("connecting to %s on connection %d", tcpPort, i) - tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) - if err != nil { - log.Error(err) - } - } - } - isConnectedIfUsingTCP <- true - }() - - err = <-fileReady // block until file is ready - if err != nil { - return err - } - fstatsBytes, err := json.Marshal(fstats) - if err != nil { - return err - } - - // encrypt the file meta data - enc := crypt.Encrypt(fstatsBytes, sessionKey) - // send the file meta data - c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) - case 4: - log.Debugf("[%d] recipient declares gives blocks", step) - // recipient sends blocks, and sender does not send anything back - // determine if any blocks were sent to skip - enc, err := crypt.FromBytes(message) - if err != nil { - log.Error(err) - return err - } - decrypted, err := enc.Decrypt(sessionKey) - if err != nil { - err = errors.Wrap(err, "could not decrypt blocks with session key") - log.Error(err) - return err - } - - var blocks []string - errBlocks := json.Unmarshal(decrypted, &blocks) - if errBlocks == nil { - for _, block := range blocks { - blockInt64, errBlock := strconv.Atoi(block) - if errBlock == nil { - blocksToSkip[int64(blockInt64)] = struct{}{} - } - } - } - log.Debugf("found blocks: %+v", blocksToSkip) - - // start loading the file into memory - // start streaming encryption/compression - if fstats.IsDir { - // remove file if zipped - defer os.Remove(fstats.SentName) - } - go func(dataChan chan DataChan) { - var buffer []byte - if useWebsockets { - buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) - } else { - buffer = make([]byte, models.TCP_BUFFER_SIZE/2) - } - - currentPostition := int64(0) - for { - bytesread, err := f.Read(buffer) - if bytesread > 0 { - if _, ok := blocksToSkip[currentPostition]; ok { - log.Debugf("skipping the sending of block %d", currentPostition) - currentPostition += int64(bytesread) - continue - } - - // do compression - var compressedBytes []byte - if useCompression && !fstats.IsDir { - compressedBytes = compress.Compress(buffer[:bytesread]) - } else { - compressedBytes = buffer[:bytesread] - } - - // if using TCP, prepend the location to write the data to in the resulting file - if !useWebsockets { - compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) - } - - // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) - if err != nil { - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: err, - } - return - } - - dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - } - currentPostition += int64(bytesread) - } - if err != nil { - if err != io.EOF { - log.Error(err) - } - break - } - } - // finish - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - if !useWebsockets { - log.Debug("sending extra magic to %d others", len(tcpPorts)-1) - for i := 0; i < len(tcpPorts)-1; i++ { - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - } - } - }(dataChan) - - case 5: - spin.Stop() - - log.Debugf("[%d] recipient declares readiness for file data", step) - if !bytes.HasPrefix(message, []byte("ready")) { - return errors.New("recipient refused file") - } - - fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) - // send file, compure hash simultaneously - startTransfer = time.Now() - - blockSize := 0 - if useWebsockets { - blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 - } else { - blockSize = models.TCP_BUFFER_SIZE / 2 - } - bar := progressbar.NewOptions( - int(fstats.Size), - progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), - progressbar.OptionSetWriter(os.Stderr), - ) - bar.Add(blockSize * len(blocksToSkip)) - - if useWebsockets { - for { - data := <-dataChan - if data.err != nil { - return data.err - } - bar.Add(data.bytesRead) - // write data to websockets - err = c.WriteMessage(websocket.BinaryMessage, data.b) - if err != nil { - err = errors.Wrap(err, "problem writing message") - return err - } - if bytes.Equal(data.b, []byte("magic")) { - break - } - } - } else { - _ = <-isConnectedIfUsingTCP - log.Debug("connected and ready to send on tcp") - var wg sync.WaitGroup - wg.Add(len(tcpConnections)) - for i := range tcpConnections { - defer func(i int) { - log.Debugf("closing connection %d", i) - tcpConnections[i].Close() - }(i) - go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan) { - defer wg.Done() - for data := range dataChan { - if data.err != nil { - log.Error(data.err) - return - } - bar.Add(data.bytesRead) - // write data to tcp connection - _, err = tcpConnections[i].Write(data.b) - if err != nil { - err = errors.Wrap(err, "problem writing message") - log.Error(err) - return - } - if bytes.Equal(data.b, []byte("magic")) { - log.Debugf("%d got magic", i) - return - } - } - }(i, &wg, dataChan) - } - wg.Wait() - } - - bar.Finish() - log.Debug("send hash to finish file") - fileHash, err = utils.HashFile(fname) - if err != nil { - return err - } - case 6: - // recevied something, maybe the file hash - transferTime := time.Since(startTransfer) - if !bytes.HasPrefix(message, []byte("hash:")) { - log.Debugf("%s", message) - continue - } - c.WriteMessage(websocket.BinaryMessage, fileHash) - message = bytes.TrimPrefix(message, []byte("hash:")) - log.Debugf("[%d] determing whether it went ok", step) - if bytes.Equal(message, fileHash) { - log.Debug("file transfered successfully") - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() - transferType := "MB/s" - if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() - transferType = "kB/s" - } - fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType) - return nil - } else { - fmt.Fprintf(os.Stderr, "\nTransfer corrupted") - return errors.New("file not transfered succesfully") - } - default: - return fmt.Errorf("unknown step") - } - step++ - } -} - -func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - connection, err := net.DialTimeout("tcp", address, 3*time.Hour) - if err != nil { - return - } - connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) - connection.SetDeadline(time.Now().Add(3 * time.Hour)) - connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) - - com = comm.New(connection) - ok, err := com.Receive() - if err != nil { - return - } - log.Debugf("server says: %s", ok) - - err = com.Send(room) - if err != nil { - return - } - ok, err = com.Receive() - log.Debugf("server says: %s", ok) - if err != nil { - return - } - if ok != "sender" { - err = errors.New(ok) - } - return -} From 3bed0bc8bf8b29038587d14f0b73330ffb5faa58 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Sun, 14 Oct 2018 17:10:08 -0700 Subject: [PATCH 2/8] move files here --- src/croc/recipient.go | 497 ++++++++++++++++++++++++++++++++++++++++++ src/croc/sender.go | 487 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 984 insertions(+) create mode 100644 src/croc/recipient.go create mode 100644 src/croc/sender.go diff --git a/src/croc/recipient.go b/src/croc/recipient.go new file mode 100644 index 00000000..84b22071 --- /dev/null +++ b/src/croc/recipient.go @@ -0,0 +1,497 @@ +package croc + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "strconv" + "strings" + "sync" + "time" + + humanize "github.com/dustin/go-humanize" + + log "github.com/cihub/seelog" + "github.com/gorilla/websocket" + "github.com/schollz/croc/src/comm" + "github.com/schollz/croc/src/compress" + "github.com/schollz/croc/src/crypt" + "github.com/schollz/croc/src/logger" + "github.com/schollz/croc/src/models" + "github.com/schollz/croc/src/utils" + "github.com/schollz/croc/src/zipper" + "github.com/schollz/pake" + "github.com/schollz/progressbar/v2" + "github.com/schollz/spinner" + "github.com/tscholl2/siec" +) + +var DebugLevel string + +// Receive is the async operation to receive a file +func (cr *Croc) startRecipient(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { + logger.SetLogLevel(DebugLevel) + err := cr.receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) + if err != nil { + if !strings.HasPrefix(err.Error(), "websocket: close 100") { + fmt.Fprintf(os.Stderr, "\n"+err.Error()) + } + } + done <- struct{}{} +} + +func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { + var fstats models.FileStats + var sessionKey []byte + var transferTime time.Duration + var hash256 []byte + var otherIP string + var progressFile string + var resumeFile bool + var tcpConnections []comm.Comm + dataChan := make(chan []byte, 1024*1024) + isConnectedIfUsingTCP := make(chan bool) + blocks := []string{} + + useWebsockets := true + switch forceSend { + case 0: + if !isLocal { + useWebsockets = false + } + case 1: + useWebsockets = true + case 2: + useWebsockets = false + } + + // start a spinner + spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) + spin.Writer = os.Stderr + spin.Suffix = " performing PAKE..." + spin.Start() + + // pick an elliptic curve + curve := siec.SIEC255() + // both parties should have a weak key + pw := []byte(codephrase) + + // initialize recipient Q ("1" indicates recipient) + Q, err := pake.Init(pw, 1, curve, 1*time.Millisecond) + if err != nil { + return + } + + step := 0 + for { + messageType, message, err := c.ReadMessage() + if err != nil { + return err + } + if messageType == websocket.PongMessage || messageType == websocket.PingMessage { + continue + } + if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { + return errors.New("\rinterrupted by other party") + } + + log.Debugf("got %d: %s", messageType, message) + switch step { + case 0: + // sender has initiated, sends their ip address + otherIP = string(message) + log.Debugf("sender IP: %s", otherIP) + + // recipient begins by sending address + ip := "" + if isLocal { + ip = utils.LocalIP() + } else { + ip, _ = utils.PublicIP() + } + c.WriteMessage(websocket.BinaryMessage, []byte(ip)) + case 1: + + // Q receives u + log.Debugf("[%d] Q computes k, sends H(k), v back to P", step) + if err := Q.Update(message); err != nil { + return err + } + + // Q has the session key now, but we will still check if its valid + sessionKey, err = Q.SessionKey() + if err != nil { + return err + } + log.Debugf("%x\n", sessionKey) + + // initialize TCP connections if using (possible, but unlikely, race condition) + go func() { + if !useWebsockets { + log.Debugf("connecting to server") + tcpConnections = make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + log.Debugf("connecting to %d", i) + var message string + tcpConnections[i], message, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + } + if message != "recipient" { + log.Errorf("got wrong message: %s", message) + } + } + log.Debugf("fully connected") + } + isConnectedIfUsingTCP <- true + }() + + c.WriteMessage(websocket.BinaryMessage, Q.Bytes()) + case 2: + log.Debugf("[%d] Q recieves H(k) from P", step) + // check if everything is still kosher with our computed session key + if err := Q.Update(message); err != nil { + return err + } + c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + case 3: + spin.Stop() + + // unmarshal the file info + log.Debugf("[%d] recieve file info", step) + // do decryption on the file stats + enc, err := crypt.FromBytes(message) + if err != nil { + return err + } + decryptedFileData, err := enc.Decrypt(sessionKey) + if err != nil { + return err + } + err = json.Unmarshal(decryptedFileData, &fstats) + if err != nil { + return err + } + log.Debugf("got file stats: %+v", fstats) + + // determine if the file is resuming or not + progressFile = fmt.Sprintf("%s.progress", fstats.SentName) + overwritingOrReceiving := "Receiving" + if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { + overwritingOrReceiving = "Overwriting" + if utils.Exists(progressFile) { + overwritingOrReceiving = "Resume receiving" + resumeFile = true + } + } + + // send blocks + if resumeFile { + fileWithBlocks, _ := os.Open(progressFile) + scanner := bufio.NewScanner(fileWithBlocks) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + fileWithBlocks.Close() + } + blocksBytes, _ := json.Marshal(blocks) + // encrypt the block data and send + encblockBytes := crypt.Encrypt(blocksBytes, sessionKey) + c.WriteMessage(websocket.BinaryMessage, encblockBytes.Bytes()) + + // prompt user about the file + fileOrFolder := "file" + if fstats.IsDir { + fileOrFolder = "folder" + } + fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n", + overwritingOrReceiving, + fileOrFolder, + humanize.Bytes(uint64(fstats.Size)), + fstats.Name, + ) + if !noPrompt { + if "y" != utils.GetInput("ok? (y/N): ") { + fmt.Fprintf(os.Stderr, "cancelling request") + c.WriteMessage(websocket.BinaryMessage, []byte("no")) + return nil + } + } + + // await file + // erase file if overwriting + if overwritingOrReceiving == "Overwriting" { + os.Remove(fstats.SentName) + } + var f *os.File + if utils.Exists(fstats.SentName) && resumeFile { + if !useWebsockets { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + } else { + f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644) + } + if err != nil { + log.Error(err) + return err + } + } else { + f, err = os.Create(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + if !useWebsockets { + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } + } + } + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + + // start the ui for pgoress + bytesWritten := 0 + fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) + bar := progressbar.NewOptions( + int(fstats.Size), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetWriter(os.Stderr), + ) + bar.Add((len(blocks) * blockSize)) + finished := make(chan bool) + + go func(finished chan bool, dataChan chan []byte) (err error) { + // remove previous progress + var fProgress *os.File + var progressErr error + if resumeFile { + fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND|os.O_WRONLY, 0644) + bytesWritten = len(blocks) * blockSize + } else { + os.Remove(progressFile) + fProgress, progressErr = os.Create(progressFile) + } + if progressErr != nil { + panic(progressErr) + } + defer fProgress.Close() + + blocksWritten := 0.0 + blocksToWrite := float64(fstats.Size) + if useWebsockets { + blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) + } else { + blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) + } + for { + message := <-dataChan + // do decryption + var enc crypt.Encryption + err = json.Unmarshal(message, &enc) + if err != nil { + // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + if err != nil { + log.Error(err) + return err + } + + // get location if TCP + var locationToWrite int + if !useWebsockets { + pieces := bytes.SplitN(decrypted, []byte("-"), 2) + decrypted = pieces[1] + locationToWrite, _ = strconv.Atoi(string(pieces[0])) + } + + // do decompression + if fstats.IsCompressed && !fstats.IsDir { + decrypted = compress.Decompress(decrypted) + } + + var n int + if !useWebsockets { + if err != nil { + log.Error(err) + return err + } + n, err = f.WriteAt(decrypted, int64(locationToWrite)) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) + log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, locationToWrite, blocksWritten, blocksToWrite) + } else { + // write to file + n, err = f.Write(decrypted) + log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, bytesWritten, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) + } + if err != nil { + log.Error(err) + return err + } + + // update the bytes written + bytesWritten += n + blocksWritten += 1.0 + // update the progress bar + bar.Add(n) + if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { + log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite) + break + } + } + finished <- true + return + }(finished, dataChan) + + log.Debug("telling sender i'm ready") + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + + startTime := time.Now() + if useWebsockets { + for { + var messageType int + // read from websockets + messageType, message, err = c.ReadMessage() + if messageType != websocket.BinaryMessage { + continue + } + if err != nil { + log.Error(err) + return err + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } + dataChan <- message + // select { + // case dataChan <- message: + // default: + // log.Debug("blocked") + // // no message sent + // // block + // dataChan <- message + // } + } + } else { + _ = <-isConnectedIfUsingTCP + log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) + // using TCP + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) + for i := range tcpConnections { + defer func(i int) { + log.Debugf("closing connection %d", i) + tcpConnections[i].Close() + }(i) + go func(wg *sync.WaitGroup, j int) { + defer wg.Done() + for { + log.Debugf("waiting to read on %d", j) + // read from TCP connection + message, _, _, err := tcpConnections[j].Read() + // log.Debugf("message: %s", message) + if err != nil { + panic(err) + } + if bytes.Equal(message, []byte("magic")) { + log.Debugf("%d got magic, leaving", j) + return + } + dataChan <- message + } + }(&wg, i) + } + wg.Wait() + } + + _ = <-finished + log.Debug("telling sender i'm done") + c.WriteMessage(websocket.BinaryMessage, []byte("done")) + // we are finished + transferTime = time.Since(startTime) + + // close file + err = f.Close() + if err != nil { + return err + } + + // finish bar + bar.Finish() + + // check hash + hash256, err = utils.HashFile(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + // tell the sender the hash so they can quit + c.WriteMessage(websocket.BinaryMessage, append([]byte("hash:"), hash256...)) + case 4: + // receive the hash from the sender so we can check it and quit + log.Debugf("got hash: %x", message) + if bytes.Equal(hash256, message) { + // open directory + if fstats.IsDir { + err = zipper.UnzipFile(fstats.SentName, ".") + if DebugLevel != "debug" { + os.Remove(fstats.SentName) + } + } else { + err = nil + } + if err == nil { + if useStdout && !fstats.IsDir { + var bFile []byte + bFile, err = ioutil.ReadFile(fstats.SentName) + if err != nil { + return err + } + os.Stdout.Write(bFile) + os.Remove(fstats.SentName) + } + transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferType := "MB/s" + if transferRate < 1 { + transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferType = "kB/s" + } + folderOrFile := "file" + if fstats.IsDir { + folderOrFile = "folder" + } + if useStdout { + fstats.Name = "stdout" + } + fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) + os.Remove(progressFile) + } + return err + } else { + if DebugLevel != "debug" { + log.Debug("removing corrupted file") + os.Remove(fstats.SentName) + } + return errors.New("file corrupted") + } + default: + return fmt.Errorf("unknown step") + } + step++ + } +} diff --git a/src/croc/sender.go b/src/croc/sender.go new file mode 100644 index 00000000..75831b96 --- /dev/null +++ b/src/croc/sender.go @@ -0,0 +1,487 @@ +package croc + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + log "github.com/cihub/seelog" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/schollz/croc/src/comm" + "github.com/schollz/croc/src/compress" + "github.com/schollz/croc/src/crypt" + "github.com/schollz/croc/src/logger" + "github.com/schollz/croc/src/models" + "github.com/schollz/croc/src/utils" + "github.com/schollz/croc/src/zipper" + "github.com/schollz/pake" + "github.com/schollz/progressbar/v2" + "github.com/schollz/spinner" + "github.com/tscholl2/siec" +) + +// Send is the async call to send data +func (cr *Croc) startSender(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { + logger.SetLogLevel(DebugLevel) + log.Debugf("sending %s", fname) + err := cr.send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) + if err != nil { + if !strings.HasPrefix(err.Error(), "websocket: close 100") { + fmt.Fprintf(os.Stderr, "\n"+err.Error()) + } + } + + done <- struct{}{} +} + +func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { + var f *os.File + defer f.Close() // ignore the error if it wasn't opened :( + var fstats models.FileStats + var fileHash []byte + var otherIP string + var startTransfer time.Time + var tcpConnections []comm.Comm + blocksToSkip := make(map[int64]struct{}) + isConnectedIfUsingTCP := make(chan bool) + + type DataChan struct { + b []byte + currentPostition int64 + bytesRead int + err error + } + dataChan := make(chan DataChan, 1024*1024) + defer close(dataChan) + + useWebsockets := true + switch forceSend { + case 0: + if !isLocal { + useWebsockets = false + } + case 1: + useWebsockets = true + case 2: + useWebsockets = false + } + + fileReady := make(chan error) + + // normalize the file name + fname, err = filepath.Abs(fname) + if err != nil { + return err + } + _, filename := filepath.Split(fname) + + // get ready to generate session key + var sessionKey []byte + + // start a spinner + spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) + spin.Writer = os.Stderr + + // pick an elliptic curve + curve := siec.SIEC255() + // both parties should have a weak key + pw := []byte(codephrase) + // initialize sender P ("0" indicates sender) + P, err := pake.Init(pw, 0, curve, 1*time.Millisecond) + if err != nil { + return + } + + step := 0 + for { + messageType, message, errRead := c.ReadMessage() + if errRead != nil { + return errRead + } + if messageType == websocket.PongMessage || messageType == websocket.PingMessage { + continue + } + if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { + return errors.New("\rinterrupted by other party") + } + log.Debugf("got %d: %s", messageType, message) + switch step { + case 0: + // sender initiates communication + ip := "" + if isLocal { + ip = utils.LocalIP() + } else { + ip, _ = utils.PublicIP() + } + // send my IP address + c.WriteMessage(websocket.BinaryMessage, []byte(ip)) + case 1: + // first receive the IP address from the sender + otherIP = string(message) + log.Debugf("recipient IP: %s", otherIP) + + go func() { + // recipient might want file! start gathering information about file + fstat, err := os.Stat(fname) + if err != nil { + fileReady <- err + return + } + fstats = models.FileStats{ + Name: filename, + Size: fstat.Size(), + ModTime: fstat.ModTime(), + IsDir: fstat.IsDir(), + SentName: fstat.Name(), + IsCompressed: useCompression, + IsEncrypted: useEncryption, + } + if fstats.IsDir { + // zip the directory + fstats.SentName, err = zipper.ZipFile(fname, true) + if err != nil { + log.Error(err) + fileReady <- err + return + } + fname = fstats.SentName + + fstat, err := os.Stat(fname) + if err != nil { + fileReady <- err + return + } + // get new size + fstats.Size = fstat.Size() + } + + // open the file + f, err = os.Open(fname) + if err != nil { + fileReady <- err + return + } + fileReady <- nil + + }() + + // send pake data + log.Debugf("[%d] first, P sends u to Q", step) + c.WriteMessage(websocket.BinaryMessage, P.Bytes()) + // start PAKE spinnner + spin.Suffix = " performing PAKE..." + spin.Start() + case 2: + // P recieves H(k),v from Q + log.Debugf("[%d] P computes k, H(k), sends H(k) to Q", step) + if err := P.Update(message); err != nil { + return err + } + c.WriteMessage(websocket.BinaryMessage, P.Bytes()) + sessionKey, _ = P.SessionKey() + // check(err) + log.Debugf("%x\n", sessionKey) + + // wait for readiness + spin.Stop() + spin.Suffix = " waiting for recipient ok..." + spin.Start() + case 3: + log.Debugf("[%d] recipient declares readiness for file info", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + + // connect to TCP in background + tcpConnections = make([]comm.Comm, len(tcpPorts)) + go func() { + if !useWebsockets { + log.Debugf("connecting to server") + for i, tcpPort := range tcpPorts { + log.Debugf("connecting to %s on connection %d", tcpPort, i) + var message string + tcpConnections[i], message, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + } + if message != "sender" { + log.Errorf("got wrong message: %s", message) + } + } + } + isConnectedIfUsingTCP <- true + }() + + err = <-fileReady // block until file is ready + if err != nil { + return err + } + fstatsBytes, err := json.Marshal(fstats) + if err != nil { + return err + } + + // encrypt the file meta data + enc := crypt.Encrypt(fstatsBytes, sessionKey) + // send the file meta data + c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) + case 4: + log.Debugf("[%d] recipient declares gives blocks", step) + // recipient sends blocks, and sender does not send anything back + // determine if any blocks were sent to skip + enc, err := crypt.FromBytes(message) + if err != nil { + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey) + if err != nil { + err = errors.Wrap(err, "could not decrypt blocks with session key") + log.Error(err) + return err + } + + var blocks []string + errBlocks := json.Unmarshal(decrypted, &blocks) + if errBlocks == nil { + for _, block := range blocks { + blockInt64, errBlock := strconv.Atoi(block) + if errBlock == nil { + blocksToSkip[int64(blockInt64)] = struct{}{} + } + } + } + log.Debugf("found blocks: %+v", blocksToSkip) + + // start loading the file into memory + // start streaming encryption/compression + if fstats.IsDir { + // remove file if zipped + defer os.Remove(fstats.SentName) + } + go func(dataChan chan DataChan) { + var buffer []byte + if useWebsockets { + buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) + } else { + buffer = make([]byte, models.TCP_BUFFER_SIZE/2) + } + + currentPostition := int64(0) + for { + bytesread, err := f.Read(buffer) + if bytesread > 0 { + if _, ok := blocksToSkip[currentPostition]; ok { + log.Debugf("skipping the sending of block %d", currentPostition) + currentPostition += int64(bytesread) + continue + } + + // do compression + var compressedBytes []byte + if useCompression && !fstats.IsDir { + compressedBytes = compress.Compress(buffer[:bytesread]) + } else { + compressedBytes = buffer[:bytesread] + } + + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) + } + + // do encryption + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + + dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + } + currentPostition += int64(bytesread) + } + if err != nil { + if err != io.EOF { + log.Error(err) + } + break + } + } + // finish + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + if !useWebsockets { + log.Debug("sending extra magic to %d others", len(tcpPorts)-1) + for i := 0; i < len(tcpPorts)-1; i++ { + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + } + } + }(dataChan) + + case 5: + spin.Stop() + + log.Debugf("[%d] recipient declares readiness for file data", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + + fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) + // send file, compure hash simultaneously + startTransfer = time.Now() + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + bar := progressbar.NewOptions( + int(fstats.Size), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetWriter(os.Stderr), + ) + bar.Add(blockSize * len(blocksToSkip)) + + if useWebsockets { + for { + data := <-dataChan + if data.err != nil { + return data.err + } + bar.Add(data.bytesRead) + // write data to websockets + err = c.WriteMessage(websocket.BinaryMessage, data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + return err + } + if bytes.Equal(data.b, []byte("magic")) { + break + } + } + } else { + _ = <-isConnectedIfUsingTCP + log.Debug("connected and ready to send on tcp") + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) + for i := range tcpConnections { + defer func(i int) { + log.Debugf("closing connection %d", i) + tcpConnections[i].Close() + }(i) + go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan) { + defer wg.Done() + for data := range dataChan { + if data.err != nil { + log.Error(data.err) + return + } + bar.Add(data.bytesRead) + // write data to tcp connection + _, err = tcpConnections[i].Write(data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + log.Error(err) + return + } + if bytes.Equal(data.b, []byte("magic")) { + log.Debugf("%d got magic", i) + return + } + } + }(i, &wg, dataChan) + } + wg.Wait() + } + + bar.Finish() + log.Debug("send hash to finish file") + fileHash, err = utils.HashFile(fname) + if err != nil { + return err + } + case 6: + // recevied something, maybe the file hash + transferTime := time.Since(startTransfer) + if !bytes.HasPrefix(message, []byte("hash:")) { + log.Debugf("%s", message) + continue + } + c.WriteMessage(websocket.BinaryMessage, fileHash) + message = bytes.TrimPrefix(message, []byte("hash:")) + log.Debugf("[%d] determing whether it went ok", step) + if bytes.Equal(message, fileHash) { + log.Debug("file transfered successfully") + transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferType := "MB/s" + if transferRate < 1 { + transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferType = "kB/s" + } + fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType) + return nil + } else { + fmt.Fprintf(os.Stderr, "\nTransfer corrupted") + return errors.New("file not transfered succesfully") + } + default: + return fmt.Errorf("unknown step") + } + step++ + } +} + +func connectToTCPServer(room string, address string) (com comm.Comm, message string, err error) { + connection, err := net.DialTimeout("tcp", address, 3*time.Hour) + if err != nil { + return + } + connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) + connection.SetDeadline(time.Now().Add(3 * time.Hour)) + connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) + + com = comm.New(connection) + ok, err := com.Receive() + if err != nil { + return + } + log.Debugf("server says: %s", ok) + + err = com.Send(room) + if err != nil { + return + } + message, err = com.Receive() + log.Debugf("server says: %s", message) + return +} From 55d9137b6dd3880b2b20c7fd2af3cd220db59158 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Sun, 14 Oct 2018 18:26:18 -0700 Subject: [PATCH 3/8] add state --- src/croc/croc.go | 2 ++ src/croc/sender.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/src/croc/croc.go b/src/croc/croc.go index 2b9c8a57..144115f7 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -50,6 +50,8 @@ type Croc struct { // is using local relay isLocal bool normalFinish bool + + State string } // Init will initiate with the default parameters diff --git a/src/croc/sender.go b/src/croc/sender.go index 75831b96..8cce512e 100644 --- a/src/croc/sender.go +++ b/src/croc/sender.go @@ -180,6 +180,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL c.WriteMessage(websocket.BinaryMessage, P.Bytes()) // start PAKE spinnner spin.Suffix = " performing PAKE..." + cr.State = "Performing PAKE..." spin.Start() case 2: // P recieves H(k),v from Q @@ -195,6 +196,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL // wait for readiness spin.Stop() spin.Suffix = " waiting for recipient ok..." + cr.State = "Waiting for recipient ok...." spin.Start() case 3: log.Debugf("[%d] recipient declares readiness for file info", step) @@ -379,6 +381,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL return data.err } bar.Add(data.bytesRead) + // write data to websockets err = c.WriteMessage(websocket.BinaryMessage, data.b) if err != nil { From 47ab799c6fc4c8d695b32a294cdffd5c0d903249 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 17 Oct 2018 06:19:01 -0700 Subject: [PATCH 4/8] add file stats to state --- src/croc/croc.go | 7 +++- src/croc/recipient.go | 75 +++++++++++++++++++++---------------------- src/croc/sender.go | 43 ++++++++++++------------- 3 files changed, 64 insertions(+), 61 deletions(-) diff --git a/src/croc/croc.go b/src/croc/croc.go index 144115f7..db217b7d 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -5,8 +5,10 @@ import ( "time" "github.com/schollz/croc/src/logger" + "github.com/schollz/croc/src/models" "github.com/schollz/croc/src/relay" "github.com/schollz/croc/src/zipper" + "github.com/schollz/progressbar" ) func init() { @@ -51,7 +53,10 @@ type Croc struct { isLocal bool normalFinish bool - State string + // state variables + StateString string + Bar *progressbar.ProgressBar + FileInfo models.FileStats } // Init will initiate with the default parameters diff --git a/src/croc/recipient.go b/src/croc/recipient.go index 84b22071..66abddfc 100644 --- a/src/croc/recipient.go +++ b/src/croc/recipient.go @@ -45,7 +45,6 @@ func (cr *Croc) startRecipient(forceSend int, serverAddress string, tcpPorts []s } func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { - var fstats models.FileStats var sessionKey []byte var transferTime time.Duration var hash256 []byte @@ -172,16 +171,16 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, if err != nil { return err } - err = json.Unmarshal(decryptedFileData, &fstats) + err = json.Unmarshal(decryptedFileData, &cr.FileInfo) if err != nil { return err } - log.Debugf("got file stats: %+v", fstats) + log.Debugf("got file stats: %+v", cr.FileInfo) // determine if the file is resuming or not - progressFile = fmt.Sprintf("%s.progress", fstats.SentName) + progressFile = fmt.Sprintf("%s.progress", cr.FileInfo.SentName) overwritingOrReceiving := "Receiving" - if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { + if utils.Exists(cr.FileInfo.Name) || utils.Exists(cr.FileInfo.SentName) { overwritingOrReceiving = "Overwriting" if utils.Exists(progressFile) { overwritingOrReceiving = "Resume receiving" @@ -205,14 +204,14 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, // prompt user about the file fileOrFolder := "file" - if fstats.IsDir { + if cr.FileInfo.IsDir { fileOrFolder = "folder" } fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n", overwritingOrReceiving, fileOrFolder, - humanize.Bytes(uint64(fstats.Size)), - fstats.Name, + humanize.Bytes(uint64(cr.FileInfo.Size)), + cr.FileInfo.Name, ) if !noPrompt { if "y" != utils.GetInput("ok? (y/N): ") { @@ -225,27 +224,27 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, // await file // erase file if overwriting if overwritingOrReceiving == "Overwriting" { - os.Remove(fstats.SentName) + os.Remove(cr.FileInfo.SentName) } var f *os.File - if utils.Exists(fstats.SentName) && resumeFile { + if utils.Exists(cr.FileInfo.SentName) && resumeFile { if !useWebsockets { - f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + f, err = os.OpenFile(cr.FileInfo.SentName, os.O_WRONLY, 0644) } else { - f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644) + f, err = os.OpenFile(cr.FileInfo.SentName, os.O_APPEND|os.O_WRONLY, 0644) } if err != nil { log.Error(err) return err } } else { - f, err = os.Create(fstats.SentName) + f, err = os.Create(cr.FileInfo.SentName) if err != nil { log.Error(err) return err } if !useWebsockets { - if err = f.Truncate(fstats.Size); err != nil { + if err = f.Truncate(cr.FileInfo.Size); err != nil { log.Error(err) return err } @@ -262,13 +261,13 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, // start the ui for pgoress bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) - bar := progressbar.NewOptions( - int(fstats.Size), + cr.Bar = progressbar.NewOptions( + int(cr.FileInfo.Size), progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetBytes(int(cr.FileInfo.Size)), progressbar.OptionSetWriter(os.Stderr), ) - bar.Add((len(blocks) * blockSize)) + cr.Bar.Add((len(blocks) * blockSize)) finished := make(chan bool) go func(finished chan bool, dataChan chan []byte) (err error) { @@ -288,7 +287,7 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, defer fProgress.Close() blocksWritten := 0.0 - blocksToWrite := float64(fstats.Size) + blocksToWrite := float64(cr.FileInfo.Size) if useWebsockets { blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) } else { @@ -304,7 +303,7 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, log.Error(err) return err } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + decrypted, err := enc.Decrypt(sessionKey, !cr.FileInfo.IsEncrypted) if err != nil { log.Error(err) return err @@ -319,7 +318,7 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, } // do decompression - if fstats.IsCompressed && !fstats.IsDir { + if cr.FileInfo.IsCompressed && !cr.FileInfo.IsDir { decrypted = compress.Decompress(decrypted) } @@ -347,9 +346,9 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, bytesWritten += n blocksWritten += 1.0 // update the progress bar - bar.Add(n) - if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { - log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite) + cr.Bar.Add(n) + if int64(bytesWritten) == cr.FileInfo.Size || blocksWritten >= blocksToWrite { + log.Debug("finished", int64(bytesWritten), cr.FileInfo.Size, blocksWritten, blocksToWrite) break } } @@ -432,10 +431,10 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, } // finish bar - bar.Finish() + cr.Bar.Finish() // check hash - hash256, err = utils.HashFile(fstats.SentName) + hash256, err = utils.HashFile(cr.FileInfo.SentName) if err != nil { log.Error(err) return err @@ -447,45 +446,45 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, log.Debugf("got hash: %x", message) if bytes.Equal(hash256, message) { // open directory - if fstats.IsDir { - err = zipper.UnzipFile(fstats.SentName, ".") + if cr.FileInfo.IsDir { + err = zipper.UnzipFile(cr.FileInfo.SentName, ".") if DebugLevel != "debug" { - os.Remove(fstats.SentName) + os.Remove(cr.FileInfo.SentName) } } else { err = nil } if err == nil { - if useStdout && !fstats.IsDir { + if useStdout && !cr.FileInfo.IsDir { var bFile []byte - bFile, err = ioutil.ReadFile(fstats.SentName) + bFile, err = ioutil.ReadFile(cr.FileInfo.SentName) if err != nil { return err } os.Stdout.Write(bFile) - os.Remove(fstats.SentName) + os.Remove(cr.FileInfo.SentName) } - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferRate := float64(cr.FileInfo.Size) / 1000000.0 / transferTime.Seconds() transferType := "MB/s" if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferRate = float64(cr.FileInfo.Size) / 1000.0 / transferTime.Seconds() transferType = "kB/s" } folderOrFile := "file" - if fstats.IsDir { + if cr.FileInfo.IsDir { folderOrFile = "folder" } if useStdout { - fstats.Name = "stdout" + cr.FileInfo.Name = "stdout" } - fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) + fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, cr.FileInfo.Name, transferRate, transferType) os.Remove(progressFile) } return err } else { if DebugLevel != "debug" { log.Debug("removing corrupted file") - os.Remove(fstats.SentName) + os.Remove(cr.FileInfo.SentName) } return errors.New("file corrupted") } diff --git a/src/croc/sender.go b/src/croc/sender.go index 8cce512e..9d334c4a 100644 --- a/src/croc/sender.go +++ b/src/croc/sender.go @@ -24,7 +24,7 @@ import ( "github.com/schollz/croc/src/utils" "github.com/schollz/croc/src/zipper" "github.com/schollz/pake" - "github.com/schollz/progressbar/v2" + "github.com/schollz/progressbar" "github.com/schollz/spinner" "github.com/tscholl2/siec" ) @@ -46,7 +46,6 @@ func (cr *Croc) startSender(forceSend int, serverAddress string, tcpPorts []stri func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { var f *os.File defer f.Close() // ignore the error if it wasn't opened :( - var fstats models.FileStats var fileHash []byte var otherIP string var startTransfer time.Time @@ -137,7 +136,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL fileReady <- err return } - fstats = models.FileStats{ + cr.FileInfo = models.FileStats{ Name: filename, Size: fstat.Size(), ModTime: fstat.ModTime(), @@ -146,15 +145,15 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL IsCompressed: useCompression, IsEncrypted: useEncryption, } - if fstats.IsDir { + if cr.FileInfo.IsDir { // zip the directory - fstats.SentName, err = zipper.ZipFile(fname, true) + cr.FileInfo.SentName, err = zipper.ZipFile(fname, true) if err != nil { log.Error(err) fileReady <- err return } - fname = fstats.SentName + fname = cr.FileInfo.SentName fstat, err := os.Stat(fname) if err != nil { @@ -162,7 +161,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL return } // get new size - fstats.Size = fstat.Size() + cr.FileInfo.Size = fstat.Size() } // open the file @@ -180,7 +179,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL c.WriteMessage(websocket.BinaryMessage, P.Bytes()) // start PAKE spinnner spin.Suffix = " performing PAKE..." - cr.State = "Performing PAKE..." + cr.StateString = "Performing PAKE..." spin.Start() case 2: // P recieves H(k),v from Q @@ -196,7 +195,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL // wait for readiness spin.Stop() spin.Suffix = " waiting for recipient ok..." - cr.State = "Waiting for recipient ok...." + cr.StateString = "Waiting for recipient ok...." spin.Start() case 3: log.Debugf("[%d] recipient declares readiness for file info", step) @@ -228,7 +227,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL if err != nil { return err } - fstatsBytes, err := json.Marshal(fstats) + fstatsBytes, err := json.Marshal(cr.FileInfo) if err != nil { return err } @@ -267,9 +266,9 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL // start loading the file into memory // start streaming encryption/compression - if fstats.IsDir { + if cr.FileInfo.IsDir { // remove file if zipped - defer os.Remove(fstats.SentName) + defer os.Remove(cr.FileInfo.SentName) } go func(dataChan chan DataChan) { var buffer []byte @@ -291,7 +290,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL // do compression var compressedBytes []byte - if useCompression && !fstats.IsDir { + if useCompression && !cr.FileInfo.IsDir { compressedBytes = compress.Compress(buffer[:bytesread]) } else { compressedBytes = buffer[:bytesread] @@ -366,13 +365,13 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL } else { blockSize = models.TCP_BUFFER_SIZE / 2 } - bar := progressbar.NewOptions( - int(fstats.Size), + cr.Bar = progressbar.NewOptions( + int(cr.FileInfo.Size), progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetBytes(int(cr.FileInfo.Size)), progressbar.OptionSetWriter(os.Stderr), ) - bar.Add(blockSize * len(blocksToSkip)) + cr.Bar.Add(blockSize * len(blocksToSkip)) if useWebsockets { for { @@ -380,7 +379,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL if data.err != nil { return data.err } - bar.Add(data.bytesRead) + cr.Bar.Add(data.bytesRead) // write data to websockets err = c.WriteMessage(websocket.BinaryMessage, data.b) @@ -409,7 +408,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL log.Error(data.err) return } - bar.Add(data.bytesRead) + cr.Bar.Add(data.bytesRead) // write data to tcp connection _, err = tcpConnections[i].Write(data.b) if err != nil { @@ -427,7 +426,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL wg.Wait() } - bar.Finish() + cr.Bar.Finish() log.Debug("send hash to finish file") fileHash, err = utils.HashFile(fname) if err != nil { @@ -445,10 +444,10 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL log.Debugf("[%d] determing whether it went ok", step) if bytes.Equal(message, fileHash) { log.Debug("file transfered successfully") - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferRate := float64(cr.FileInfo.Size) / 1000000.0 / transferTime.Seconds() transferType := "MB/s" if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferRate = float64(cr.FileInfo.Size) / 1000.0 / transferTime.Seconds() transferType = "kB/s" } fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType) From 6a899492f5a8a4fa7849fb990528a881abfe5588 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 17 Oct 2018 06:39:02 -0700 Subject: [PATCH 5/8] add state --- main.go | 4 +- src/cli/cli.go | 2 +- src/croc/croc.go | 2 +- src/win/main.go | 124 ++++++++++++++++++++++++++++++++++++----------- 4 files changed, 101 insertions(+), 31 deletions(-) diff --git a/main.go b/main.go index 0fbc03cd..b55bb36c 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main -import "github.com/schollz/croc/src/cli" +import ( + "github.com/schollz/croc/src/cli" +) var Version string diff --git a/src/cli/cli.go b/src/cli/cli.go index 8781bce1..3e1428ca 100644 --- a/src/cli/cli.go +++ b/src/cli/cli.go @@ -61,7 +61,7 @@ func Run() { }, } app.Flags = []cli.Flag{ - cli.StringFlag{Name: "addr", Value: "198.199.67.130", Usage: "address of the public relay"}, + cli.StringFlag{Name: "addr", Value: "croc4.schollz.com", Usage: "address of the public relay"}, cli.StringFlag{Name: "addr-ws", Value: "8153", Usage: "port of the public relay websocket server to connect"}, cli.StringFlag{Name: "addr-tcp", Value: "8154,8155,8156,8157,8158,8159,8160,8161", Usage: "tcp ports of the public relay server to connect"}, cli.BoolFlag{Name: "no-local", Usage: "disable local mode"}, diff --git a/src/croc/croc.go b/src/croc/croc.go index db217b7d..50c4e4a0 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -68,7 +68,7 @@ func Init(debug bool) (c *Croc) { c.RelayWebsocketPort = "8153" c.RelayTCPPorts = []string{"8154", "8155", "8156", "8157", "8158", "8159", "8160", "8161"} c.CurveType = "siec" - c.Address = "198.199.67.130" + c.Address = "croc4.schollz.com" c.AddressWebsocketPort = "8153" c.AddressTCPPorts = []string{"8154", "8155", "8156", "8157", "8158", "8159", "8160", "8161"} c.NoRecipientPrompt = true diff --git a/src/win/main.go b/src/win/main.go index 390b1b90..1c7e968a 100644 --- a/src/win/main.go +++ b/src/win/main.go @@ -8,6 +8,8 @@ import ( "time" "github.com/schollz/croc/src/cli" + "github.com/schollz/croc/src/croc" + "github.com/schollz/croc/src/utils" "github.com/therecipe/qt/core" "github.com/therecipe/qt/widgets" ) @@ -47,13 +49,12 @@ func main() { button := widgets.NewQPushButton2("Send file", nil) button.ConnectClicked(func(bool) { if isWorking { - var info = widgets.NewQMessageBox(nil) - info.SetWindowTitle("Info") - info.SetText(fmt.Sprintf("Can only do one send or recieve at a time")) - info.Exec() + dialog("Can only do one send or receive at a time") return } - isWorking = true + defer func() { + isWorking = false + }() var fileDialog = widgets.NewQFileDialog2(nil, "Open file to send...", "", "") fileDialog.SetAcceptMode(widgets.QFileDialog__AcceptOpen) @@ -62,32 +63,62 @@ func main() { return } var fn = fileDialog.SelectedFiles()[0] - fmt.Println(fn) - for i, label := range labels { - go func(i int, label *CustomLabel) { - var tick int - for range time.NewTicker(time.Duration((i+1)*25) * time.Millisecond).C { - tick++ - label.SetText(fmt.Sprintf("%v %v", tick, time.Now().UTC().Format("15:04:05.0000"))) - } - }(i, label) + if len(fn) == 0 { + dialog(fmt.Sprintf("No file selected")) + return } + + cr := croc.Init(false) + done := make(chan bool) + go func() { + cr.Send(fn, utils.GetRandomName()) + done <- true + }() + + for { + select { + case _ <- done: + break + } + if cr.FileInfo != nil { + labels[0].SetText(fmt.Sprintf("%s",cr.FileInfo.SentName)) + } + if cr.Bar != nil { + barState := cr.Bar.State() + labels[1].SetText(fmt.Sprintf("%2.1f",barState.CurrentPercent) + } + time.Sleep(100 * time.Millisecond) + } + + // for i, label := range labels { + // go func(i int, label *CustomLabel) { + // var tick int + // for range time.NewTicker(time.Duration((i+1)*25) * time.Millisecond).C { + // tick++ + // label.SetText(fmt.Sprintf("%v %v", tick, time.Now().UTC().Format("15:04:05.0000"))) + // } + // }(i, label) + // } }) widget.Layout().AddWidget(button) receiveButton := widgets.NewQPushButton2("Receive", nil) receiveButton.ConnectClicked(func(bool) { if isWorking { - var info = widgets.NewQMessageBox(nil) - info.SetWindowTitle("Info") - info.SetText(fmt.Sprintf("Can only do one send or recieve at a time")) - info.Exec() + dialog("Can only do one send or receive at a time") return } isWorking = true + defer func() { + isWorking = false + }() + var codePhrase = widgets.QInputDialog_GetText(nil, "Enter code phrase", "", widgets.QLineEdit__Normal, "", true, core.Qt__Dialog, core.Qt__ImhNone) - fmt.Println(codePhrase) + if len(codePhrase) < 3 { + dialog(fmt.Sprintf("Invalid codephrase: '%s'", codePhrase)) + return + } var folderDialog = widgets.NewQFileDialog2(nil, "Open folder to receive file...", "", "") folderDialog.SetAcceptMode(widgets.QFileDialog__AcceptOpen) folderDialog.SetFileMode(widgets.QFileDialog__DirectoryOnly) @@ -95,19 +126,56 @@ func main() { return } var fn = folderDialog.SelectedFiles()[0] - fmt.Println(fn) - for i, label := range labels { - go func(i int, label *CustomLabel) { - var tick int - for range time.NewTicker(time.Duration((i+1)*25) * time.Millisecond).C { - tick++ - label.SetText(fmt.Sprintf("%v %v", tick, time.Now().UTC().Format("15:04:05.0000"))) - } - }(i, label) + if len(fn) == 0 { + dialog(fmt.Sprintf("No folder selected")) + return } + cwd, _ := os.Getwd() + os.Chdir(fn) + defer os.Chdir(cwd) + + cr := croc.Init(false) + done := make(chan bool) + go func() { + cr.Receive(codephrase) + done <- true + }() + + for { + select { + case _ <- done: + break + } + labels[0].SetText(cr.StateString) + if cr.FileInfo != nil { + labels[0].SetText(fmt.Sprintf("%s",cr.FileInfo.SentName)) + } + if cr.Bar != nil { + barState := cr.Bar.State() + labels[1].SetText(fmt.Sprintf("%2.1f",barState.CurrentPercent) + } + time.Sleep(100 * time.Millisecond) + } + + // for i, label := range labels { + // go func(i int, label *CustomLabel) { + // var tick int + // for range time.NewTicker(time.Duration((i+1)*25) * time.Millisecond).C { + // tick++ + // label.SetText(fmt.Sprintf("%v %v", tick, time.Now().UTC().Format("15:04:05.0000"))) + // } + // }(i, label) + // } }) widget.Layout().AddWidget(receiveButton) window.Show() app.Exec() } + +func dialog(s string) { + var info = widgets.NewQMessageBox(nil) + info.SetWindowTitle("Info") + info.SetText(s) + info.Exec() +} From 157ab169aa760916d922bfb57bfb9b099faf19a6 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 17 Oct 2018 07:01:44 -0700 Subject: [PATCH 6/8] sending is working for wincroc --- src/win/main.go | 50 +++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/src/win/main.go b/src/win/main.go index 1c7e968a..24f03e0d 100644 --- a/src/win/main.go +++ b/src/win/main.go @@ -5,6 +5,7 @@ package main import ( "fmt" "os" + "path/filepath" "time" "github.com/schollz/croc/src/cli" @@ -30,7 +31,7 @@ func main() { app := widgets.NewQApplication(len(os.Args), os.Args) window := widgets.NewQMainWindow(nil, 0) - window.SetFixedSize2(250, 200) + window.SetFixedSize2(300, 200) window.SetWindowTitle("croc - secure data transfer") widget := widgets.NewQWidget(nil, 0) @@ -71,24 +72,33 @@ func main() { cr := croc.Init(false) done := make(chan bool) go func() { - cr.Send(fn, utils.GetRandomName()) + codePhrase := utils.GetRandomName() + _, fname := filepath.Split(fn) + labels[0].SetText(fmt.Sprintf("Sending '%'", fname)) + labels[1].SetText(fmt.Sprintf("Code phrase: %s", codePhrase)) + cr.Send(fn, codePhrase) done <- true }() - for { - select { - case _ <- done: - break + go func() { + for { + fmt.Println(cr.FileInfo, cr.Bar) + if cr.FileInfo.SentName != "" { + labels[0].UpdateTextFromGoroutine(fmt.Sprintf("%s", cr.FileInfo.SentName)) + } + if cr.Bar != nil { + barState := cr.Bar.State() + labels[1].UpdateTextFromGoroutine(fmt.Sprintf("%2.1f", barState.CurrentPercent)) + } + time.Sleep(100 * time.Millisecond) + select { + case _ = <-done: + break + default: + continue + } } - if cr.FileInfo != nil { - labels[0].SetText(fmt.Sprintf("%s",cr.FileInfo.SentName)) - } - if cr.Bar != nil { - barState := cr.Bar.State() - labels[1].SetText(fmt.Sprintf("%2.1f",barState.CurrentPercent) - } - time.Sleep(100 * time.Millisecond) - } + }() // for i, label := range labels { // go func(i int, label *CustomLabel) { @@ -137,22 +147,22 @@ func main() { cr := croc.Init(false) done := make(chan bool) go func() { - cr.Receive(codephrase) + cr.Receive(codePhrase) done <- true }() for { select { - case _ <- done: + case _ = <-done: break } labels[0].SetText(cr.StateString) - if cr.FileInfo != nil { - labels[0].SetText(fmt.Sprintf("%s",cr.FileInfo.SentName)) + if cr.FileInfo.SentName != "" { + labels[0].SetText(fmt.Sprintf("%s", cr.FileInfo.SentName)) } if cr.Bar != nil { barState := cr.Bar.State() - labels[1].SetText(fmt.Sprintf("%2.1f",barState.CurrentPercent) + labels[1].SetText(fmt.Sprintf("%2.1f", barState.CurrentPercent)) } time.Sleep(100 * time.Millisecond) } From 93942f4e0cb4eed8124736b3229fad3fcfa48973 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 17 Oct 2018 07:38:21 -0700 Subject: [PATCH 7/8] give stop signals to servers --- src/croc/recipient.go | 4 +-- src/croc/sender.go | 4 +++ src/croc/sending.go | 5 ++++ src/relay/hub.go | 4 +++ src/relay/relay.go | 27 ++++++++++++++++-- src/win/main.go | 65 +++++++++++++++++++++++-------------------- 6 files changed, 74 insertions(+), 35 deletions(-) diff --git a/src/croc/recipient.go b/src/croc/recipient.go index 66abddfc..e3c0d08e 100644 --- a/src/croc/recipient.go +++ b/src/croc/recipient.go @@ -13,9 +13,8 @@ import ( "sync" "time" - humanize "github.com/dustin/go-humanize" - log "github.com/cihub/seelog" + humanize "github.com/dustin/go-humanize" "github.com/gorilla/websocket" "github.com/schollz/croc/src/comm" "github.com/schollz/croc/src/compress" @@ -73,6 +72,7 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, spin.Writer = os.Stderr spin.Suffix = " performing PAKE..." spin.Start() + defer spin.Stop() // pick an elliptic curve curve := siec.SIEC255() diff --git a/src/croc/sender.go b/src/croc/sender.go index 9d334c4a..9cd2d775 100644 --- a/src/croc/sender.go +++ b/src/croc/sender.go @@ -38,6 +38,9 @@ func (cr *Croc) startSender(forceSend int, serverAddress string, tcpPorts []stri if !strings.HasPrefix(err.Error(), "websocket: close 100") { fmt.Fprintf(os.Stderr, "\n"+err.Error()) } + cr.StateString = err.Error() + } else { + cr.StateString = "File transfer completed." } done <- struct{}{} @@ -89,6 +92,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL // start a spinner spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) spin.Writer = os.Stderr + defer spin.Stop() // pick an elliptic curve curve := siec.SIEC255() diff --git a/src/croc/sending.go b/src/croc/sending.go index fbec7834..74d972e2 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -18,6 +18,7 @@ import ( // Send the file func (c *Croc) Send(fname, codephrase string) (err error) { + defer log.Flush() log.Debugf("sending %s", fname) errChan := make(chan error) @@ -36,6 +37,10 @@ func (c *Croc) Send(fname, codephrase string) (err error) { // use local relay if !c.NoLocal { + defer func() { + log.Debug("sending relay stop signal") + relay.Stop() + }() go func() { // start own relay and connect to it go relay.Run(c.RelayWebsocketPort, c.RelayTCPPorts) diff --git a/src/relay/hub.go b/src/relay/hub.go index 86d2aca9..bbb11b37 100644 --- a/src/relay/hub.go +++ b/src/relay/hub.go @@ -47,6 +47,10 @@ var h = hub{ func (h *hub) run() { for { + if stop { + log.Debug("stopping hub") + return + } select { case s := <-h.register: log.Debugf("adding connection to %s", s.room) diff --git a/src/relay/relay.go b/src/relay/relay.go index e713e446..951a1ea5 100644 --- a/src/relay/relay.go +++ b/src/relay/relay.go @@ -1,8 +1,10 @@ package relay import ( + "context" "fmt" "net/http" + "time" log "github.com/cihub/seelog" "github.com/schollz/croc/src/logger" @@ -10,6 +12,12 @@ import ( ) var DebugLevel string +var stop bool + +func Stop() { + log.Debug("got stop signal") + stop = true +} // Run is the async operation for running a server func Run(port string, tcpPorts []string) (err error) { @@ -23,12 +31,25 @@ func Run(port string, tcpPorts []string) (err error) { go h.run() log.Debug("running relay on " + port) - http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + m := http.NewServeMux() + s := http.Server{Addr: ":" + port, Handler: m} + m.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { serveWs(w, r) }) - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "ok") }) - err = http.ListenAndServe(":"+port, nil) + go func() { + for { + if stop { + s.Shutdown(context.Background()) + log.Debug("stopping http server") + return + } + time.Sleep(10 * time.Millisecond) + } + }() + s.ListenAndServe() + log.Debug("finished") return } diff --git a/src/win/main.go b/src/win/main.go index 24f03e0d..f7dc9e04 100644 --- a/src/win/main.go +++ b/src/win/main.go @@ -32,7 +32,7 @@ func main() { window := widgets.NewQMainWindow(nil, 0) window.SetFixedSize2(300, 200) - window.SetWindowTitle("croc - secure data transfer") + window.SetWindowTitle("🐊📦 croc") widget := widgets.NewQWidget(nil, 0) widget.SetLayout(widgets.NewQVBoxLayout()) @@ -45,7 +45,8 @@ func main() { widget.Layout().AddWidget(label) labels[i] = label } - labels[0].SetText("Click 'Send' or 'Receive' to start") + labels[0].SetText("secure data transfer") + labels[1].SetText("Click 'Send' or 'Receive' to start") button := widgets.NewQPushButton2("Send file", nil) button.ConnectClicked(func(bool) { @@ -53,51 +54,55 @@ func main() { dialog("Can only do one send or receive at a time") return } - defer func() { - isWorking = false - }() + isWorking = true var fileDialog = widgets.NewQFileDialog2(nil, "Open file to send...", "", "") fileDialog.SetAcceptMode(widgets.QFileDialog__AcceptOpen) fileDialog.SetFileMode(widgets.QFileDialog__AnyFile) if fileDialog.Exec() != int(widgets.QDialog__Accepted) { + isWorking = false return } var fn = fileDialog.SelectedFiles()[0] if len(fn) == 0 { dialog(fmt.Sprintf("No file selected")) + isWorking = false return } - cr := croc.Init(false) - done := make(chan bool) go func() { + cr := croc.Init(false) + done := make(chan bool) codePhrase := utils.GetRandomName() _, fname := filepath.Split(fn) - labels[0].SetText(fmt.Sprintf("Sending '%'", fname)) - labels[1].SetText(fmt.Sprintf("Code phrase: %s", codePhrase)) + labels[0].UpdateTextFromGoroutine(fmt.Sprintf("Sending '%s'", fname)) + labels[1].UpdateTextFromGoroutine(fmt.Sprintf("Code phrase: %s", codePhrase)) + + go func(done chan bool) { + for { + fmt.Println(cr.FileInfo, cr.Bar) + if cr.FileInfo.SentName != "" { + labels[0].UpdateTextFromGoroutine(fmt.Sprintf("Sending %s", cr.FileInfo.SentName)) + } + if cr.Bar != nil { + barState := cr.Bar.State() + labels[1].UpdateTextFromGoroutine(fmt.Sprintf("%2.1f%% [%2.0f:%2.0f]", barState.CurrentPercent*100, barState.SecondsSince, barState.SecondsLeft)) + } + labels[2].UpdateTextFromGoroutine(cr.StateString) + time.Sleep(100 * time.Millisecond) + select { + case _ = <-done: + labels[2].UpdateTextFromGoroutine(cr.StateString) + return + default: + continue + } + } + }(done) + cr.Send(fn, codePhrase) done <- true - }() - - go func() { - for { - fmt.Println(cr.FileInfo, cr.Bar) - if cr.FileInfo.SentName != "" { - labels[0].UpdateTextFromGoroutine(fmt.Sprintf("%s", cr.FileInfo.SentName)) - } - if cr.Bar != nil { - barState := cr.Bar.State() - labels[1].UpdateTextFromGoroutine(fmt.Sprintf("%2.1f", barState.CurrentPercent)) - } - time.Sleep(100 * time.Millisecond) - select { - case _ = <-done: - break - default: - continue - } - } + isWorking = false }() // for i, label := range labels { @@ -144,7 +149,7 @@ func main() { os.Chdir(fn) defer os.Chdir(cwd) - cr := croc.Init(false) + cr := croc.Init(true) done := make(chan bool) go func() { cr.Receive(codePhrase) From 56e00681292c40a335a87dbc2fe2becb48aafe1f Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 17 Oct 2018 07:41:13 -0700 Subject: [PATCH 8/8] send file works --- src/croc/sender.go | 2 +- src/win/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/croc/sender.go b/src/croc/sender.go index 9cd2d775..11291d35 100644 --- a/src/croc/sender.go +++ b/src/croc/sender.go @@ -358,7 +358,7 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL if !bytes.HasPrefix(message, []byte("ready")) { return errors.New("recipient refused file") } - + cr.StateString = "Transfer in progress..." fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) // send file, compure hash simultaneously startTransfer = time.Now() diff --git a/src/win/main.go b/src/win/main.go index f7dc9e04..43e65cfe 100644 --- a/src/win/main.go +++ b/src/win/main.go @@ -31,7 +31,7 @@ func main() { app := widgets.NewQApplication(len(os.Args), os.Args) window := widgets.NewQMainWindow(nil, 0) - window.SetFixedSize2(300, 200) + window.SetFixedSize2(400, 150) window.SetWindowTitle("🐊📦 croc") widget := widgets.NewQWidget(nil, 0)