From 9940856317943793a9a665a7e0b922823286f69a Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:32:49 -0700 Subject: [PATCH] rearrange block communication --- src/recipient/recipient.go | 63 +++++++++++++++++++++++++++----------- src/sender/sender.go | 49 +++++++++++++++-------------- src/zipper/zip.go | 3 +- 3 files changed, 72 insertions(+), 43 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 1e181d7a..0f7840b3 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -51,6 +51,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var transferTime time.Duration var hash256 []byte var otherIP string + var progressFile string + var resumeFile bool var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) blocks := []string{} @@ -132,17 +134,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } log.Debugf("%x\n", sessionKey) - // append the previous blocks if there was progress previously - file, errCrocProgress := os.Open("croc-progress") - if errCrocProgress == nil { - scanner := bufio.NewScanner(file) - for scanner.Scan() { - blocks = append(blocks, strings.TrimSpace(scanner.Text())) - } - file.Close() - } - blocksBytes, _ := json.Marshal(blocks) - c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + c.WriteMessage(websocket.BinaryMessage, []byte("ready")) case 3: spin.Stop() @@ -164,9 +156,14 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Debugf("got file stats: %+v", fstats) // prompt user if its okay to receive file + progressFile = fmt.Sprintf("%s.progress", fstats.SentName) overwritingOrReceiving := "Receiving" - if utils.Exists(fstats.Name) { + if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { overwritingOrReceiving = "Overwriting" + if utils.Exists(progressFile) && !useWebsockets { + overwritingOrReceiving = "Resume receiving" + resumeFile = true + } } fileOrFolder := "file" if fstats.IsDir { @@ -220,6 +217,26 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } } + + // append the previous blocks if there was progress previously + if resumeFile { + file, _ := os.Open(progressFile) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + file.Close() + } + blocksBytes, _ := json.Marshal(blocks) + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + + // start the ui for pgoress bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) bar := progressbar.NewOptions( @@ -228,15 +245,24 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + bar.Add((len(blocks) * blockSize)) finished := make(chan bool) go func(finished chan bool, dataChan chan []byte) (err error) { - os.Remove("croc-progress2") - fProgress, errCreate := os.Create("croc-progress2") - if errCreate != nil { - panic(errCreate) + // remove previous progress + var fProgress *os.File + var progressErr error + if resumeFile { + fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND, 0644) + } else { + os.Remove(progressFile) + fProgress, progressErr = os.Create(progressFile) + } + if progressErr != nil { + panic(progressErr) } defer fProgress.Close() + blocksWritten := 0.0 blocksToWrite := float64(fstats.Size) if useWebsockets { @@ -305,7 +331,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo }(finished, dataChan) log.Debug("telling sender i'm ready") - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + startTime := time.Now() if useWebsockets { for { @@ -426,6 +453,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo fstats.Name = "stdout" } fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) + os.Remove(progressFile) } return err } else { @@ -435,7 +463,6 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } return errors.New("file corrupted") } - default: return fmt.Errorf("unknown step") } diff --git a/src/sender/sender.go b/src/sender/sender.go index 335557cc..03ce1456 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -199,14 +199,38 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, if !bytes.HasPrefix(message, []byte("ready")) { return errors.New("recipient refused file") } + + err = <-fileReady // block until file is ready + if err != nil { + return err + } + fstatsBytes, err := json.Marshal(fstats) + if err != nil { + return err + } + + // encrypt the file meta data + enc := crypt.Encrypt(fstatsBytes, sessionKey) + // send the file meta data + c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) + case 4: + spin.Stop() + + log.Debugf("[%d] recipient declares readiness for file data", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + // determine if any blocks were sent to skip var blocks []string errBlocks := json.Unmarshal(message[5:], &blocks) if errBlocks == nil { log.Debugf("found blocks: %+v", blocks) for _, block := range blocks { - blockInt64, _ := strconv.Atoi(block) - blocksToSkip[int64(blockInt64)] = struct{}{} + blockInt64, errBlock := strconv.Atoi(block) + if errBlock == nil { + blocksToSkip[int64(blockInt64)] = struct{}{} + } } } @@ -298,27 +322,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } }(dataChan) - err = <-fileReady // block until file is ready - if err != nil { - return err - } - fstatsBytes, err := json.Marshal(fstats) - if err != nil { - return err - } - - // encrypt the file meta data - enc := crypt.Encrypt(fstatsBytes, sessionKey) - // send the file meta data - c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) - case 4: - spin.Stop() - - log.Debugf("[%d] recipient declares readiness for file data", step) - if !bytes.Equal(message, []byte("ready")) { - return errors.New("recipient refused file") - } - // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") diff --git a/src/zipper/zip.go b/src/zipper/zip.go index faffbd7a..1d1fa066 100644 --- a/src/zipper/zip.go +++ b/src/zipper/zip.go @@ -4,7 +4,6 @@ import ( "archive/zip" "compress/flate" "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -97,7 +96,7 @@ func ZipFile(fname string, compress bool) (writtenFilename string, err error) { return } log.Debugf("current directory: %s", curdir) - newfile, err := ioutil.TempFile(curdir, filename+".") + newfile, err := os.Create(fname + ".zip") if err != nil { log.Error(err) return