diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 5d283b28..1e181d7a 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -1,6 +1,7 @@ package recipient import ( + "bufio" "bytes" "encoding/json" "errors" @@ -52,6 +53,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var otherIP string var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) + blocks := []string{} useWebsockets := true switch forceSend { @@ -129,7 +131,18 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } log.Debugf("%x\n", sessionKey) - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + + // append the previous blocks if there was progress previously + file, errCrocProgress := os.Open("croc-progress") + if errCrocProgress == nil { + scanner := bufio.NewScanner(file) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + file.Close() + } + blocksBytes, _ := json.Marshal(blocks) + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) case 3: spin.Stop() @@ -189,14 +202,23 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } // await file - f, err := os.Create(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err + var f *os.File + if utils.Exists(fstats.SentName) { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + if err != nil { + log.Error(err) + return err + } + } else { + f, err = os.Create(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } } bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) @@ -209,6 +231,19 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo finished := make(chan bool) go func(finished chan bool, dataChan chan []byte) (err error) { + os.Remove("croc-progress2") + fProgress, errCreate := os.Create("croc-progress2") + if errCreate != nil { + panic(errCreate) + } + defer fProgress.Close() + blocksWritten := 0.0 + blocksToWrite := float64(fstats.Size) + if useWebsockets { + blocksToWrite = blocksToWrite / float64(models.WEBSOCKET_BUFFER_SIZE/8) + } else { + blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) + } for { message := <-dataChan // do decryption @@ -231,8 +266,9 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo pieces := bytes.SplitN(decrypted, []byte("-"), 2) decrypted = pieces[1] locationToWrite, _ = strconv.Atoi(string(pieces[0])) + log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) } - // do decompression if fstats.IsCompressed && !fstats.IsDir { decrypted = compress.Decompress(decrypted) @@ -251,13 +287,15 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } if err != nil { + log.Error(err) return err } // update the bytes written bytesWritten += n + blocksWritten += 1.0 // update the progress bar bar.Add(n) - if int64(bytesWritten) == fstats.Size { + if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { log.Debug("finished") break } diff --git a/src/sender/sender.go b/src/sender/sender.go index caed83d3..335557cc 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -52,6 +53,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var otherIP string var startTransfer time.Time var tcpConnections []comm.Comm + blocksToSkip := make(map[int64]struct{}) type DataChan struct { b []byte @@ -169,87 +171,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } fileReady <- nil - // start streaming encryption/compression - go func(dataChan chan DataChan) { - var buffer []byte - if useWebsockets { - buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) - } else { - buffer = make([]byte, models.TCP_BUFFER_SIZE/2) - } - currentPostition := int64(0) - for { - bytesread, err := f.Read(buffer) - if bytesread > 0 { - // do compression - var compressedBytes []byte - if useCompression && !fstats.IsDir { - compressedBytes = compress.Compress(buffer[:bytesread]) - } else { - compressedBytes = buffer[:bytesread] - } - - // if using TCP, prepend the location to write the data to in the resulting file - if !useWebsockets { - compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) - } - - // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) - if err != nil { - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: err, - } - return - } - - select { - case dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - }: - default: - log.Debug("blocked") - // no message sent - // block - dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - } - } - currentPostition += int64(bytesread) - } - if err != nil { - if err != io.EOF { - log.Error(err) - } - break - } - } - // finish - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - if !useWebsockets { - log.Debug("sending extra magic to %d others", len(tcpPorts)-1) - for i := 0; i < len(tcpPorts)-1; i++ { - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - } - } - }(dataChan) }() // send pake data @@ -275,9 +196,108 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, spin.Start() case 3: log.Debugf("[%d] recipient declares readiness for file info", step) - if !bytes.Equal(message, []byte("ready")) { + if !bytes.HasPrefix(message, []byte("ready")) { return errors.New("recipient refused file") } + // determine if any blocks were sent to skip + var blocks []string + errBlocks := json.Unmarshal(message[5:], &blocks) + if errBlocks == nil { + log.Debugf("found blocks: %+v", blocks) + for _, block := range blocks { + blockInt64, _ := strconv.Atoi(block) + blocksToSkip[int64(blockInt64)] = struct{}{} + } + } + + // start streaming encryption/compression + go func(dataChan chan DataChan) { + var buffer []byte + if useWebsockets { + buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) + } else { + buffer = make([]byte, models.TCP_BUFFER_SIZE/2) + } + currentPostition := int64(0) + for { + bytesread, err := f.Read(buffer) + if bytesread > 0 { + if _, ok := blocksToSkip[currentPostition]; ok { + log.Debugf("skipping the sending of block %d", currentPostition) + currentPostition += int64(bytesread) + continue + } + + // do compression + var compressedBytes []byte + if useCompression && !fstats.IsDir { + compressedBytes = compress.Compress(buffer[:bytesread]) + } else { + compressedBytes = buffer[:bytesread] + } + + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) + } + + // do encryption + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + + select { + case dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + }: + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + } + } + currentPostition += int64(bytesread) + } + if err != nil { + if err != io.EOF { + log.Error(err) + } + break + } + } + // finish + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + if !useWebsockets { + log.Debug("sending extra magic to %d others", len(tcpPorts)-1) + for i := 0; i < len(tcpPorts)-1; i++ { + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + } + } + }(dataChan) + err = <-fileReady // block until file is ready if err != nil { return err