diff --git a/src/croc/croc.go b/src/croc/croc.go index 4bdace4b..2b9c8a57 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -5,9 +5,7 @@ import ( "time" "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/recipient" "github.com/schollz/croc/src/relay" - "github.com/schollz/croc/src/sender" "github.com/schollz/croc/src/zipper" ) @@ -78,8 +76,6 @@ func Init(debug bool) (c *Croc) { func SetDebugLevel(debugLevel string) { logger.SetLogLevel(debugLevel) - sender.DebugLevel = debugLevel - recipient.DebugLevel = debugLevel relay.DebugLevel = debugLevel zipper.DebugLevel = debugLevel } diff --git a/src/croc/sending.go b/src/croc/sending.go index 98f9632a..fbec7834 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -11,9 +11,7 @@ import ( log "github.com/cihub/seelog" "github.com/gorilla/websocket" - "github.com/schollz/croc/src/recipient" "github.com/schollz/croc/src/relay" - "github.com/schollz/croc/src/sender" "github.com/schollz/peerdiscovery" "github.com/schollz/utils" ) @@ -161,9 +159,9 @@ func (c *Croc) sendReceive(address, websocketPort string, tcpPorts []string, fna } if isSender { - go sender.Send(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) + go c.startSender(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) } else { - go recipient.Receive(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) + go c.startRecipient(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) } for { diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go deleted file mode 100644 index ed0a6427..00000000 --- a/src/recipient/recipient.go +++ /dev/null @@ -1,527 +0,0 @@ -package recipient - -import ( - "bufio" - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net" - "os" - "strconv" - "strings" - "sync" - "time" - - humanize "github.com/dustin/go-humanize" - - log "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/schollz/croc/src/comm" - "github.com/schollz/croc/src/compress" - "github.com/schollz/croc/src/crypt" - "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/models" - "github.com/schollz/croc/src/utils" - "github.com/schollz/croc/src/zipper" - "github.com/schollz/pake" - "github.com/schollz/progressbar/v2" - "github.com/schollz/spinner" - "github.com/tscholl2/siec" -) - -var DebugLevel string - -// Receive is the async operation to receive a file -func Receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { - logger.SetLogLevel(DebugLevel) - err := receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) - if err != nil { - if !strings.HasPrefix(err.Error(), "websocket: close 100") { - fmt.Fprintf(os.Stderr, "\n"+err.Error()) - } - } - done <- struct{}{} -} - -func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { - var fstats models.FileStats - var sessionKey []byte - var transferTime time.Duration - var hash256 []byte - var otherIP string - var progressFile string - var resumeFile bool - var tcpConnections []comm.Comm - dataChan := make(chan []byte, 1024*1024) - isConnectedIfUsingTCP := make(chan bool) - blocks := []string{} - - useWebsockets := true - switch forceSend { - case 0: - if !isLocal { - useWebsockets = false - } - case 1: - useWebsockets = true - case 2: - useWebsockets = false - } - - // start a spinner - spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) - spin.Writer = os.Stderr - spin.Suffix = " performing PAKE..." - spin.Start() - - // pick an elliptic curve - curve := siec.SIEC255() - // both parties should have a weak key - pw := []byte(codephrase) - - // initialize recipient Q ("1" indicates recipient) - Q, err := pake.Init(pw, 1, curve, 1*time.Millisecond) - if err != nil { - return - } - - step := 0 - for { - messageType, message, err := c.ReadMessage() - if err != nil { - return err - } - if messageType == websocket.PongMessage || messageType == websocket.PingMessage { - continue - } - if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { - return errors.New("\rinterrupted by other party") - } - - log.Debugf("got %d: %s", messageType, message) - switch step { - case 0: - // sender has initiated, sends their ip address - otherIP = string(message) - log.Debugf("sender IP: %s", otherIP) - - // recipient begins by sending address - ip := "" - if isLocal { - ip = utils.LocalIP() - } else { - ip, _ = utils.PublicIP() - } - c.WriteMessage(websocket.BinaryMessage, []byte(ip)) - case 1: - - // Q receives u - log.Debugf("[%d] Q computes k, sends H(k), v back to P", step) - if err := Q.Update(message); err != nil { - return err - } - - // Q has the session key now, but we will still check if its valid - sessionKey, err = Q.SessionKey() - if err != nil { - return err - } - log.Debugf("%x\n", sessionKey) - - // initialize TCP connections if using (possible, but unlikely, race condition) - go func() { - if !useWebsockets { - log.Debugf("connecting to server") - tcpConnections = make([]comm.Comm, len(tcpPorts)) - for i, tcpPort := range tcpPorts { - log.Debugf("connecting to %d", i) - tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) - if err != nil { - log.Error(err) - } - } - log.Debugf("fully connected") - } - isConnectedIfUsingTCP <- true - }() - - c.WriteMessage(websocket.BinaryMessage, Q.Bytes()) - case 2: - log.Debugf("[%d] Q recieves H(k) from P", step) - // check if everything is still kosher with our computed session key - if err := Q.Update(message); err != nil { - return err - } - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) - case 3: - spin.Stop() - - // unmarshal the file info - log.Debugf("[%d] recieve file info", step) - // do decryption on the file stats - enc, err := crypt.FromBytes(message) - if err != nil { - return err - } - decryptedFileData, err := enc.Decrypt(sessionKey) - if err != nil { - return err - } - err = json.Unmarshal(decryptedFileData, &fstats) - if err != nil { - return err - } - log.Debugf("got file stats: %+v", fstats) - - // determine if the file is resuming or not - progressFile = fmt.Sprintf("%s.progress", fstats.SentName) - overwritingOrReceiving := "Receiving" - if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { - overwritingOrReceiving = "Overwriting" - if utils.Exists(progressFile) { - overwritingOrReceiving = "Resume receiving" - resumeFile = true - } - } - - // send blocks - if resumeFile { - fileWithBlocks, _ := os.Open(progressFile) - scanner := bufio.NewScanner(fileWithBlocks) - for scanner.Scan() { - blocks = append(blocks, strings.TrimSpace(scanner.Text())) - } - fileWithBlocks.Close() - } - blocksBytes, _ := json.Marshal(blocks) - // encrypt the block data and send - encblockBytes := crypt.Encrypt(blocksBytes, sessionKey) - c.WriteMessage(websocket.BinaryMessage, encblockBytes.Bytes()) - - // prompt user about the file - fileOrFolder := "file" - if fstats.IsDir { - fileOrFolder = "folder" - } - fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n", - overwritingOrReceiving, - fileOrFolder, - humanize.Bytes(uint64(fstats.Size)), - fstats.Name, - ) - if !noPrompt { - if "y" != utils.GetInput("ok? (y/N): ") { - fmt.Fprintf(os.Stderr, "cancelling request") - c.WriteMessage(websocket.BinaryMessage, []byte("no")) - return nil - } - } - - // await file - // erase file if overwriting - if overwritingOrReceiving == "Overwriting" { - os.Remove(fstats.SentName) - } - var f *os.File - if utils.Exists(fstats.SentName) && resumeFile { - if !useWebsockets { - f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) - } else { - f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644) - } - if err != nil { - log.Error(err) - return err - } - } else { - f, err = os.Create(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - if !useWebsockets { - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err - } - } - } - - blockSize := 0 - if useWebsockets { - blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 - } else { - blockSize = models.TCP_BUFFER_SIZE / 2 - } - - // start the ui for pgoress - bytesWritten := 0 - fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) - bar := progressbar.NewOptions( - int(fstats.Size), - progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), - progressbar.OptionSetWriter(os.Stderr), - ) - bar.Add((len(blocks) * blockSize)) - finished := make(chan bool) - - go func(finished chan bool, dataChan chan []byte) (err error) { - // remove previous progress - var fProgress *os.File - var progressErr error - if resumeFile { - fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND|os.O_WRONLY, 0644) - bytesWritten = len(blocks) * blockSize - } else { - os.Remove(progressFile) - fProgress, progressErr = os.Create(progressFile) - } - if progressErr != nil { - panic(progressErr) - } - defer fProgress.Close() - - blocksWritten := 0.0 - blocksToWrite := float64(fstats.Size) - if useWebsockets { - blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) - } else { - blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) - } - for { - message := <-dataChan - // do decryption - var enc crypt.Encryption - err = json.Unmarshal(message, &enc) - if err != nil { - // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) - log.Error(err) - return err - } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) - if err != nil { - log.Error(err) - return err - } - - // get location if TCP - var locationToWrite int - if !useWebsockets { - pieces := bytes.SplitN(decrypted, []byte("-"), 2) - decrypted = pieces[1] - locationToWrite, _ = strconv.Atoi(string(pieces[0])) - } - - // do decompression - if fstats.IsCompressed && !fstats.IsDir { - decrypted = compress.Decompress(decrypted) - } - - var n int - if !useWebsockets { - if err != nil { - log.Error(err) - return err - } - n, err = f.WriteAt(decrypted, int64(locationToWrite)) - fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) - log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, locationToWrite, blocksWritten, blocksToWrite) - } else { - // write to file - n, err = f.Write(decrypted) - log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, bytesWritten, blocksWritten, blocksToWrite) - fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) - } - if err != nil { - log.Error(err) - return err - } - - // update the bytes written - bytesWritten += n - blocksWritten += 1.0 - // update the progress bar - bar.Add(n) - if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { - log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite) - break - } - } - finished <- true - return - }(finished, dataChan) - - log.Debug("telling sender i'm ready") - c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) - - startTime := time.Now() - if useWebsockets { - for { - var messageType int - // read from websockets - messageType, message, err = c.ReadMessage() - if messageType != websocket.BinaryMessage { - continue - } - if err != nil { - log.Error(err) - return err - } - if bytes.Equal(message, []byte("magic")) { - log.Debug("got magic") - break - } - dataChan <- message - // select { - // case dataChan <- message: - // default: - // log.Debug("blocked") - // // no message sent - // // block - // dataChan <- message - // } - } - } else { - _ = <-isConnectedIfUsingTCP - log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) - // using TCP - var wg sync.WaitGroup - wg.Add(len(tcpConnections)) - for i := range tcpConnections { - defer func(i int) { - log.Debugf("closing connection %d", i) - tcpConnections[i].Close() - }(i) - go func(wg *sync.WaitGroup, j int) { - defer wg.Done() - for { - log.Debugf("waiting to read on %d", j) - // read from TCP connection - message, _, _, err := tcpConnections[j].Read() - // log.Debugf("message: %s", message) - if err != nil { - panic(err) - } - if bytes.Equal(message, []byte("magic")) { - log.Debugf("%d got magic, leaving", j) - return - } - dataChan <- message - } - }(&wg, i) - } - wg.Wait() - } - - _ = <-finished - log.Debug("telling sender i'm done") - c.WriteMessage(websocket.BinaryMessage, []byte("done")) - // we are finished - transferTime = time.Since(startTime) - - // close file - err = f.Close() - if err != nil { - return err - } - - // finish bar - bar.Finish() - - // check hash - hash256, err = utils.HashFile(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - // tell the sender the hash so they can quit - c.WriteMessage(websocket.BinaryMessage, append([]byte("hash:"), hash256...)) - case 4: - // receive the hash from the sender so we can check it and quit - log.Debugf("got hash: %x", message) - if bytes.Equal(hash256, message) { - // open directory - if fstats.IsDir { - err = zipper.UnzipFile(fstats.SentName, ".") - if DebugLevel != "debug" { - os.Remove(fstats.SentName) - } - } else { - err = nil - } - if err == nil { - if useStdout && !fstats.IsDir { - var bFile []byte - bFile, err = ioutil.ReadFile(fstats.SentName) - if err != nil { - return err - } - os.Stdout.Write(bFile) - os.Remove(fstats.SentName) - } - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() - transferType := "MB/s" - if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() - transferType = "kB/s" - } - folderOrFile := "file" - if fstats.IsDir { - folderOrFile = "folder" - } - if useStdout { - fstats.Name = "stdout" - } - fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) - os.Remove(progressFile) - } - return err - } else { - if DebugLevel != "debug" { - log.Debug("removing corrupted file") - os.Remove(fstats.SentName) - } - return errors.New("file corrupted") - } - default: - return fmt.Errorf("unknown step") - } - step++ - } -} - -func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - log.Debugf("recipient connecting to %s", address) - connection, err := net.Dial("tcp", address) - if err != nil { - return - } - connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) - connection.SetDeadline(time.Now().Add(3 * time.Hour)) - connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) - - com = comm.New(connection) - log.Debug("waiting for server contact") - ok, err := com.Receive() - if err != nil { - return - } - log.Debugf("[%s] server says: %s", address, ok) - - err = com.Send(room) - if err != nil { - return - } - ok, err = com.Receive() - log.Debugf("[%s] server says: %s", address, ok) - if err != nil { - return - } - if ok != "recipient" { - err = errors.New(ok) - } - return -} diff --git a/src/sender/sender.go b/src/sender/sender.go deleted file mode 100644 index 7e10af8a..00000000 --- a/src/sender/sender.go +++ /dev/null @@ -1,491 +0,0 @@ -package sender - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net" - "os" - "path/filepath" - "strconv" - "strings" - "sync" - "time" - - log "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/pkg/errors" - "github.com/schollz/croc/src/comm" - "github.com/schollz/croc/src/compress" - "github.com/schollz/croc/src/crypt" - "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/models" - "github.com/schollz/croc/src/utils" - "github.com/schollz/croc/src/zipper" - "github.com/schollz/pake" - "github.com/schollz/progressbar/v2" - "github.com/schollz/spinner" - "github.com/tscholl2/siec" -) - -var DebugLevel string - -// Send is the async call to send data -func Send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { - logger.SetLogLevel(DebugLevel) - log.Debugf("sending %s", fname) - err := send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) - if err != nil { - if !strings.HasPrefix(err.Error(), "websocket: close 100") { - fmt.Fprintf(os.Stderr, "\n"+err.Error()) - } - } - - done <- struct{}{} -} - -func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { - var f *os.File - defer f.Close() // ignore the error if it wasn't opened :( - var fstats models.FileStats - var fileHash []byte - var otherIP string - var startTransfer time.Time - var tcpConnections []comm.Comm - blocksToSkip := make(map[int64]struct{}) - isConnectedIfUsingTCP := make(chan bool) - - type DataChan struct { - b []byte - currentPostition int64 - bytesRead int - err error - } - dataChan := make(chan DataChan, 1024*1024) - defer close(dataChan) - - useWebsockets := true - switch forceSend { - case 0: - if !isLocal { - useWebsockets = false - } - case 1: - useWebsockets = true - case 2: - useWebsockets = false - } - - fileReady := make(chan error) - - // normalize the file name - fname, err = filepath.Abs(fname) - if err != nil { - return err - } - _, filename := filepath.Split(fname) - - // get ready to generate session key - var sessionKey []byte - - // start a spinner - spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) - spin.Writer = os.Stderr - - // pick an elliptic curve - curve := siec.SIEC255() - // both parties should have a weak key - pw := []byte(codephrase) - // initialize sender P ("0" indicates sender) - P, err := pake.Init(pw, 0, curve, 1*time.Millisecond) - if err != nil { - return - } - - step := 0 - for { - messageType, message, errRead := c.ReadMessage() - if errRead != nil { - return errRead - } - if messageType == websocket.PongMessage || messageType == websocket.PingMessage { - continue - } - if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { - return errors.New("\rinterrupted by other party") - } - log.Debugf("got %d: %s", messageType, message) - switch step { - case 0: - // sender initiates communication - ip := "" - if isLocal { - ip = utils.LocalIP() - } else { - ip, _ = utils.PublicIP() - } - // send my IP address - c.WriteMessage(websocket.BinaryMessage, []byte(ip)) - case 1: - // first receive the IP address from the sender - otherIP = string(message) - log.Debugf("recipient IP: %s", otherIP) - - go func() { - // recipient might want file! start gathering information about file - fstat, err := os.Stat(fname) - if err != nil { - fileReady <- err - return - } - fstats = models.FileStats{ - Name: filename, - Size: fstat.Size(), - ModTime: fstat.ModTime(), - IsDir: fstat.IsDir(), - SentName: fstat.Name(), - IsCompressed: useCompression, - IsEncrypted: useEncryption, - } - if fstats.IsDir { - // zip the directory - fstats.SentName, err = zipper.ZipFile(fname, true) - if err != nil { - log.Error(err) - fileReady <- err - return - } - fname = fstats.SentName - - fstat, err := os.Stat(fname) - if err != nil { - fileReady <- err - return - } - // get new size - fstats.Size = fstat.Size() - } - - // open the file - f, err = os.Open(fname) - if err != nil { - fileReady <- err - return - } - fileReady <- nil - - }() - - // send pake data - log.Debugf("[%d] first, P sends u to Q", step) - c.WriteMessage(websocket.BinaryMessage, P.Bytes()) - // start PAKE spinnner - spin.Suffix = " performing PAKE..." - spin.Start() - case 2: - // P recieves H(k),v from Q - log.Debugf("[%d] P computes k, H(k), sends H(k) to Q", step) - if err := P.Update(message); err != nil { - return err - } - c.WriteMessage(websocket.BinaryMessage, P.Bytes()) - sessionKey, _ = P.SessionKey() - // check(err) - log.Debugf("%x\n", sessionKey) - - // wait for readiness - spin.Stop() - spin.Suffix = " waiting for recipient ok..." - spin.Start() - case 3: - log.Debugf("[%d] recipient declares readiness for file info", step) - if !bytes.HasPrefix(message, []byte("ready")) { - return errors.New("recipient refused file") - } - - // connect to TCP in background - tcpConnections = make([]comm.Comm, len(tcpPorts)) - go func() { - if !useWebsockets { - log.Debugf("connecting to server") - for i, tcpPort := range tcpPorts { - log.Debugf("connecting to %s on connection %d", tcpPort, i) - tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) - if err != nil { - log.Error(err) - } - } - } - isConnectedIfUsingTCP <- true - }() - - err = <-fileReady // block until file is ready - if err != nil { - return err - } - fstatsBytes, err := json.Marshal(fstats) - if err != nil { - return err - } - - // encrypt the file meta data - enc := crypt.Encrypt(fstatsBytes, sessionKey) - // send the file meta data - c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) - case 4: - log.Debugf("[%d] recipient declares gives blocks", step) - // recipient sends blocks, and sender does not send anything back - // determine if any blocks were sent to skip - enc, err := crypt.FromBytes(message) - if err != nil { - log.Error(err) - return err - } - decrypted, err := enc.Decrypt(sessionKey) - if err != nil { - err = errors.Wrap(err, "could not decrypt blocks with session key") - log.Error(err) - return err - } - - var blocks []string - errBlocks := json.Unmarshal(decrypted, &blocks) - if errBlocks == nil { - for _, block := range blocks { - blockInt64, errBlock := strconv.Atoi(block) - if errBlock == nil { - blocksToSkip[int64(blockInt64)] = struct{}{} - } - } - } - log.Debugf("found blocks: %+v", blocksToSkip) - - // start loading the file into memory - // start streaming encryption/compression - if fstats.IsDir { - // remove file if zipped - defer os.Remove(fstats.SentName) - } - go func(dataChan chan DataChan) { - var buffer []byte - if useWebsockets { - buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) - } else { - buffer = make([]byte, models.TCP_BUFFER_SIZE/2) - } - - currentPostition := int64(0) - for { - bytesread, err := f.Read(buffer) - if bytesread > 0 { - if _, ok := blocksToSkip[currentPostition]; ok { - log.Debugf("skipping the sending of block %d", currentPostition) - currentPostition += int64(bytesread) - continue - } - - // do compression - var compressedBytes []byte - if useCompression && !fstats.IsDir { - compressedBytes = compress.Compress(buffer[:bytesread]) - } else { - compressedBytes = buffer[:bytesread] - } - - // if using TCP, prepend the location to write the data to in the resulting file - if !useWebsockets { - compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) - } - - // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) - if err != nil { - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: err, - } - return - } - - dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - } - currentPostition += int64(bytesread) - } - if err != nil { - if err != io.EOF { - log.Error(err) - } - break - } - } - // finish - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - if !useWebsockets { - log.Debug("sending extra magic to %d others", len(tcpPorts)-1) - for i := 0; i < len(tcpPorts)-1; i++ { - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - } - } - }(dataChan) - - case 5: - spin.Stop() - - log.Debugf("[%d] recipient declares readiness for file data", step) - if !bytes.HasPrefix(message, []byte("ready")) { - return errors.New("recipient refused file") - } - - fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) - // send file, compure hash simultaneously - startTransfer = time.Now() - - blockSize := 0 - if useWebsockets { - blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 - } else { - blockSize = models.TCP_BUFFER_SIZE / 2 - } - bar := progressbar.NewOptions( - int(fstats.Size), - progressbar.OptionSetRenderBlankState(true), - progressbar.OptionSetBytes(int(fstats.Size)), - progressbar.OptionSetWriter(os.Stderr), - ) - bar.Add(blockSize * len(blocksToSkip)) - - if useWebsockets { - for { - data := <-dataChan - if data.err != nil { - return data.err - } - bar.Add(data.bytesRead) - // write data to websockets - err = c.WriteMessage(websocket.BinaryMessage, data.b) - if err != nil { - err = errors.Wrap(err, "problem writing message") - return err - } - if bytes.Equal(data.b, []byte("magic")) { - break - } - } - } else { - _ = <-isConnectedIfUsingTCP - log.Debug("connected and ready to send on tcp") - var wg sync.WaitGroup - wg.Add(len(tcpConnections)) - for i := range tcpConnections { - defer func(i int) { - log.Debugf("closing connection %d", i) - tcpConnections[i].Close() - }(i) - go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan) { - defer wg.Done() - for data := range dataChan { - if data.err != nil { - log.Error(data.err) - return - } - bar.Add(data.bytesRead) - // write data to tcp connection - _, err = tcpConnections[i].Write(data.b) - if err != nil { - err = errors.Wrap(err, "problem writing message") - log.Error(err) - return - } - if bytes.Equal(data.b, []byte("magic")) { - log.Debugf("%d got magic", i) - return - } - } - }(i, &wg, dataChan) - } - wg.Wait() - } - - bar.Finish() - log.Debug("send hash to finish file") - fileHash, err = utils.HashFile(fname) - if err != nil { - return err - } - case 6: - // recevied something, maybe the file hash - transferTime := time.Since(startTransfer) - if !bytes.HasPrefix(message, []byte("hash:")) { - log.Debugf("%s", message) - continue - } - c.WriteMessage(websocket.BinaryMessage, fileHash) - message = bytes.TrimPrefix(message, []byte("hash:")) - log.Debugf("[%d] determing whether it went ok", step) - if bytes.Equal(message, fileHash) { - log.Debug("file transfered successfully") - transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() - transferType := "MB/s" - if transferRate < 1 { - transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() - transferType = "kB/s" - } - fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType) - return nil - } else { - fmt.Fprintf(os.Stderr, "\nTransfer corrupted") - return errors.New("file not transfered succesfully") - } - default: - return fmt.Errorf("unknown step") - } - step++ - } -} - -func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - connection, err := net.DialTimeout("tcp", address, 3*time.Hour) - if err != nil { - return - } - connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) - connection.SetDeadline(time.Now().Add(3 * time.Hour)) - connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) - - com = comm.New(connection) - ok, err := com.Receive() - if err != nil { - return - } - log.Debugf("server says: %s", ok) - - err = com.Send(room) - if err != nil { - return - } - ok, err = com.Receive() - log.Debugf("server says: %s", ok) - if err != nil { - return - } - if ok != "sender" { - err = errors.New(ok) - } - return -}