From 311c144ebd18cd56c982548c4274c485043ef354 Mon Sep 17 00:00:00 2001
From: Zack Scholl
Date: Thu, 4 Apr 2019 06:35:27 -0700
Subject: [PATCH] logic in place for handling multiple files

- hash file with info and check hash
- make explicit request for chunks
- check file directly for empty chunks, request them
- skip files if they already exist on machine
---
 main.go          |  18 +++-
 src/croc/croc.go | 275 ++++++++++++++++++++++++++++++++---------------
 2 files changed, 207 insertions(+), 86 deletions(-)

diff --git a/main.go b/main.go
index 8ce46aff..07cb249e 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,22 @@ import (
 )
 
 func main() {
+	// f, _ := os.Create("test.1")
+	// f.Truncate(8096)
+	// f.Close()
+
+	// file, _ := os.Open("test.1")
+	// defer file.Close()
+
+	// buffer := make([]byte, 4096)
+	// emptyBuffer := make([]byte, 4096)
+	// for {
+	// 	bytesread, err := file.Read(buffer)
+	// 	if err != nil {
+	// 		break
+	// 	}
+	// 	fmt.Println(bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]))
+	// }
 	var sender bool
 	flag.BoolVar(&sender, "sender", false, "sender")
 	flag.Parse()
@@ -18,7 +34,7 @@ func main() {
 		err = c.Send(croc.TransferOptions{
 			// PathToFile: "../wskeystore/README.md",
 			// PathToFile: "./src/croc/croc.go",
-			PathToFile:       "C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go",
+			PathToFiles:      []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"},
 			KeepPathInRemote: false,
 		})
 	} else {
diff --git a/src/croc/croc.go b/src/croc/croc.go
index ab75ad1b..29c6ca6a 100644
--- a/src/croc/croc.go
+++ b/src/croc/croc.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"path"
 	"path/filepath"
@@ -19,6 +20,7 @@ import (
 	"github.com/pions/webrtc/examples/util"
 	"github.com/pions/webrtc/pkg/datachannel"
 	"github.com/pions/webrtc/pkg/ice"
+	"github.com/schollz/croc/src/utils"
 	"github.com/schollz/pake"
 	"github.com/schollz/progressbar/v2"
 	"github.com/sirupsen/logrus"
@@ -26,6 +28,8 @@ import (
 
 var log = logrus.New()
 
+const BufferSize = 4096
+
 func init() {
 	log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
 	log.SetOutput(colorable.NewColorableStdout())
@@ -39,19 +43,21 @@ type Client struct {
 	IsSender     bool
 	SharedSecret string
 	Pake         *pake.Pake
-	Filename     string
-	Folder       string
 
 	// steps involved in forming relationship
-	Step1ChannelSecured     bool
-	Step2FileInfoTransfered bool
-	Step3RecipientReady     bool
-	Step4SendingData        bool
+	Step1ChannelSecured       bool
+	Step2FileInfoTransfered   bool
+	Step3RecipientRequestFile bool
+	Step4FileTransfer         bool
+	Step5RecipientCheckFile   bool // TODO: Step5 should close files and reset things
 
-	// send / receive information
-	f            *os.File
-	FileInfo     FileInfo
-	chunksToSend []int64
+	// send / receive information of all files
+	FilesToTransfer           []FileInfo
+	FilesToTransferCurrentNum int
+
+	// send / receive information of current file
+	CurrentFile       *os.File
+	CurrentFileChunks []int64
 
 	// channel data
 	incomingMessageChannel <-chan *redis.Message
@@ -78,13 +84,20 @@ type Chunk struct {
 
 type FileInfo struct {
 	Name         string    `json:"n,omitempty"`
-	Folder       string    `json:"f,omitempty"`
+	FolderRemote string    `json:"fr,omitempty"`
+	FolderSource string    `json:"fs,omitempty"`
+	Hash         []byte    `json:"h,omitempty"`
 	Size         int64     `json:"s,omitempty"`
 	ModTime      time.Time `json:"m,omitempty"`
 	IsCompressed bool      `json:"c,omitempty"`
 	IsEncrypted  bool      `json:"e,omitempty"`
 }
 
+type RemoteFileRequest struct {
+	CurrentFileChunks         []int64
+	FilesToTransferCurrentNum int
+}
+
 func (m Message) String() string {
 	b, _ := json.Marshal(m)
 	return string(b)
@@ -151,7 +164,7 @@ func New(sender bool, sharedSecret string) (c *Client, err error) {
 }
 
 type TransferOptions struct {
-	PathToFile       string
+	PathToFiles      []string
 	KeepPathInRemote bool
 }
 
@@ -167,46 +180,56 @@ func (c *Client) Receive() (err error) {
 
 func (c *Client) transfer(options TransferOptions) (err error) {
 	if c.IsSender {
-		var fstats os.FileInfo
-		fstats, err = os.Stat(path.Join(options.PathToFile))
-		if err != nil {
-			return
-		}
-		c.FileInfo = FileInfo{
-			Name:    fstats.Name(),
-			Folder:  ".",
-			Size:    fstats.Size(),
-			ModTime: fstats.ModTime(),
-		}
-		if options.KeepPathInRemote {
-			var fullPath, curFolder string
-			fullPath, err = filepath.Abs(options.PathToFile)
+		c.FilesToTransfer = make([]FileInfo, len(options.PathToFiles))
+		for i, pathToFile := range options.PathToFiles {
+			var fstats os.FileInfo
+			var fullPath string
+			fullPath, err = filepath.Abs(pathToFile)
 			if err != nil {
 				return
 			}
 			fullPath = filepath.Clean(fullPath)
-			folderName, _ := filepath.Split(fullPath)
-			curFolder, err = os.Getwd()
+			fstats, err = os.Stat(path.Join(fullPath))
 			if err != nil {
 				return
 			}
-			curFolder, err = filepath.Abs(curFolder)
+			c.FilesToTransfer[i] = FileInfo{
+				Name:         fstats.Name(),
+				FolderRemote: ".",
+				FolderSource: fullPath,
+				Size:         fstats.Size(),
+				ModTime:      fstats.ModTime(),
+			}
+			c.FilesToTransfer[i].Hash, err = utils.HashFile(fullPath)
 			if err != nil {
 				return
 			}
-			if !strings.HasPrefix(folderName, curFolder) {
-				err = fmt.Errorf("remote directory must be relative to current")
-				return
-			}
-			c.FileInfo.Folder = strings.TrimPrefix(folderName, curFolder)
-			c.FileInfo.Folder = filepath.ToSlash(c.FileInfo.Folder)
-			c.FileInfo.Folder = strings.TrimPrefix(c.FileInfo.Folder, "/")
-			if c.FileInfo.Folder == "" {
-				c.FileInfo.Folder = "."
+			if options.KeepPathInRemote {
+				var curFolder string
+				folderName, _ := filepath.Split(fullPath)
+
+				curFolder, err = os.Getwd()
+				if err != nil {
+					return
+				}
+				curFolder, err = filepath.Abs(curFolder)
+				if err != nil {
+					return
+				}
+				if !strings.HasPrefix(folderName, curFolder) {
+					err = fmt.Errorf("remote directory must be relative to current")
+					return
+				}
+				c.FilesToTransfer[i].FolderRemote = strings.TrimPrefix(folderName, curFolder)
+				c.FilesToTransfer[i].FolderRemote = filepath.ToSlash(c.FilesToTransfer[i].FolderRemote)
+				c.FilesToTransfer[i].FolderRemote = strings.TrimPrefix(c.FilesToTransfer[i].FolderRemote, "/")
+				if c.FilesToTransfer[i].FolderRemote == "" {
+					c.FilesToTransfer[i].FolderRemote = "."
+				}
 			}
+			log.Debugf("file %d info: %+v", i, c.FilesToTransfer[i])
 		}
-		log.Debugf("file info: %+v", c.FileInfo)
 	}
 	// create channel for quitting
 	// quit with c.quit <- true
@@ -249,20 +272,20 @@ func (c *Client) transfer(options TransferOptions) (err error) {
 func (c *Client) sendOverRedis() (err error) {
 	go func() {
 		bar := progressbar.NewOptions(
-			int(c.FileInfo.Size),
+			int(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size),
 			progressbar.OptionSetRenderBlankState(true),
-			progressbar.OptionSetBytes(int(c.FileInfo.Size)),
+			progressbar.OptionSetBytes(int(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size)),
 			progressbar.OptionSetWriter(os.Stderr),
 			progressbar.OptionThrottle(1/60*time.Second),
 		)
-		c.f, err = os.Open(c.FileInfo.Name)
+		c.CurrentFile, err = os.Open(c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
 		if err != nil {
 			panic(err)
 		}
 		location := int64(0)
 		for {
 			buf := make([]byte, 4096*128)
-			n, errRead := c.f.Read(buf)
+			n, errRead := c.CurrentFile.Read(buf)
 			bar.Add(n)
 			chunk := Chunk{
 				Bytes:    buf[:n],
@@ -307,35 +330,28 @@ func (c *Client) processMessage(m Message) (err error) {
 			c.Step1ChannelSecured = true
 		}
 	case "fileinfo":
-		err = json.Unmarshal(m.Bytes, &c.FileInfo)
-		if err != nil {
-			return
-		}
-		c.log.Debug(c.FileInfo)
-		if c.FileInfo.Folder != "." {
-			err = os.MkdirAll(c.FileInfo.Folder, os.ModeDir)
-			if err != nil {
-				return
-			}
-		}
-		c.f, err = os.Create(path.Join(c.FileInfo.Folder, c.FileInfo.Name))
-		if err != nil {
-			return
-		}
-		err = c.f.Truncate(c.FileInfo.Size)
+		err = json.Unmarshal(m.Bytes, &c.FilesToTransfer)
 		if err != nil {
 			return
 		}
+		c.log.Debug(c.FilesToTransfer)
 		c.Step2FileInfoTransfered = true
 	case "recipientready":
-		c.Step3RecipientReady = true
+		var remoteFile RemoteFileRequest
+		err = json.Unmarshal(m.Bytes, &remoteFile)
+		if err != nil {
+			return
+		}
+		c.FilesToTransferCurrentNum = remoteFile.FilesToTransferCurrentNum
+		c.CurrentFileChunks = remoteFile.CurrentFileChunks
+		c.Step3RecipientRequestFile = true
 	case "chunk":
 		var chunk Chunk
 		err = json.Unmarshal(m.Bytes, &chunk)
 		if err != nil {
 			return
 		}
-		_, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
+		_, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
 		c.log.Debug("writing chunk", chunk.Location)
 	case "datachannel-offer":
 		offer := util.Decode(m.Message)
@@ -373,18 +389,7 @@ func (c *Client) updateState() (err error) {
 	if c.IsSender && c.Step1ChannelSecured && !c.Step2FileInfoTransfered {
-		var fstats os.FileInfo
-		fstats, err = os.Stat(path.Join(c.Folder, c.Filename))
-		if err != nil {
-			return
-		}
-		c.FileInfo = FileInfo{
-			Name:    c.Filename,
-			Folder:  c.Folder,
-			Size:    fstats.Size(),
-			ModTime: fstats.ModTime(),
-		}
-		b, _ := json.Marshal(c.FileInfo)
+		b, _ := json.Marshal(c.FilesToTransfer)
 		err = c.redisdb.Publish(c.nameOutChannel, Message{
 			Type:  "fileinfo",
 			Bytes: b,
@@ -394,15 +399,80 @@ func (c *Client) updateState() (err error) {
 		}
 		c.Step2FileInfoTransfered = true
 	}
-	if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientReady {
-		// TODO: recipient requests the chunk locations (if empty, then should receive all chunks)
+	if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientRequestFile {
+		// find the next file to transfer and send that number
+		// if the files are the same size, then look for missing chunks
+		finished := true
+		for i, fileInfo := range c.FilesToTransfer {
+			if i < c.FilesToTransferCurrentNum {
+				continue
+			}
+			fileHash, errHash := utils.HashFile(path.Join(fileInfo.FolderRemote, fileInfo.Name))
+			if errHash != nil || !bytes.Equal(fileHash, fileInfo.Hash) {
+				finished = false
+				c.FilesToTransferCurrentNum = i
+				break
+			}
+			// TODO: print out something about this file already existing
+		}
+		if finished {
+			// TODO: do the last finishing stuff
+			log.Debug("finished")
+			os.Exit(1)
+		}
+
+		// start initiating the process to receive a new file
+		log.Debugf("working on file %d", c.FilesToTransferCurrentNum)
+
+		// setup folder for new file
+		if c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote != "." {
+			err = os.MkdirAll(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, os.ModeDir)
+			if err != nil {
+				return
+			}
+		}
+
+		pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
+
+		// check if file should be overwritten, or simply fixed with missing chunks
+		overwrite := true
+		fstats, errStats := os.Stat(pathToFile)
+		if errStats == nil {
+			if fstats.Size() == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size {
+				// just request missing chunks
+				c.CurrentFileChunks = MissingChunks(pathToFile, fstats.Size(), 4096)
+				log.Debugf("found %d missing chunks", len(c.CurrentFileChunks))
+				overwrite = false
+			}
+		} else {
+			c.CurrentFileChunks = []int64{}
+		}
+		if overwrite {
+			c.CurrentFile, err = os.Create(pathToFile)
+			if err != nil {
+				return
+			}
+			err = c.CurrentFile.Truncate(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size)
+		} else {
+			c.CurrentFile, err = os.Open(pathToFile)
+		}
+		if err != nil {
+			return
+		}
+
+		// recipient requests the file and chunks (if empty, then should receive all chunks)
+		bRequest, _ := json.Marshal(RemoteFileRequest{
+			CurrentFileChunks:         c.CurrentFileChunks,
+			FilesToTransferCurrentNum: c.FilesToTransferCurrentNum,
+		})
 		err = c.redisdb.Publish(c.nameOutChannel, Message{
-			Type: "recipientready",
+			Type:  "recipientready",
+			Bytes: bRequest,
 		}.String()).Err()
 		if err != nil {
 			return
 		}
-		c.Step3RecipientReady = true
+		c.Step3RecipientRequestFile = true
 		// start receiving data
 		go func() {
 			err = c.dataChannelReceive()
@@ -411,9 +481,9 @@ func (c *Client) updateState() (err error) {
 			}
 		}()
 	}
-	if c.IsSender && c.Step3RecipientReady && !c.Step4SendingData {
+	if c.IsSender && c.Step3RecipientRequestFile && !c.Step4FileTransfer {
 		c.log.Debug("start sending data!")
-		c.Step4SendingData = true
+		c.Step4FileTransfer = true
 		go func() {
 			err = c.dataChannelSend()
 			if err != nil {
@@ -466,7 +536,7 @@ func (c *Client) dataChannelReceive() (err error) {
 	timer := time.Now()
 	var mutex = &sync.Mutex{}
 	piecesToDo := make(map[int64]bool)
-	for i := int64(0); i < c.FileInfo.Size; i += 4096 {
+	for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 {
 		piecesToDo[i] = true
 	}
 	// Register message handling
@@ -476,8 +546,9 @@ func (c *Client) dataChannelReceive() (err error) {
 		case *datachannel.PayloadString:
 			fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
 			if bytes.Equal(p.Data, []byte("done")) {
-				c.f.Close()
+				c.CurrentFile.Close()
 				c.log.Debug(time.Since(timer))
+				// TODO: handle done, close file, reset things and check for missing blocks
 			}
 		case *datachannel.PayloadBinary:
 			if !startTime {
@@ -491,7 +562,7 @@ func (c *Client) dataChannelReceive() (err error) {
 			}
 			var n int
 			mutex.Lock()
-			n, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
+			n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
 			mutex.Unlock()
 			if err != nil {
 				panic(err)
@@ -563,9 +634,10 @@ func (c *Client) dataChannelSend() (err error) {
 	fmt.Printf("Data channel '%s'-'%d' open\n", c.dataChannel.Label, c.dataChannel.ID)
 	time.Sleep(100 * time.Microsecond)
-	c.log.Debug("sending file")
-	const BufferSize = 4096
-	file, err := os.Open("test.txt")
+	pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
+	c.log.Debugf("sending '%s'", pathToFile)
+
+	file, err := os.Open(pathToFile)
 	if err != nil {
 		c.log.Debug(err)
 		return
@@ -636,3 +708,40 @@ func (c *Client) dataChannelSend() (err error) {
 
 	return
 }
+
+// MissingChunks returns the positions of missing chunks.
+// If the file doesn't exist, it returns an empty chunk list (all chunks).
+// If the file size is not the same as requested, it returns an empty chunk list (all chunks).
+func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) {
+	fstat, err := os.Stat(fname)
+	if err != nil || fstat.Size() != fsize {
+		return
+	}
+
+	f, err := os.Open(fname)
+	if err != nil {
+		return
+	}
+	defer f.Close()
+
+	buffer := make([]byte, chunkSize)
+	emptyBuffer := make([]byte, chunkSize)
+	chunkNum := 0
+	chunks = make([]int64, int64(math.Ceil(float64(fsize)/float64(chunkSize))))
+	var currentLocation int64
+	for {
+		bytesread, err := f.Read(buffer)
+		if err != nil {
+			break
+		}
+		if bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]) {
+			// an all-zero chunk has not been written yet; record its offset
+			chunks[chunkNum] = currentLocation
+			chunkNum++
+		}
+		currentLocation += int64(bytesread)
+	}
+	// return only the offsets that were actually found to be missing
+	chunks = chunks[:chunkNum]
+	return
+}
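
For reference, a minimal standalone sketch of the empty-chunk detection that MissingChunks (and the commented-out scratch code in main.go) relies on: truncate a file to its full size, then scan it in fixed-size chunks and report the offsets that are still all zeros. The file name test.1, the 8096-byte size, and the 4096-byte chunk size are only example values, not part of the patch.

package main

import (
	"bytes"
	"fmt"
	"os"
)

func main() {
	// create an 8096-byte file with no data written yet,
	// so every chunk reads back as zeros
	f, _ := os.Create("test.1")
	f.Truncate(8096)
	f.Close()

	file, _ := os.Open("test.1")
	defer file.Close()

	buffer := make([]byte, 4096)
	emptyBuffer := make([]byte, 4096)
	location := int64(0)
	for {
		bytesread, err := file.Read(buffer)
		if err != nil {
			break
		}
		// a chunk that is still all zeros has not been transferred yet
		if bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]) {
			fmt.Println("missing chunk at offset", location)
		}
		location += int64(bytesread)
	}
}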