0
0
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 21:30:16 +02:00

sender and recipient share blocks

This commit is contained in:
Zack Scholl 2018-10-09 06:01:21 -07:00
parent 66a37ae5b2
commit 61d57ad0af
2 changed files with 151 additions and 93 deletions

View file

@ -1,6 +1,7 @@
package recipient
import (
"bufio"
"bytes"
"encoding/json"
"errors"
@ -52,6 +53,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
var otherIP string
var tcpConnections []comm.Comm
dataChan := make(chan []byte, 1024*1024)
blocks := []string{}
useWebsockets := true
switch forceSend {
@ -129,7 +131,18 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
return err
}
log.Debugf("%x\n", sessionKey)
c.WriteMessage(websocket.BinaryMessage, []byte("ready"))
// append the previous blocks if there was progress previously
file, errCrocProgress := os.Open("croc-progress")
if errCrocProgress == nil {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
blocks = append(blocks, strings.TrimSpace(scanner.Text()))
}
file.Close()
}
blocksBytes, _ := json.Marshal(blocks)
c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...))
case 3:
spin.Stop()
@ -189,14 +202,23 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
}
// await file
f, err := os.Create(fstats.SentName)
if err != nil {
log.Error(err)
return err
}
if err = f.Truncate(fstats.Size); err != nil {
log.Error(err)
return err
var f *os.File
if utils.Exists(fstats.SentName) {
f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644)
if err != nil {
log.Error(err)
return err
}
} else {
f, err = os.Create(fstats.SentName)
if err != nil {
log.Error(err)
return err
}
if err = f.Truncate(fstats.Size); err != nil {
log.Error(err)
return err
}
}
bytesWritten := 0
fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP)
@ -209,6 +231,19 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
finished := make(chan bool)
go func(finished chan bool, dataChan chan []byte) (err error) {
os.Remove("croc-progress2")
fProgress, errCreate := os.Create("croc-progress2")
if errCreate != nil {
panic(errCreate)
}
defer fProgress.Close()
blocksWritten := 0.0
blocksToWrite := float64(fstats.Size)
if useWebsockets {
blocksToWrite = blocksToWrite / float64(models.WEBSOCKET_BUFFER_SIZE/8)
} else {
blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks))
}
for {
message := <-dataChan
// do decryption
@ -231,8 +266,9 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
pieces := bytes.SplitN(decrypted, []byte("-"), 2)
decrypted = pieces[1]
locationToWrite, _ = strconv.Atoi(string(pieces[0]))
log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite)
fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite))
}
// do decompression
if fstats.IsCompressed && !fstats.IsDir {
decrypted = compress.Decompress(decrypted)
@ -251,13 +287,15 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
}
if err != nil {
log.Error(err)
return err
}
// update the bytes written
bytesWritten += n
blocksWritten += 1.0
// update the progress bar
bar.Add(n)
if int64(bytesWritten) == fstats.Size {
if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite {
log.Debug("finished")
break
}

View file

@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -52,6 +53,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
var otherIP string
var startTransfer time.Time
var tcpConnections []comm.Comm
blocksToSkip := make(map[int64]struct{})
type DataChan struct {
b []byte
@ -169,87 +171,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
}
fileReady <- nil
// start streaming encryption/compression
go func(dataChan chan DataChan) {
var buffer []byte
if useWebsockets {
buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8)
} else {
buffer = make([]byte, models.TCP_BUFFER_SIZE/2)
}
currentPostition := int64(0)
for {
bytesread, err := f.Read(buffer)
if bytesread > 0 {
// do compression
var compressedBytes []byte
if useCompression && !fstats.IsDir {
compressedBytes = compress.Compress(buffer[:bytesread])
} else {
compressedBytes = buffer[:bytesread]
}
// if using TCP, prepend the location to write the data to in the resulting file
if !useWebsockets {
compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...)
}
// do encryption
enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption)
encBytes, err := json.Marshal(enc)
if err != nil {
dataChan <- DataChan{
b: nil,
bytesRead: 0,
err: err,
}
return
}
select {
case dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}:
default:
log.Debug("blocked")
// no message sent
// block
dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}
}
currentPostition += int64(bytesread)
}
if err != nil {
if err != io.EOF {
log.Error(err)
}
break
}
}
// finish
log.Debug("sending magic")
dataChan <- DataChan{
b: []byte("magic"),
bytesRead: 0,
err: nil,
}
if !useWebsockets {
log.Debug("sending extra magic to %d others", len(tcpPorts)-1)
for i := 0; i < len(tcpPorts)-1; i++ {
log.Debug("sending magic")
dataChan <- DataChan{
b: []byte("magic"),
bytesRead: 0,
err: nil,
}
}
}
}(dataChan)
}()
// send pake data
@ -275,9 +196,108 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
spin.Start()
case 3:
log.Debugf("[%d] recipient declares readiness for file info", step)
if !bytes.Equal(message, []byte("ready")) {
if !bytes.HasPrefix(message, []byte("ready")) {
return errors.New("recipient refused file")
}
// determine if any blocks were sent to skip
var blocks []string
errBlocks := json.Unmarshal(message[5:], &blocks)
if errBlocks == nil {
log.Debugf("found blocks: %+v", blocks)
for _, block := range blocks {
blockInt64, _ := strconv.Atoi(block)
blocksToSkip[int64(blockInt64)] = struct{}{}
}
}
// start streaming encryption/compression
go func(dataChan chan DataChan) {
var buffer []byte
if useWebsockets {
buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8)
} else {
buffer = make([]byte, models.TCP_BUFFER_SIZE/2)
}
currentPostition := int64(0)
for {
bytesread, err := f.Read(buffer)
if bytesread > 0 {
if _, ok := blocksToSkip[currentPostition]; ok {
log.Debugf("skipping the sending of block %d", currentPostition)
currentPostition += int64(bytesread)
continue
}
// do compression
var compressedBytes []byte
if useCompression && !fstats.IsDir {
compressedBytes = compress.Compress(buffer[:bytesread])
} else {
compressedBytes = buffer[:bytesread]
}
// if using TCP, prepend the location to write the data to in the resulting file
if !useWebsockets {
compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...)
}
// do encryption
enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption)
encBytes, err := json.Marshal(enc)
if err != nil {
dataChan <- DataChan{
b: nil,
bytesRead: 0,
err: err,
}
return
}
select {
case dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}:
default:
log.Debug("blocked")
// no message sent
// block
dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}
}
currentPostition += int64(bytesread)
}
if err != nil {
if err != io.EOF {
log.Error(err)
}
break
}
}
// finish
log.Debug("sending magic")
dataChan <- DataChan{
b: []byte("magic"),
bytesRead: 0,
err: nil,
}
if !useWebsockets {
log.Debug("sending extra magic to %d others", len(tcpPorts)-1)
for i := 0; i < len(tcpPorts)-1; i++ {
log.Debug("sending magic")
dataChan <- DataChan{
b: []byte("magic"),
bytesRead: 0,
err: nil,
}
}
}
}(dataChan)
err = <-fileReady // block until file is ready
if err != nil {
return err