1
1
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

logic in place for handling multiple files

- hash file with info and check hash
- make explicit request for chunks
- check file directly for empty chunks, request them
- skip files if they already exist on machine
This commit is contained in:
Zack Scholl 2019-04-04 06:35:27 -07:00
parent f74816c7c7
commit 311c144ebd
2 changed files with 207 additions and 86 deletions

18
main.go
View file

@ -7,6 +7,22 @@ import (
)
func main() {
// f, _ := os.Create("test.1")
// f.Truncate(8096)
// f.Close()
// file, _ := os.Open("test.1")
// defer file.Close()
// buffer := make([]byte, 4096)
// emptyBuffer := make([]byte, 4096)
// for {
// bytesread, err := file.Read(buffer)
// if err != nil {
// break
// }
// fmt.Println(bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]))
// }
var sender bool
flag.BoolVar(&sender, "sender", false, "sender")
flag.Parse()
@ -18,7 +34,7 @@ func main() {
err = c.Send(croc.TransferOptions{
// PathToFile: "../wskeystore/README.md",
// PathToFile: "./src/croc/croc.go",
PathToFile: "C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go",
PathToFiles: []string{"C:\\Users\\zacks\\go\\src\\github.com\\schollz\\croc\\src\\croc\\croc.go", "croc.exe"},
KeepPathInRemote: false,
})
} else {

View file

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"os"
"path"
"path/filepath"
@ -19,6 +20,7 @@ import (
"github.com/pions/webrtc/examples/util"
"github.com/pions/webrtc/pkg/datachannel"
"github.com/pions/webrtc/pkg/ice"
"github.com/schollz/croc/src/utils"
"github.com/schollz/pake"
"github.com/schollz/progressbar/v2"
"github.com/sirupsen/logrus"
@ -26,6 +28,8 @@ import (
var log = logrus.New()
const BufferSize = 4096
func init() {
log.SetFormatter(&logrus.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout())
@ -39,19 +43,21 @@ type Client struct {
IsSender bool
SharedSecret string
Pake *pake.Pake
Filename string
Folder string
// steps involved in forming relationship
Step1ChannelSecured bool
Step2FileInfoTransfered bool
Step3RecipientReady bool
Step4SendingData bool
Step1ChannelSecured bool
Step2FileInfoTransfered bool
Step3RecipientRequestFile bool
Step4FileTransfer bool
Step5RecipientCheckFile bool // TODO: Step5 should close files and reset things
// send / receive information
f *os.File
FileInfo FileInfo
chunksToSend []int64
// send / receive information of all files
FilesToTransfer []FileInfo
FilesToTransferCurrentNum int
// send / receive information of current file
CurrentFile *os.File
CurrentFileChunks []int64
// channel data
incomingMessageChannel <-chan *redis.Message
@ -78,13 +84,20 @@ type Chunk struct {
type FileInfo struct {
Name string `json:"n,omitempty"`
Folder string `json:"f,omitempty"`
FolderRemote string `json:"fr,omitempty"`
FolderSource string `json:"fs,omitempty"`
Hash []byte `json:"h,omitempty"`
Size int64 `json:"s,omitempty"`
ModTime time.Time `json:"m,omitempty"`
IsCompressed bool `json:"c,omitempty"`
IsEncrypted bool `json:"e,omitempty"`
}
type RemoteFileRequest struct {
CurrentFileChunks []int64
FilesToTransferCurrentNum int
}
func (m Message) String() string {
b, _ := json.Marshal(m)
return string(b)
@ -151,7 +164,7 @@ func New(sender bool, sharedSecret string) (c *Client, err error) {
}
type TransferOptions struct {
PathToFile string
PathToFiles []string
KeepPathInRemote bool
}
@ -167,46 +180,56 @@ func (c *Client) Receive() (err error) {
func (c *Client) transfer(options TransferOptions) (err error) {
if c.IsSender {
var fstats os.FileInfo
fstats, err = os.Stat(path.Join(options.PathToFile))
if err != nil {
return
}
c.FileInfo = FileInfo{
Name: fstats.Name(),
Folder: ".",
Size: fstats.Size(),
ModTime: fstats.ModTime(),
}
if options.KeepPathInRemote {
var fullPath, curFolder string
fullPath, err = filepath.Abs(options.PathToFile)
c.FilesToTransfer = make([]FileInfo, len(options.PathToFiles))
for i, pathToFile := range options.PathToFiles {
var fstats os.FileInfo
var fullPath string
fullPath, err = filepath.Abs(pathToFile)
if err != nil {
return
}
fullPath = filepath.Clean(fullPath)
folderName, _ := filepath.Split(fullPath)
curFolder, err = os.Getwd()
fstats, err = os.Stat(path.Join(fullPath))
if err != nil {
return
}
curFolder, err = filepath.Abs(curFolder)
c.FilesToTransfer[i] = FileInfo{
Name: fstats.Name(),
FolderRemote: ".",
FolderSource: fullPath,
Size: fstats.Size(),
ModTime: fstats.ModTime(),
}
c.FilesToTransfer[i].Hash, err = utils.HashFile(fullPath)
if err != nil {
return
}
if !strings.HasPrefix(folderName, curFolder) {
err = fmt.Errorf("remote directory must be relative to current")
return
}
c.FileInfo.Folder = strings.TrimPrefix(folderName, curFolder)
c.FileInfo.Folder = filepath.ToSlash(c.FileInfo.Folder)
c.FileInfo.Folder = strings.TrimPrefix(c.FileInfo.Folder, "/")
if c.FileInfo.Folder == "" {
c.FileInfo.Folder = "."
if options.KeepPathInRemote {
var curFolder string
folderName, _ := filepath.Split(fullPath)
curFolder, err = os.Getwd()
if err != nil {
return
}
curFolder, err = filepath.Abs(curFolder)
if err != nil {
return
}
if !strings.HasPrefix(folderName, curFolder) {
err = fmt.Errorf("remote directory must be relative to current")
return
}
c.FilesToTransfer[i].FolderRemote = strings.TrimPrefix(folderName, curFolder)
c.FilesToTransfer[i].FolderRemote = filepath.ToSlash(c.FilesToTransfer[i].FolderRemote)
c.FilesToTransfer[i].FolderRemote = strings.TrimPrefix(c.FilesToTransfer[i].FolderRemote, "/")
if c.FilesToTransfer[i].FolderRemote == "" {
c.FilesToTransfer[i].FolderRemote = "."
}
}
log.Debugf("file %d info: %+v", i, c.FilesToTransfer[i])
}
log.Debugf("file info: %+v", c.FileInfo)
}
// create channel for quitting
// quit with c.quit <- true
@ -249,20 +272,20 @@ func (c *Client) transfer(options TransferOptions) (err error) {
func (c *Client) sendOverRedis() (err error) {
go func() {
bar := progressbar.NewOptions(
int(c.FileInfo.Size),
int(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes(int(c.FileInfo.Size)),
progressbar.OptionSetBytes(int(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size)),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
c.f, err = os.Open(c.FileInfo.Name)
c.CurrentFile, err = os.Open(c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
if err != nil {
panic(err)
}
location := int64(0)
for {
buf := make([]byte, 4096*128)
n, errRead := c.f.Read(buf)
n, errRead := c.CurrentFile.Read(buf)
bar.Add(n)
chunk := Chunk{
Bytes: buf[:n],
@ -307,35 +330,28 @@ func (c *Client) processMessage(m Message) (err error) {
c.Step1ChannelSecured = true
}
case "fileinfo":
err = json.Unmarshal(m.Bytes, &c.FileInfo)
if err != nil {
return
}
c.log.Debug(c.FileInfo)
if c.FileInfo.Folder != "." {
err = os.MkdirAll(c.FileInfo.Folder, os.ModeDir)
if err != nil {
return
}
}
c.f, err = os.Create(path.Join(c.FileInfo.Folder, c.FileInfo.Name))
if err != nil {
return
}
err = c.f.Truncate(c.FileInfo.Size)
err = json.Unmarshal(m.Bytes, &c.FilesToTransfer)
if err != nil {
return
}
c.log.Debug(c.FilesToTransfer)
c.Step2FileInfoTransfered = true
case "recipientready":
c.Step3RecipientReady = true
var remoteFile RemoteFileRequest
err = json.Unmarshal(m.Bytes, &remoteFile)
if err != nil {
return
}
c.FilesToTransferCurrentNum = remoteFile.FilesToTransferCurrentNum
c.CurrentFileChunks = remoteFile.CurrentFileChunks
c.Step3RecipientRequestFile = true
case "chunk":
var chunk Chunk
err = json.Unmarshal(m.Bytes, &chunk)
if err != nil {
return
}
_, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
_, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
c.log.Debug("writing chunk", chunk.Location)
case "datachannel-offer":
offer := util.Decode(m.Message)
@ -373,18 +389,7 @@ func (c *Client) processMessage(m Message) (err error) {
func (c *Client) updateState() (err error) {
if c.IsSender && c.Step1ChannelSecured && !c.Step2FileInfoTransfered {
var fstats os.FileInfo
fstats, err = os.Stat(path.Join(c.Folder, c.Filename))
if err != nil {
return
}
c.FileInfo = FileInfo{
Name: c.Filename,
Folder: c.Folder,
Size: fstats.Size(),
ModTime: fstats.ModTime(),
}
b, _ := json.Marshal(c.FileInfo)
b, _ := json.Marshal(c.FilesToTransfer)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "fileinfo",
Bytes: b,
@ -394,15 +399,80 @@ func (c *Client) updateState() (err error) {
}
c.Step2FileInfoTransfered = true
}
if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientReady {
// TODO: recipient requests the chunk locations (if empty, then should receive all chunks)
if !c.IsSender && c.Step2FileInfoTransfered && !c.Step3RecipientRequestFile {
// find the next file to transfer and send that number
// if the files are the same size, then look for missing chunks
finished := true
for i, fileInfo := range c.FilesToTransfer {
if i < c.FilesToTransferCurrentNum {
continue
}
fileHash, errHash := utils.HashFile(path.Join(fileInfo.FolderRemote, fileInfo.Name))
if errHash != nil || !bytes.Equal(fileHash, fileInfo.Hash) {
finished = false
c.FilesToTransferCurrentNum = i
break
}
// TODO: print out something about this file already existing
}
if finished {
// TODO: do the last finishing stuff
log.Debug("finished")
os.Exit(1)
}
// start initiating the process to receive a new file
log.Debugf("working on file %d", c.FilesToTransferCurrentNum)
// setup folder for new file
if c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote != "." {
err = os.MkdirAll(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, os.ModeDir)
if err != nil {
return
}
}
pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderRemote, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
// check if file should be overwritten, or simply fixed with missing chunks
overwrite := true
fstats, errStats := os.Stat(pathToFile)
if errStats == nil {
if fstats.Size() == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size {
// just request missing chunks
c.CurrentFileChunks = MissingChunks(pathToFile, fstats.Size(), 4096)
log.Debugf("found %d missing chunks", len(c.CurrentFileChunks))
overwrite = false
}
} else {
c.CurrentFileChunks = []int64{}
}
if overwrite {
c.CurrentFile, err = os.Create(pathToFile)
if err != nil {
return
}
err = c.CurrentFile.Truncate(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size)
} else {
c.CurrentFile, err = os.Open(pathToFile)
}
if err != nil {
return
}
// recipient requests the file and chunks (if empty, then should receive all chunks)
bRequest, _ := json.Marshal(RemoteFileRequest{
CurrentFileChunks: c.CurrentFileChunks,
FilesToTransferCurrentNum: c.FilesToTransferCurrentNum,
})
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "recipientready",
Type: "recipientready",
Bytes: bRequest,
}.String()).Err()
if err != nil {
return
}
c.Step3RecipientReady = true
c.Step3RecipientRequestFile = true
// start receiving data
go func() {
err = c.dataChannelReceive()
@ -411,9 +481,9 @@ func (c *Client) updateState() (err error) {
}
}()
}
if c.IsSender && c.Step3RecipientReady && !c.Step4SendingData {
if c.IsSender && c.Step3RecipientRequestFile && !c.Step4FileTransfer {
c.log.Debug("start sending data!")
c.Step4SendingData = true
c.Step4FileTransfer = true
go func() {
err = c.dataChannelSend()
if err != nil {
@ -466,7 +536,7 @@ func (c *Client) dataChannelReceive() (err error) {
timer := time.Now()
var mutex = &sync.Mutex{}
piecesToDo := make(map[int64]bool)
for i := int64(0); i < c.FileInfo.Size; i += 4096 {
for i := int64(0); i < c.FilesToTransfer[c.FilesToTransferCurrentNum].Size; i += 4096 {
piecesToDo[i] = true
}
// Register message handling
@ -476,8 +546,9 @@ func (c *Client) dataChannelReceive() (err error) {
case *datachannel.PayloadString:
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
if bytes.Equal(p.Data, []byte("done")) {
c.f.Close()
c.CurrentFile.Close()
c.log.Debug(time.Since(timer))
// TODO: handle done, close file, reset things and check for missing blocks
}
case *datachannel.PayloadBinary:
if !startTime {
@ -491,7 +562,7 @@ func (c *Client) dataChannelReceive() (err error) {
}
var n int
mutex.Lock()
n, err = c.f.WriteAt(chunk.Bytes, chunk.Location)
n, err = c.CurrentFile.WriteAt(chunk.Bytes, chunk.Location)
mutex.Unlock()
if err != nil {
panic(err)
@ -563,9 +634,10 @@ func (c *Client) dataChannelSend() (err error) {
fmt.Printf("Data channel '%s'-'%d' open\n", c.dataChannel.Label, c.dataChannel.ID)
time.Sleep(100 * time.Microsecond)
c.log.Debug("sending file")
const BufferSize = 4096
file, err := os.Open("test.txt")
pathToFile := path.Join(c.FilesToTransfer[c.FilesToTransferCurrentNum].FolderSource, c.FilesToTransfer[c.FilesToTransferCurrentNum].Name)
c.log.Debugf("sending '%s'", pathToFile)
file, err := os.Open(pathToFile)
if err != nil {
c.log.Debug(err)
return
@ -636,3 +708,36 @@ func (c *Client) dataChannelSend() (err error) {
return
}
// MissingChunks returns the positions of missing chunks.
// If file doesn't exist, it returns an empty chunk list (all chunks).
// If the file size is not the same as requested, it returns an empty chunk list (all chunks).
func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) {
fstat, err := os.Stat(fname)
if fstat.Size() != fsize {
return
}
f, err := os.Open(fname)
if err != nil {
return
}
defer f.Close()
buffer := make([]byte, chunkSize)
emptyBuffer := make([]byte, chunkSize)
chunkNum := 0
chunks = make([]int64, int64(math.Ceil(float64(fsize)/float64(chunkSize))))
var currentLocation int64
for {
bytesread, err := f.Read(buffer)
if err != nil {
break
}
if bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]) {
chunks[chunkNum] = currentLocation
}
currentLocation += int64(bytesread)
}
return
}