From 3bed0bc8bf8b29038587d14f0b73330ffb5faa58 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Sun, 14 Oct 2018 17:10:08 -0700 Subject: [PATCH] move files here --- src/croc/recipient.go | 497 ++++++++++++++++++++++++++++++++++++++++++ src/croc/sender.go | 487 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 984 insertions(+) create mode 100644 src/croc/recipient.go create mode 100644 src/croc/sender.go diff --git a/src/croc/recipient.go b/src/croc/recipient.go new file mode 100644 index 00000000..84b22071 --- /dev/null +++ b/src/croc/recipient.go @@ -0,0 +1,497 @@ +package croc + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "strconv" + "strings" + "sync" + "time" + + humanize "github.com/dustin/go-humanize" + + log "github.com/cihub/seelog" + "github.com/gorilla/websocket" + "github.com/schollz/croc/src/comm" + "github.com/schollz/croc/src/compress" + "github.com/schollz/croc/src/crypt" + "github.com/schollz/croc/src/logger" + "github.com/schollz/croc/src/models" + "github.com/schollz/croc/src/utils" + "github.com/schollz/croc/src/zipper" + "github.com/schollz/pake" + "github.com/schollz/progressbar/v2" + "github.com/schollz/spinner" + "github.com/tscholl2/siec" +) + +var DebugLevel string + +// Receive is the async operation to receive a file +func (cr *Croc) startRecipient(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { + logger.SetLogLevel(DebugLevel) + err := cr.receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) + if err != nil { + if !strings.HasPrefix(err.Error(), "websocket: close 100") { + fmt.Fprintf(os.Stderr, "\n"+err.Error()) + } + } + done <- struct{}{} +} + +func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { + var fstats models.FileStats + var sessionKey []byte + var transferTime time.Duration + var hash256 []byte + var otherIP string + var progressFile string + var resumeFile bool + var tcpConnections []comm.Comm + dataChan := make(chan []byte, 1024*1024) + isConnectedIfUsingTCP := make(chan bool) + blocks := []string{} + + useWebsockets := true + switch forceSend { + case 0: + if !isLocal { + useWebsockets = false + } + case 1: + useWebsockets = true + case 2: + useWebsockets = false + } + + // start a spinner + spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) + spin.Writer = os.Stderr + spin.Suffix = " performing PAKE..." + spin.Start() + + // pick an elliptic curve + curve := siec.SIEC255() + // both parties should have a weak key + pw := []byte(codephrase) + + // initialize recipient Q ("1" indicates recipient) + Q, err := pake.Init(pw, 1, curve, 1*time.Millisecond) + if err != nil { + return + } + + step := 0 + for { + messageType, message, err := c.ReadMessage() + if err != nil { + return err + } + if messageType == websocket.PongMessage || messageType == websocket.PingMessage { + continue + } + if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { + return errors.New("\rinterrupted by other party") + } + + log.Debugf("got %d: %s", messageType, message) + switch step { + case 0: + // sender has initiated, sends their ip address + otherIP = string(message) + log.Debugf("sender IP: %s", otherIP) + + // recipient begins by sending address + ip := "" + if isLocal { + ip = utils.LocalIP() + } else { + ip, _ = utils.PublicIP() + } + c.WriteMessage(websocket.BinaryMessage, []byte(ip)) + case 1: + + // Q receives u + log.Debugf("[%d] Q computes k, sends H(k), v back to P", step) + if err := Q.Update(message); err != nil { + return err + } + + // Q has the session key now, but we will still check if its valid + sessionKey, err = Q.SessionKey() + if err != nil { + return err + } + log.Debugf("%x\n", sessionKey) + + // initialize TCP connections if using (possible, but unlikely, race condition) + go func() { + if !useWebsockets { + log.Debugf("connecting to server") + tcpConnections = make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + log.Debugf("connecting to %d", i) + var message string + tcpConnections[i], message, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + } + if message != "recipient" { + log.Errorf("got wrong message: %s", message) + } + } + log.Debugf("fully connected") + } + isConnectedIfUsingTCP <- true + }() + + c.WriteMessage(websocket.BinaryMessage, Q.Bytes()) + case 2: + log.Debugf("[%d] Q recieves H(k) from P", step) + // check if everything is still kosher with our computed session key + if err := Q.Update(message); err != nil { + return err + } + c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + case 3: + spin.Stop() + + // unmarshal the file info + log.Debugf("[%d] recieve file info", step) + // do decryption on the file stats + enc, err := crypt.FromBytes(message) + if err != nil { + return err + } + decryptedFileData, err := enc.Decrypt(sessionKey) + if err != nil { + return err + } + err = json.Unmarshal(decryptedFileData, &fstats) + if err != nil { + return err + } + log.Debugf("got file stats: %+v", fstats) + + // determine if the file is resuming or not + progressFile = fmt.Sprintf("%s.progress", fstats.SentName) + overwritingOrReceiving := "Receiving" + if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { + overwritingOrReceiving = "Overwriting" + if utils.Exists(progressFile) { + overwritingOrReceiving = "Resume receiving" + resumeFile = true + } + } + + // send blocks + if resumeFile { + fileWithBlocks, _ := os.Open(progressFile) + scanner := bufio.NewScanner(fileWithBlocks) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + fileWithBlocks.Close() + } + blocksBytes, _ := json.Marshal(blocks) + // encrypt the block data and send + encblockBytes := crypt.Encrypt(blocksBytes, sessionKey) + c.WriteMessage(websocket.BinaryMessage, encblockBytes.Bytes()) + + // prompt user about the file + fileOrFolder := "file" + if fstats.IsDir { + fileOrFolder = "folder" + } + fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n", + overwritingOrReceiving, + fileOrFolder, + humanize.Bytes(uint64(fstats.Size)), + fstats.Name, + ) + if !noPrompt { + if "y" != utils.GetInput("ok? (y/N): ") { + fmt.Fprintf(os.Stderr, "cancelling request") + c.WriteMessage(websocket.BinaryMessage, []byte("no")) + return nil + } + } + + // await file + // erase file if overwriting + if overwritingOrReceiving == "Overwriting" { + os.Remove(fstats.SentName) + } + var f *os.File + if utils.Exists(fstats.SentName) && resumeFile { + if !useWebsockets { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + } else { + f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644) + } + if err != nil { + log.Error(err) + return err + } + } else { + f, err = os.Create(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + if !useWebsockets { + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } + } + } + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + + // start the ui for pgoress + bytesWritten := 0 + fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) + bar := progressbar.NewOptions( + int(fstats.Size), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetWriter(os.Stderr), + ) + bar.Add((len(blocks) * blockSize)) + finished := make(chan bool) + + go func(finished chan bool, dataChan chan []byte) (err error) { + // remove previous progress + var fProgress *os.File + var progressErr error + if resumeFile { + fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND|os.O_WRONLY, 0644) + bytesWritten = len(blocks) * blockSize + } else { + os.Remove(progressFile) + fProgress, progressErr = os.Create(progressFile) + } + if progressErr != nil { + panic(progressErr) + } + defer fProgress.Close() + + blocksWritten := 0.0 + blocksToWrite := float64(fstats.Size) + if useWebsockets { + blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) + } else { + blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) + } + for { + message := <-dataChan + // do decryption + var enc crypt.Encryption + err = json.Unmarshal(message, &enc) + if err != nil { + // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + if err != nil { + log.Error(err) + return err + } + + // get location if TCP + var locationToWrite int + if !useWebsockets { + pieces := bytes.SplitN(decrypted, []byte("-"), 2) + decrypted = pieces[1] + locationToWrite, _ = strconv.Atoi(string(pieces[0])) + } + + // do decompression + if fstats.IsCompressed && !fstats.IsDir { + decrypted = compress.Decompress(decrypted) + } + + var n int + if !useWebsockets { + if err != nil { + log.Error(err) + return err + } + n, err = f.WriteAt(decrypted, int64(locationToWrite)) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) + log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, locationToWrite, blocksWritten, blocksToWrite) + } else { + // write to file + n, err = f.Write(decrypted) + log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, bytesWritten, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) + } + if err != nil { + log.Error(err) + return err + } + + // update the bytes written + bytesWritten += n + blocksWritten += 1.0 + // update the progress bar + bar.Add(n) + if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { + log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite) + break + } + } + finished <- true + return + }(finished, dataChan) + + log.Debug("telling sender i'm ready") + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + + startTime := time.Now() + if useWebsockets { + for { + var messageType int + // read from websockets + messageType, message, err = c.ReadMessage() + if messageType != websocket.BinaryMessage { + continue + } + if err != nil { + log.Error(err) + return err + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } + dataChan <- message + // select { + // case dataChan <- message: + // default: + // log.Debug("blocked") + // // no message sent + // // block + // dataChan <- message + // } + } + } else { + _ = <-isConnectedIfUsingTCP + log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) + // using TCP + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) + for i := range tcpConnections { + defer func(i int) { + log.Debugf("closing connection %d", i) + tcpConnections[i].Close() + }(i) + go func(wg *sync.WaitGroup, j int) { + defer wg.Done() + for { + log.Debugf("waiting to read on %d", j) + // read from TCP connection + message, _, _, err := tcpConnections[j].Read() + // log.Debugf("message: %s", message) + if err != nil { + panic(err) + } + if bytes.Equal(message, []byte("magic")) { + log.Debugf("%d got magic, leaving", j) + return + } + dataChan <- message + } + }(&wg, i) + } + wg.Wait() + } + + _ = <-finished + log.Debug("telling sender i'm done") + c.WriteMessage(websocket.BinaryMessage, []byte("done")) + // we are finished + transferTime = time.Since(startTime) + + // close file + err = f.Close() + if err != nil { + return err + } + + // finish bar + bar.Finish() + + // check hash + hash256, err = utils.HashFile(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + // tell the sender the hash so they can quit + c.WriteMessage(websocket.BinaryMessage, append([]byte("hash:"), hash256...)) + case 4: + // receive the hash from the sender so we can check it and quit + log.Debugf("got hash: %x", message) + if bytes.Equal(hash256, message) { + // open directory + if fstats.IsDir { + err = zipper.UnzipFile(fstats.SentName, ".") + if DebugLevel != "debug" { + os.Remove(fstats.SentName) + } + } else { + err = nil + } + if err == nil { + if useStdout && !fstats.IsDir { + var bFile []byte + bFile, err = ioutil.ReadFile(fstats.SentName) + if err != nil { + return err + } + os.Stdout.Write(bFile) + os.Remove(fstats.SentName) + } + transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferType := "MB/s" + if transferRate < 1 { + transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferType = "kB/s" + } + folderOrFile := "file" + if fstats.IsDir { + folderOrFile = "folder" + } + if useStdout { + fstats.Name = "stdout" + } + fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) + os.Remove(progressFile) + } + return err + } else { + if DebugLevel != "debug" { + log.Debug("removing corrupted file") + os.Remove(fstats.SentName) + } + return errors.New("file corrupted") + } + default: + return fmt.Errorf("unknown step") + } + step++ + } +} diff --git a/src/croc/sender.go b/src/croc/sender.go new file mode 100644 index 00000000..75831b96 --- /dev/null +++ b/src/croc/sender.go @@ -0,0 +1,487 @@ +package croc + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + log "github.com/cihub/seelog" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/schollz/croc/src/comm" + "github.com/schollz/croc/src/compress" + "github.com/schollz/croc/src/crypt" + "github.com/schollz/croc/src/logger" + "github.com/schollz/croc/src/models" + "github.com/schollz/croc/src/utils" + "github.com/schollz/croc/src/zipper" + "github.com/schollz/pake" + "github.com/schollz/progressbar/v2" + "github.com/schollz/spinner" + "github.com/tscholl2/siec" +) + +// Send is the async call to send data +func (cr *Croc) startSender(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { + logger.SetLogLevel(DebugLevel) + log.Debugf("sending %s", fname) + err := cr.send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) + if err != nil { + if !strings.HasPrefix(err.Error(), "websocket: close 100") { + fmt.Fprintf(os.Stderr, "\n"+err.Error()) + } + } + + done <- struct{}{} +} + +func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { + var f *os.File + defer f.Close() // ignore the error if it wasn't opened :( + var fstats models.FileStats + var fileHash []byte + var otherIP string + var startTransfer time.Time + var tcpConnections []comm.Comm + blocksToSkip := make(map[int64]struct{}) + isConnectedIfUsingTCP := make(chan bool) + + type DataChan struct { + b []byte + currentPostition int64 + bytesRead int + err error + } + dataChan := make(chan DataChan, 1024*1024) + defer close(dataChan) + + useWebsockets := true + switch forceSend { + case 0: + if !isLocal { + useWebsockets = false + } + case 1: + useWebsockets = true + case 2: + useWebsockets = false + } + + fileReady := make(chan error) + + // normalize the file name + fname, err = filepath.Abs(fname) + if err != nil { + return err + } + _, filename := filepath.Split(fname) + + // get ready to generate session key + var sessionKey []byte + + // start a spinner + spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond) + spin.Writer = os.Stderr + + // pick an elliptic curve + curve := siec.SIEC255() + // both parties should have a weak key + pw := []byte(codephrase) + // initialize sender P ("0" indicates sender) + P, err := pake.Init(pw, 0, curve, 1*time.Millisecond) + if err != nil { + return + } + + step := 0 + for { + messageType, message, errRead := c.ReadMessage() + if errRead != nil { + return errRead + } + if messageType == websocket.PongMessage || messageType == websocket.PingMessage { + continue + } + if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) { + return errors.New("\rinterrupted by other party") + } + log.Debugf("got %d: %s", messageType, message) + switch step { + case 0: + // sender initiates communication + ip := "" + if isLocal { + ip = utils.LocalIP() + } else { + ip, _ = utils.PublicIP() + } + // send my IP address + c.WriteMessage(websocket.BinaryMessage, []byte(ip)) + case 1: + // first receive the IP address from the sender + otherIP = string(message) + log.Debugf("recipient IP: %s", otherIP) + + go func() { + // recipient might want file! start gathering information about file + fstat, err := os.Stat(fname) + if err != nil { + fileReady <- err + return + } + fstats = models.FileStats{ + Name: filename, + Size: fstat.Size(), + ModTime: fstat.ModTime(), + IsDir: fstat.IsDir(), + SentName: fstat.Name(), + IsCompressed: useCompression, + IsEncrypted: useEncryption, + } + if fstats.IsDir { + // zip the directory + fstats.SentName, err = zipper.ZipFile(fname, true) + if err != nil { + log.Error(err) + fileReady <- err + return + } + fname = fstats.SentName + + fstat, err := os.Stat(fname) + if err != nil { + fileReady <- err + return + } + // get new size + fstats.Size = fstat.Size() + } + + // open the file + f, err = os.Open(fname) + if err != nil { + fileReady <- err + return + } + fileReady <- nil + + }() + + // send pake data + log.Debugf("[%d] first, P sends u to Q", step) + c.WriteMessage(websocket.BinaryMessage, P.Bytes()) + // start PAKE spinnner + spin.Suffix = " performing PAKE..." + spin.Start() + case 2: + // P recieves H(k),v from Q + log.Debugf("[%d] P computes k, H(k), sends H(k) to Q", step) + if err := P.Update(message); err != nil { + return err + } + c.WriteMessage(websocket.BinaryMessage, P.Bytes()) + sessionKey, _ = P.SessionKey() + // check(err) + log.Debugf("%x\n", sessionKey) + + // wait for readiness + spin.Stop() + spin.Suffix = " waiting for recipient ok..." + spin.Start() + case 3: + log.Debugf("[%d] recipient declares readiness for file info", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + + // connect to TCP in background + tcpConnections = make([]comm.Comm, len(tcpPorts)) + go func() { + if !useWebsockets { + log.Debugf("connecting to server") + for i, tcpPort := range tcpPorts { + log.Debugf("connecting to %s on connection %d", tcpPort, i) + var message string + tcpConnections[i], message, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + } + if message != "sender" { + log.Errorf("got wrong message: %s", message) + } + } + } + isConnectedIfUsingTCP <- true + }() + + err = <-fileReady // block until file is ready + if err != nil { + return err + } + fstatsBytes, err := json.Marshal(fstats) + if err != nil { + return err + } + + // encrypt the file meta data + enc := crypt.Encrypt(fstatsBytes, sessionKey) + // send the file meta data + c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) + case 4: + log.Debugf("[%d] recipient declares gives blocks", step) + // recipient sends blocks, and sender does not send anything back + // determine if any blocks were sent to skip + enc, err := crypt.FromBytes(message) + if err != nil { + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey) + if err != nil { + err = errors.Wrap(err, "could not decrypt blocks with session key") + log.Error(err) + return err + } + + var blocks []string + errBlocks := json.Unmarshal(decrypted, &blocks) + if errBlocks == nil { + for _, block := range blocks { + blockInt64, errBlock := strconv.Atoi(block) + if errBlock == nil { + blocksToSkip[int64(blockInt64)] = struct{}{} + } + } + } + log.Debugf("found blocks: %+v", blocksToSkip) + + // start loading the file into memory + // start streaming encryption/compression + if fstats.IsDir { + // remove file if zipped + defer os.Remove(fstats.SentName) + } + go func(dataChan chan DataChan) { + var buffer []byte + if useWebsockets { + buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) + } else { + buffer = make([]byte, models.TCP_BUFFER_SIZE/2) + } + + currentPostition := int64(0) + for { + bytesread, err := f.Read(buffer) + if bytesread > 0 { + if _, ok := blocksToSkip[currentPostition]; ok { + log.Debugf("skipping the sending of block %d", currentPostition) + currentPostition += int64(bytesread) + continue + } + + // do compression + var compressedBytes []byte + if useCompression && !fstats.IsDir { + compressedBytes = compress.Compress(buffer[:bytesread]) + } else { + compressedBytes = buffer[:bytesread] + } + + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) + } + + // do encryption + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + + dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + } + currentPostition += int64(bytesread) + } + if err != nil { + if err != io.EOF { + log.Error(err) + } + break + } + } + // finish + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + if !useWebsockets { + log.Debug("sending extra magic to %d others", len(tcpPorts)-1) + for i := 0; i < len(tcpPorts)-1; i++ { + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + } + } + }(dataChan) + + case 5: + spin.Stop() + + log.Debugf("[%d] recipient declares readiness for file data", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + + fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) + // send file, compure hash simultaneously + startTransfer = time.Now() + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + bar := progressbar.NewOptions( + int(fstats.Size), + progressbar.OptionSetRenderBlankState(true), + progressbar.OptionSetBytes(int(fstats.Size)), + progressbar.OptionSetWriter(os.Stderr), + ) + bar.Add(blockSize * len(blocksToSkip)) + + if useWebsockets { + for { + data := <-dataChan + if data.err != nil { + return data.err + } + bar.Add(data.bytesRead) + // write data to websockets + err = c.WriteMessage(websocket.BinaryMessage, data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + return err + } + if bytes.Equal(data.b, []byte("magic")) { + break + } + } + } else { + _ = <-isConnectedIfUsingTCP + log.Debug("connected and ready to send on tcp") + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) + for i := range tcpConnections { + defer func(i int) { + log.Debugf("closing connection %d", i) + tcpConnections[i].Close() + }(i) + go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan) { + defer wg.Done() + for data := range dataChan { + if data.err != nil { + log.Error(data.err) + return + } + bar.Add(data.bytesRead) + // write data to tcp connection + _, err = tcpConnections[i].Write(data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + log.Error(err) + return + } + if bytes.Equal(data.b, []byte("magic")) { + log.Debugf("%d got magic", i) + return + } + } + }(i, &wg, dataChan) + } + wg.Wait() + } + + bar.Finish() + log.Debug("send hash to finish file") + fileHash, err = utils.HashFile(fname) + if err != nil { + return err + } + case 6: + // recevied something, maybe the file hash + transferTime := time.Since(startTransfer) + if !bytes.HasPrefix(message, []byte("hash:")) { + log.Debugf("%s", message) + continue + } + c.WriteMessage(websocket.BinaryMessage, fileHash) + message = bytes.TrimPrefix(message, []byte("hash:")) + log.Debugf("[%d] determing whether it went ok", step) + if bytes.Equal(message, fileHash) { + log.Debug("file transfered successfully") + transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds() + transferType := "MB/s" + if transferRate < 1 { + transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds() + transferType = "kB/s" + } + fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType) + return nil + } else { + fmt.Fprintf(os.Stderr, "\nTransfer corrupted") + return errors.New("file not transfered succesfully") + } + default: + return fmt.Errorf("unknown step") + } + step++ + } +} + +func connectToTCPServer(room string, address string) (com comm.Comm, message string, err error) { + connection, err := net.DialTimeout("tcp", address, 3*time.Hour) + if err != nil { + return + } + connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) + connection.SetDeadline(time.Now().Add(3 * time.Hour)) + connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) + + com = comm.New(connection) + ok, err := com.Receive() + if err != nil { + return + } + log.Debugf("server says: %s", ok) + + err = com.Send(room) + if err != nil { + return + } + message, err = com.Receive() + log.Debugf("server says: %s", message) + return +}