From c600b51888716fbbd7b4f89cbec593371158c48c Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:56:38 -0700 Subject: [PATCH] resume works with websockets or tcp --- src/recipient/recipient.go | 28 +++++++++++++++++++--------- src/sender/sender.go | 9 ++++++++- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 0f7840b3..12e618c1 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -160,7 +160,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo overwritingOrReceiving := "Receiving" if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { overwritingOrReceiving = "Overwriting" - if utils.Exists(progressFile) && !useWebsockets { + if utils.Exists(progressFile) { overwritingOrReceiving = "Resume receiving" resumeFile = true } @@ -200,8 +200,12 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo // await file var f *os.File - if utils.Exists(fstats.SentName) { - f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + if utils.Exists(fstats.SentName) && resumeFile { + if !useWebsockets { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + } else { + f, err = os.OpenFile(fstats.SentName, os.O_APPEND, 0644) + } if err != nil { log.Error(err) return err @@ -212,9 +216,11 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Error(err) return err } - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err + if !useWebsockets { + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } } } @@ -254,6 +260,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var progressErr error if resumeFile { fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND, 0644) + bytesWritten = len(blocks) * blockSize } else { os.Remove(progressFile) fProgress, progressErr = os.Create(progressFile) @@ -266,7 +273,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo blocksWritten := 0.0 blocksToWrite := float64(fstats.Size) if useWebsockets { - blocksToWrite = blocksToWrite / float64(models.WEBSOCKET_BUFFER_SIZE/8) + blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) } else { blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) } @@ -292,9 +299,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo pieces := bytes.SplitN(decrypted, []byte("-"), 2) decrypted = pieces[1] locationToWrite, _ = strconv.Atoi(string(pieces[0])) - log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) - fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) } + // do decompression if fstats.IsCompressed && !fstats.IsDir { decrypted = compress.Decompress(decrypted) @@ -307,9 +313,13 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } n, err = f.WriteAt(decrypted, int64(locationToWrite)) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) + log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) } else { // write to file n, err = f.Write(decrypted) + log.Debugf("writing to location %d (%2.0f/%2.0f)", bytesWritten, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) } if err != nil { diff --git a/src/sender/sender.go b/src/sender/sender.go index 03ce1456..f386c413 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -225,7 +225,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var blocks []string errBlocks := json.Unmarshal(message[5:], &blocks) if errBlocks == nil { - log.Debugf("found blocks: %+v", blocks) for _, block := range blocks { blockInt64, errBlock := strconv.Atoi(block) if errBlock == nil { @@ -233,6 +232,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } } + log.Debugf("found blocks: %+v", blocksToSkip) // start streaming encryption/compression go func(dataChan chan DataChan) { @@ -341,12 +341,19 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, // send file, compure hash simultaneously startTransfer = time.Now() + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } bar := progressbar.NewOptions( int(fstats.Size), progressbar.OptionSetRenderBlankState(true), progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + bar.Add(blockSize * len(blocksToSkip)) if useWebsockets { for {