From 746f1fe193312d90baa27a4ceb6fd88f4b04d272 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Fri, 20 Oct 2017 17:26:04 -0600 Subject: [PATCH 1/3] Add SplitFile func --- utils.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ utils_test.go | 13 +++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 utils_test.go diff --git a/utils.go b/utils.go index f40c4a1e..62917444 100644 --- a/utils.go +++ b/utils.go @@ -4,9 +4,56 @@ import ( "crypto/md5" "fmt" "io" + "math" "os" + "strconv" ) +// SplitFile +func SplitFile(fileName string, numPieces int) (err error) { + file, err := os.Open(fileName) + if err != nil { + return err + } + defer file.Close() + fi, err := file.Stat() + if err != nil { + return err + } + + bytesPerPiece := int(math.Ceil(float64(fi.Size()) / float64(numPieces))) + bytesRead := 0 + i := 0 + out, err := os.Create(fileName + "." + strconv.Itoa(i)) + if err != nil { + return err + } + buf := make([]byte, 4096) + if bytesPerPiece < 4096/numPieces { + buf = make([]byte, bytesPerPiece) + } + for { + n, err := file.Read(buf) + out.Write(buf[:n]) + bytesRead += n + if err == io.EOF { + break + } + if bytesRead >= bytesPerPiece { + // Close file and open a new one + out.Close() + i++ + out, err = os.Create(fileName + "." + strconv.Itoa(i)) + if err != nil { + return err + } + bytesRead = 0 + } + } + out.Close() + return nil +} + // CopyFile copies a file from src to dst. If src and dst files exist, and are // the same, then return success. Otherise, attempt to create a hard link // between the two files. If that fail, copy the file contents from src to dst. diff --git a/utils_test.go b/utils_test.go new file mode 100644 index 00000000..df8be9b4 --- /dev/null +++ b/utils_test.go @@ -0,0 +1,13 @@ +package main + +import ( + "testing" +) + +func TestSplitFile(t *testing.T) { + err := SplitFile("README.md", 3) + if err != nil { + t.Error(err) + } + +} From 5eff228a3d29e39dc13d2fc55c4c1a32a3627c4b Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Fri, 20 Oct 2017 18:49:19 -0600 Subject: [PATCH 2/3] Add catfiles --- connect.go | 31 +++++++++++-------------------- utils.go | 24 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/connect.go b/connect.go index 7c98f6b3..bfbb0bad 100644 --- a/connect.go +++ b/connect.go @@ -290,7 +290,9 @@ func (c *Connection) runClient() error { if !gotOK { return errors.New("Transfer interrupted") } - c.catFile(c.File.Name) + if err := c.catFile(); err != nil { + return err + } log.Debugf("Code: [%s]", c.Code) if c.DontEncrypt { if err := CopyFile(c.File.Name+".enc", c.File.Name); err != nil { @@ -330,28 +332,17 @@ func fileAlreadyExists(s []string, f string) bool { return false } -func (c *Connection) catFile(fname string) { +func (c *Connection) catFile() error { // cat the file - os.Remove(fname) - finished, err := os.Create(fname + ".enc") - defer finished.Close() - if err != nil { - log.Fatal(err) + files := make([]string, c.NumberOfConnections) + for id := range files { + files[id] = c.File.Name + "." + strconv.Itoa(id) } - for id := 0; id < c.NumberOfConnections; id++ { - fh, err := os.Open(fname + "." + strconv.Itoa(id)) - if err != nil { - log.Fatal(err) - } - - _, err = io.Copy(finished, fh) - if err != nil { - log.Fatal(err) - } - fh.Close() - os.Remove(fname + "." + strconv.Itoa(id)) + toRemove := true + if c.Debug { + toRemove = false } - + return CatFiles(files, c.File.Name+".enc", toRemove) } func (c *Connection) receiveFile(id int, connection net.Conn) error { diff --git a/utils.go b/utils.go index 62917444..b15a4b90 100644 --- a/utils.go +++ b/utils.go @@ -9,6 +9,30 @@ import ( "strconv" ) +func CatFiles(files []string, outfile string, remove ...bool) error { + finished, err := os.Create(outfile) + defer finished.Close() + if err != nil { + return err + } + for i := range files { + fh, err := os.Open(files[i]) + if err != nil { + return err + } + + _, err = io.Copy(finished, fh) + if err != nil { + return err + } + fh.Close() + if len(remove) > 0 && remove[0] { + os.Remove(files[i]) + } + } + return nil +} + // SplitFile func SplitFile(fileName string, numPieces int) (err error) { file, err := os.Open(fileName) From a43e61b78aa858c2147bb08b379a36171b60506f Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Fri, 20 Oct 2017 19:09:40 -0600 Subject: [PATCH 3/3] Split encrypted bits into chunks to send in parallel --- connect.go | 91 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 47 insertions(+), 44 deletions(-) diff --git a/connect.go b/connect.go index bfbb0bad..360f7422 100644 --- a/connect.go +++ b/connect.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "math" "net" "os" "strconv" @@ -125,6 +124,9 @@ func (c *Connection) Run() error { if err := EncryptFile(c.File.Name, c.File.Name+".enc", c.Code); err != nil { return err } + if err := SplitFile(c.File.Name+".enc", c.NumberOfConnections); err != nil { + return err + } } // get file hash var err error @@ -137,6 +139,10 @@ func (c *Connection) Run() error { if err != nil { return err } + // remove the file now since we still have pieces + if err := os.Remove(c.File.Name + ".enc"); err != nil { + return err + } fmt.Printf("Sending %d byte file named '%s'\n", c.File.Size, c.File.Name) fmt.Printf("Code is: %s\n", c.Code) } @@ -207,8 +213,9 @@ func (c *Connection) runClient() error { time.Sleep(100 * time.Millisecond) // Write data from file logger.Debug("send file") - c.sendFile(id, connection) - fmt.Println("File sent.") + if err := c.sendFile(id, connection); err != nil { + log.Error(err) + } } } else { // this is a receiver logger.Debug("waiting for meta data from sender") @@ -281,7 +288,7 @@ func (c *Connection) runClient() error { if c.IsSender { // TODO: Add confirmation - fmt.Println("File sent.") + fmt.Println("\nFile sent.") } else { // Is a Receiver if notPresent { fmt.Println("Sender/Code not present") @@ -336,7 +343,7 @@ func (c *Connection) catFile() error { // cat the file files := make([]string, c.NumberOfConnections) for id := range files { - files[id] = c.File.Name + "." + strconv.Itoa(id) + files[id] = c.File.Name + ".enc." + strconv.Itoa(id) } toRemove := true if c.Debug { @@ -358,8 +365,8 @@ func (c *Connection) receiveFile(id int, connection net.Conn) error { chunkSize := int64(fileSizeInt) logger.Debugf("chunk size: %d", chunkSize) - os.Remove(c.File.Name + "." + strconv.Itoa(id)) - newFile, err := os.Create(c.File.Name + "." + strconv.Itoa(id)) + os.Remove(c.File.Name + ".enc." + strconv.Itoa(id)) + newFile, err := os.Create(c.File.Name + ".enc." + strconv.Itoa(id)) if err != nil { panic(err) } @@ -397,66 +404,62 @@ func (c *Connection) receiveFile(id int, connection net.Conn) error { return nil } -func (c *Connection) sendFile(id int, connection net.Conn) { +func (c *Connection) sendFile(id int, connection net.Conn) error { logger := log.WithFields(log.Fields{ "function": "sendFile #" + strconv.Itoa(id), }) defer connection.Close() - var err error - - numChunks := math.Ceil(float64(c.File.Size) / float64(BUFFERSIZE)) - chunksPerWorker := int(math.Ceil(numChunks / float64(c.NumberOfConnections))) - - chunkSize := int64(chunksPerWorker * BUFFERSIZE) - if id+1 == c.NumberOfConnections { - chunkSize = int64(c.File.Size) - int64(c.NumberOfConnections-1)*chunkSize - } - - if id == 0 || id == c.NumberOfConnections-1 { - logger.Debugf("numChunks: %v", numChunks) - logger.Debugf("chunksPerWorker: %v", chunksPerWorker) - logger.Debugf("bytesPerchunkSizeConnection: %v", chunkSize) - } - - logger.Debugf("sending chunk size: %d", chunkSize) - connection.Write([]byte(fillString(strconv.FormatInt(int64(chunkSize), 10), 10))) - - sendBuffer := make([]byte, BUFFERSIZE) - - // open encrypted file - file, err := os.Open(c.File.Name + ".enc") + // open encrypted file chunk + logger.Debug("opening encrypted file chunk: " + c.File.Name + ".enc." + strconv.Itoa(id)) + file, err := os.Open(c.File.Name + ".enc." + strconv.Itoa(id)) if err != nil { - log.Error(err) - return + return err } defer file.Close() - chunkI := 0 + // determine and send the file size to client + fi, err := file.Stat() + if err != nil { + return err + } + logger.Debugf("sending chunk size: %d", fi.Size()) + connection.Write([]byte(fillString(strconv.FormatInt(int64(fi.Size()), 10), 10))) + + // show the progress if !c.Debug { - c.bars[id] = uiprogress.AddBar(chunksPerWorker).AppendCompleted().PrependElapsed() + logger.Debug("going to show progress") + c.bars[id] = uiprogress.AddBar(int(fi.Size())).AppendCompleted().PrependElapsed() } + // rate limit the bandwidth + logger.Debug("determining rate limiting") bufferSizeInKilobytes := BUFFERSIZE / 1024 rate := float64(c.rate) / float64(c.NumberOfConnections*bufferSizeInKilobytes) throttle := time.NewTicker(time.Second / time.Duration(rate)) defer throttle.Stop() + // send the file + sendBuffer := make([]byte, BUFFERSIZE) + totalBytesSent := 0 for range throttle.C { - _, err = file.Read(sendBuffer) + n, err := file.Read(sendBuffer) + connection.Write(sendBuffer) + totalBytesSent += n + if !c.Debug { + c.bars[id].Set(totalBytesSent) + } if err == io.EOF { //End of file reached, break out of for loop logger.Debug("EOF") break } - if (chunkI >= chunksPerWorker*id && chunkI < chunksPerWorker*id+chunksPerWorker) || (id == c.NumberOfConnections-1 && chunkI >= chunksPerWorker*id) { - connection.Write(sendBuffer) - if !c.Debug { - c.bars[id].Incr() - } - } - chunkI++ } logger.Debug("file is sent") - return + logger.Debug("removing piece") + if !c.Debug { + file.Close() + err = os.Remove(c.File.Name + ".enc." + strconv.Itoa(id)) + } + return err }