mirror of
https://github.com/schollz/croc.git
synced 2025-10-11 13:21:00 +02:00
merge sender and recipient into croc
This commit is contained in:
parent
0bbf2841ca
commit
8734394bb8
4 changed files with 2 additions and 1026 deletions
|
@ -5,9 +5,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/schollz/croc/src/logger"
|
||||
"github.com/schollz/croc/src/recipient"
|
||||
"github.com/schollz/croc/src/relay"
|
||||
"github.com/schollz/croc/src/sender"
|
||||
"github.com/schollz/croc/src/zipper"
|
||||
)
|
||||
|
||||
|
@ -78,8 +76,6 @@ func Init(debug bool) (c *Croc) {
|
|||
|
||||
func SetDebugLevel(debugLevel string) {
|
||||
logger.SetLogLevel(debugLevel)
|
||||
sender.DebugLevel = debugLevel
|
||||
recipient.DebugLevel = debugLevel
|
||||
relay.DebugLevel = debugLevel
|
||||
zipper.DebugLevel = debugLevel
|
||||
}
|
||||
|
|
|
@ -11,9 +11,7 @@ import (
|
|||
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/schollz/croc/src/recipient"
|
||||
"github.com/schollz/croc/src/relay"
|
||||
"github.com/schollz/croc/src/sender"
|
||||
"github.com/schollz/peerdiscovery"
|
||||
"github.com/schollz/utils"
|
||||
)
|
||||
|
@ -161,9 +159,9 @@ func (c *Croc) sendReceive(address, websocketPort string, tcpPorts []string, fna
|
|||
}
|
||||
|
||||
if isSender {
|
||||
go sender.Send(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption)
|
||||
go c.startSender(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption)
|
||||
} else {
|
||||
go recipient.Receive(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout)
|
||||
go c.startRecipient(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout)
|
||||
}
|
||||
|
||||
for {
|
||||
|
|
|
@ -1,527 +0,0 @@
|
|||
package recipient
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/schollz/croc/src/comm"
|
||||
"github.com/schollz/croc/src/compress"
|
||||
"github.com/schollz/croc/src/crypt"
|
||||
"github.com/schollz/croc/src/logger"
|
||||
"github.com/schollz/croc/src/models"
|
||||
"github.com/schollz/croc/src/utils"
|
||||
"github.com/schollz/croc/src/zipper"
|
||||
"github.com/schollz/pake"
|
||||
"github.com/schollz/progressbar/v2"
|
||||
"github.com/schollz/spinner"
|
||||
"github.com/tscholl2/siec"
|
||||
)
|
||||
|
||||
var DebugLevel string
|
||||
|
||||
// Receive is the async operation to receive a file
|
||||
func Receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) {
|
||||
logger.SetLogLevel(DebugLevel)
|
||||
err := receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout)
|
||||
if err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "websocket: close 100") {
|
||||
fmt.Fprintf(os.Stderr, "\n"+err.Error())
|
||||
}
|
||||
}
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) {
|
||||
var fstats models.FileStats
|
||||
var sessionKey []byte
|
||||
var transferTime time.Duration
|
||||
var hash256 []byte
|
||||
var otherIP string
|
||||
var progressFile string
|
||||
var resumeFile bool
|
||||
var tcpConnections []comm.Comm
|
||||
dataChan := make(chan []byte, 1024*1024)
|
||||
isConnectedIfUsingTCP := make(chan bool)
|
||||
blocks := []string{}
|
||||
|
||||
useWebsockets := true
|
||||
switch forceSend {
|
||||
case 0:
|
||||
if !isLocal {
|
||||
useWebsockets = false
|
||||
}
|
||||
case 1:
|
||||
useWebsockets = true
|
||||
case 2:
|
||||
useWebsockets = false
|
||||
}
|
||||
|
||||
// start a spinner
|
||||
spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond)
|
||||
spin.Writer = os.Stderr
|
||||
spin.Suffix = " performing PAKE..."
|
||||
spin.Start()
|
||||
|
||||
// pick an elliptic curve
|
||||
curve := siec.SIEC255()
|
||||
// both parties should have a weak key
|
||||
pw := []byte(codephrase)
|
||||
|
||||
// initialize recipient Q ("1" indicates recipient)
|
||||
Q, err := pake.Init(pw, 1, curve, 1*time.Millisecond)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
step := 0
|
||||
for {
|
||||
messageType, message, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if messageType == websocket.PongMessage || messageType == websocket.PingMessage {
|
||||
continue
|
||||
}
|
||||
if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) {
|
||||
return errors.New("\rinterrupted by other party")
|
||||
}
|
||||
|
||||
log.Debugf("got %d: %s", messageType, message)
|
||||
switch step {
|
||||
case 0:
|
||||
// sender has initiated, sends their ip address
|
||||
otherIP = string(message)
|
||||
log.Debugf("sender IP: %s", otherIP)
|
||||
|
||||
// recipient begins by sending address
|
||||
ip := ""
|
||||
if isLocal {
|
||||
ip = utils.LocalIP()
|
||||
} else {
|
||||
ip, _ = utils.PublicIP()
|
||||
}
|
||||
c.WriteMessage(websocket.BinaryMessage, []byte(ip))
|
||||
case 1:
|
||||
|
||||
// Q receives u
|
||||
log.Debugf("[%d] Q computes k, sends H(k), v back to P", step)
|
||||
if err := Q.Update(message); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Q has the session key now, but we will still check if its valid
|
||||
sessionKey, err = Q.SessionKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("%x\n", sessionKey)
|
||||
|
||||
// initialize TCP connections if using (possible, but unlikely, race condition)
|
||||
go func() {
|
||||
if !useWebsockets {
|
||||
log.Debugf("connecting to server")
|
||||
tcpConnections = make([]comm.Comm, len(tcpPorts))
|
||||
for i, tcpPort := range tcpPorts {
|
||||
log.Debugf("connecting to %d", i)
|
||||
tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
log.Debugf("fully connected")
|
||||
}
|
||||
isConnectedIfUsingTCP <- true
|
||||
}()
|
||||
|
||||
c.WriteMessage(websocket.BinaryMessage, Q.Bytes())
|
||||
case 2:
|
||||
log.Debugf("[%d] Q recieves H(k) from P", step)
|
||||
// check if everything is still kosher with our computed session key
|
||||
if err := Q.Update(message); err != nil {
|
||||
return err
|
||||
}
|
||||
c.WriteMessage(websocket.BinaryMessage, []byte("ready"))
|
||||
case 3:
|
||||
spin.Stop()
|
||||
|
||||
// unmarshal the file info
|
||||
log.Debugf("[%d] recieve file info", step)
|
||||
// do decryption on the file stats
|
||||
enc, err := crypt.FromBytes(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
decryptedFileData, err := enc.Decrypt(sessionKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = json.Unmarshal(decryptedFileData, &fstats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("got file stats: %+v", fstats)
|
||||
|
||||
// determine if the file is resuming or not
|
||||
progressFile = fmt.Sprintf("%s.progress", fstats.SentName)
|
||||
overwritingOrReceiving := "Receiving"
|
||||
if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) {
|
||||
overwritingOrReceiving = "Overwriting"
|
||||
if utils.Exists(progressFile) {
|
||||
overwritingOrReceiving = "Resume receiving"
|
||||
resumeFile = true
|
||||
}
|
||||
}
|
||||
|
||||
// send blocks
|
||||
if resumeFile {
|
||||
fileWithBlocks, _ := os.Open(progressFile)
|
||||
scanner := bufio.NewScanner(fileWithBlocks)
|
||||
for scanner.Scan() {
|
||||
blocks = append(blocks, strings.TrimSpace(scanner.Text()))
|
||||
}
|
||||
fileWithBlocks.Close()
|
||||
}
|
||||
blocksBytes, _ := json.Marshal(blocks)
|
||||
// encrypt the block data and send
|
||||
encblockBytes := crypt.Encrypt(blocksBytes, sessionKey)
|
||||
c.WriteMessage(websocket.BinaryMessage, encblockBytes.Bytes())
|
||||
|
||||
// prompt user about the file
|
||||
fileOrFolder := "file"
|
||||
if fstats.IsDir {
|
||||
fileOrFolder = "folder"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "\r%s %s (%s) into: %s\n",
|
||||
overwritingOrReceiving,
|
||||
fileOrFolder,
|
||||
humanize.Bytes(uint64(fstats.Size)),
|
||||
fstats.Name,
|
||||
)
|
||||
if !noPrompt {
|
||||
if "y" != utils.GetInput("ok? (y/N): ") {
|
||||
fmt.Fprintf(os.Stderr, "cancelling request")
|
||||
c.WriteMessage(websocket.BinaryMessage, []byte("no"))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// await file
|
||||
// erase file if overwriting
|
||||
if overwritingOrReceiving == "Overwriting" {
|
||||
os.Remove(fstats.SentName)
|
||||
}
|
||||
var f *os.File
|
||||
if utils.Exists(fstats.SentName) && resumeFile {
|
||||
if !useWebsockets {
|
||||
f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644)
|
||||
} else {
|
||||
f, err = os.OpenFile(fstats.SentName, os.O_APPEND|os.O_WRONLY, 0644)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
f, err = os.Create(fstats.SentName)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
if !useWebsockets {
|
||||
if err = f.Truncate(fstats.Size); err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blockSize := 0
|
||||
if useWebsockets {
|
||||
blockSize = models.WEBSOCKET_BUFFER_SIZE / 8
|
||||
} else {
|
||||
blockSize = models.TCP_BUFFER_SIZE / 2
|
||||
}
|
||||
|
||||
// start the ui for pgoress
|
||||
bytesWritten := 0
|
||||
fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP)
|
||||
bar := progressbar.NewOptions(
|
||||
int(fstats.Size),
|
||||
progressbar.OptionSetRenderBlankState(true),
|
||||
progressbar.OptionSetBytes(int(fstats.Size)),
|
||||
progressbar.OptionSetWriter(os.Stderr),
|
||||
)
|
||||
bar.Add((len(blocks) * blockSize))
|
||||
finished := make(chan bool)
|
||||
|
||||
go func(finished chan bool, dataChan chan []byte) (err error) {
|
||||
// remove previous progress
|
||||
var fProgress *os.File
|
||||
var progressErr error
|
||||
if resumeFile {
|
||||
fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND|os.O_WRONLY, 0644)
|
||||
bytesWritten = len(blocks) * blockSize
|
||||
} else {
|
||||
os.Remove(progressFile)
|
||||
fProgress, progressErr = os.Create(progressFile)
|
||||
}
|
||||
if progressErr != nil {
|
||||
panic(progressErr)
|
||||
}
|
||||
defer fProgress.Close()
|
||||
|
||||
blocksWritten := 0.0
|
||||
blocksToWrite := float64(fstats.Size)
|
||||
if useWebsockets {
|
||||
blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks))
|
||||
} else {
|
||||
blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks))
|
||||
}
|
||||
for {
|
||||
message := <-dataChan
|
||||
// do decryption
|
||||
var enc crypt.Encryption
|
||||
err = json.Unmarshal(message, &enc)
|
||||
if err != nil {
|
||||
// log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs)
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// get location if TCP
|
||||
var locationToWrite int
|
||||
if !useWebsockets {
|
||||
pieces := bytes.SplitN(decrypted, []byte("-"), 2)
|
||||
decrypted = pieces[1]
|
||||
locationToWrite, _ = strconv.Atoi(string(pieces[0]))
|
||||
}
|
||||
|
||||
// do decompression
|
||||
if fstats.IsCompressed && !fstats.IsDir {
|
||||
decrypted = compress.Decompress(decrypted)
|
||||
}
|
||||
|
||||
var n int
|
||||
if !useWebsockets {
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
n, err = f.WriteAt(decrypted, int64(locationToWrite))
|
||||
fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite))
|
||||
log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, locationToWrite, blocksWritten, blocksToWrite)
|
||||
} else {
|
||||
// write to file
|
||||
n, err = f.Write(decrypted)
|
||||
log.Debugf("wrote %d bytes to location %d (%2.0f/%2.0f)", n, bytesWritten, blocksWritten, blocksToWrite)
|
||||
fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten))
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// update the bytes written
|
||||
bytesWritten += n
|
||||
blocksWritten += 1.0
|
||||
// update the progress bar
|
||||
bar.Add(n)
|
||||
if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite {
|
||||
log.Debug("finished", int64(bytesWritten), fstats.Size, blocksWritten, blocksToWrite)
|
||||
break
|
||||
}
|
||||
}
|
||||
finished <- true
|
||||
return
|
||||
}(finished, dataChan)
|
||||
|
||||
log.Debug("telling sender i'm ready")
|
||||
c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...))
|
||||
|
||||
startTime := time.Now()
|
||||
if useWebsockets {
|
||||
for {
|
||||
var messageType int
|
||||
// read from websockets
|
||||
messageType, message, err = c.ReadMessage()
|
||||
if messageType != websocket.BinaryMessage {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
if bytes.Equal(message, []byte("magic")) {
|
||||
log.Debug("got magic")
|
||||
break
|
||||
}
|
||||
dataChan <- message
|
||||
// select {
|
||||
// case dataChan <- message:
|
||||
// default:
|
||||
// log.Debug("blocked")
|
||||
// // no message sent
|
||||
// // block
|
||||
// dataChan <- message
|
||||
// }
|
||||
}
|
||||
} else {
|
||||
_ = <-isConnectedIfUsingTCP
|
||||
log.Debugf("starting listening with tcp with %d connections", len(tcpConnections))
|
||||
// using TCP
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(tcpConnections))
|
||||
for i := range tcpConnections {
|
||||
defer func(i int) {
|
||||
log.Debugf("closing connection %d", i)
|
||||
tcpConnections[i].Close()
|
||||
}(i)
|
||||
go func(wg *sync.WaitGroup, j int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
log.Debugf("waiting to read on %d", j)
|
||||
// read from TCP connection
|
||||
message, _, _, err := tcpConnections[j].Read()
|
||||
// log.Debugf("message: %s", message)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if bytes.Equal(message, []byte("magic")) {
|
||||
log.Debugf("%d got magic, leaving", j)
|
||||
return
|
||||
}
|
||||
dataChan <- message
|
||||
}
|
||||
}(&wg, i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
_ = <-finished
|
||||
log.Debug("telling sender i'm done")
|
||||
c.WriteMessage(websocket.BinaryMessage, []byte("done"))
|
||||
// we are finished
|
||||
transferTime = time.Since(startTime)
|
||||
|
||||
// close file
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// finish bar
|
||||
bar.Finish()
|
||||
|
||||
// check hash
|
||||
hash256, err = utils.HashFile(fstats.SentName)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
// tell the sender the hash so they can quit
|
||||
c.WriteMessage(websocket.BinaryMessage, append([]byte("hash:"), hash256...))
|
||||
case 4:
|
||||
// receive the hash from the sender so we can check it and quit
|
||||
log.Debugf("got hash: %x", message)
|
||||
if bytes.Equal(hash256, message) {
|
||||
// open directory
|
||||
if fstats.IsDir {
|
||||
err = zipper.UnzipFile(fstats.SentName, ".")
|
||||
if DebugLevel != "debug" {
|
||||
os.Remove(fstats.SentName)
|
||||
}
|
||||
} else {
|
||||
err = nil
|
||||
}
|
||||
if err == nil {
|
||||
if useStdout && !fstats.IsDir {
|
||||
var bFile []byte
|
||||
bFile, err = ioutil.ReadFile(fstats.SentName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
os.Stdout.Write(bFile)
|
||||
os.Remove(fstats.SentName)
|
||||
}
|
||||
transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds()
|
||||
transferType := "MB/s"
|
||||
if transferRate < 1 {
|
||||
transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds()
|
||||
transferType = "kB/s"
|
||||
}
|
||||
folderOrFile := "file"
|
||||
if fstats.IsDir {
|
||||
folderOrFile = "folder"
|
||||
}
|
||||
if useStdout {
|
||||
fstats.Name = "stdout"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType)
|
||||
os.Remove(progressFile)
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
if DebugLevel != "debug" {
|
||||
log.Debug("removing corrupted file")
|
||||
os.Remove(fstats.SentName)
|
||||
}
|
||||
return errors.New("file corrupted")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown step")
|
||||
}
|
||||
step++
|
||||
}
|
||||
}
|
||||
|
||||
func connectToTCPServer(room string, address string) (com comm.Comm, err error) {
|
||||
log.Debugf("recipient connecting to %s", address)
|
||||
connection, err := net.Dial("tcp", address)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
connection.SetReadDeadline(time.Now().Add(3 * time.Hour))
|
||||
connection.SetDeadline(time.Now().Add(3 * time.Hour))
|
||||
connection.SetWriteDeadline(time.Now().Add(3 * time.Hour))
|
||||
|
||||
com = comm.New(connection)
|
||||
log.Debug("waiting for server contact")
|
||||
ok, err := com.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Debugf("[%s] server says: %s", address, ok)
|
||||
|
||||
err = com.Send(room)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ok, err = com.Receive()
|
||||
log.Debugf("[%s] server says: %s", address, ok)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if ok != "recipient" {
|
||||
err = errors.New(ok)
|
||||
}
|
||||
return
|
||||
}
|
|
@ -1,491 +0,0 @@
|
|||
package sender
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/schollz/croc/src/comm"
|
||||
"github.com/schollz/croc/src/compress"
|
||||
"github.com/schollz/croc/src/crypt"
|
||||
"github.com/schollz/croc/src/logger"
|
||||
"github.com/schollz/croc/src/models"
|
||||
"github.com/schollz/croc/src/utils"
|
||||
"github.com/schollz/croc/src/zipper"
|
||||
"github.com/schollz/pake"
|
||||
"github.com/schollz/progressbar/v2"
|
||||
"github.com/schollz/spinner"
|
||||
"github.com/tscholl2/siec"
|
||||
)
|
||||
|
||||
var DebugLevel string
|
||||
|
||||
// Send is the async call to send data
|
||||
func Send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) {
|
||||
logger.SetLogLevel(DebugLevel)
|
||||
log.Debugf("sending %s", fname)
|
||||
err := send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption)
|
||||
if err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "websocket: close 100") {
|
||||
fmt.Fprintf(os.Stderr, "\n"+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) {
|
||||
var f *os.File
|
||||
defer f.Close() // ignore the error if it wasn't opened :(
|
||||
var fstats models.FileStats
|
||||
var fileHash []byte
|
||||
var otherIP string
|
||||
var startTransfer time.Time
|
||||
var tcpConnections []comm.Comm
|
||||
blocksToSkip := make(map[int64]struct{})
|
||||
isConnectedIfUsingTCP := make(chan bool)
|
||||
|
||||
type DataChan struct {
|
||||
b []byte
|
||||
currentPostition int64
|
||||
bytesRead int
|
||||
err error
|
||||
}
|
||||
dataChan := make(chan DataChan, 1024*1024)
|
||||
defer close(dataChan)
|
||||
|
||||
useWebsockets := true
|
||||
switch forceSend {
|
||||
case 0:
|
||||
if !isLocal {
|
||||
useWebsockets = false
|
||||
}
|
||||
case 1:
|
||||
useWebsockets = true
|
||||
case 2:
|
||||
useWebsockets = false
|
||||
}
|
||||
|
||||
fileReady := make(chan error)
|
||||
|
||||
// normalize the file name
|
||||
fname, err = filepath.Abs(fname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, filename := filepath.Split(fname)
|
||||
|
||||
// get ready to generate session key
|
||||
var sessionKey []byte
|
||||
|
||||
// start a spinner
|
||||
spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond)
|
||||
spin.Writer = os.Stderr
|
||||
|
||||
// pick an elliptic curve
|
||||
curve := siec.SIEC255()
|
||||
// both parties should have a weak key
|
||||
pw := []byte(codephrase)
|
||||
// initialize sender P ("0" indicates sender)
|
||||
P, err := pake.Init(pw, 0, curve, 1*time.Millisecond)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
step := 0
|
||||
for {
|
||||
messageType, message, errRead := c.ReadMessage()
|
||||
if errRead != nil {
|
||||
return errRead
|
||||
}
|
||||
if messageType == websocket.PongMessage || messageType == websocket.PingMessage {
|
||||
continue
|
||||
}
|
||||
if messageType == websocket.TextMessage && bytes.Equal(message, []byte("interrupt")) {
|
||||
return errors.New("\rinterrupted by other party")
|
||||
}
|
||||
log.Debugf("got %d: %s", messageType, message)
|
||||
switch step {
|
||||
case 0:
|
||||
// sender initiates communication
|
||||
ip := ""
|
||||
if isLocal {
|
||||
ip = utils.LocalIP()
|
||||
} else {
|
||||
ip, _ = utils.PublicIP()
|
||||
}
|
||||
// send my IP address
|
||||
c.WriteMessage(websocket.BinaryMessage, []byte(ip))
|
||||
case 1:
|
||||
// first receive the IP address from the sender
|
||||
otherIP = string(message)
|
||||
log.Debugf("recipient IP: %s", otherIP)
|
||||
|
||||
go func() {
|
||||
// recipient might want file! start gathering information about file
|
||||
fstat, err := os.Stat(fname)
|
||||
if err != nil {
|
||||
fileReady <- err
|
||||
return
|
||||
}
|
||||
fstats = models.FileStats{
|
||||
Name: filename,
|
||||
Size: fstat.Size(),
|
||||
ModTime: fstat.ModTime(),
|
||||
IsDir: fstat.IsDir(),
|
||||
SentName: fstat.Name(),
|
||||
IsCompressed: useCompression,
|
||||
IsEncrypted: useEncryption,
|
||||
}
|
||||
if fstats.IsDir {
|
||||
// zip the directory
|
||||
fstats.SentName, err = zipper.ZipFile(fname, true)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
fileReady <- err
|
||||
return
|
||||
}
|
||||
fname = fstats.SentName
|
||||
|
||||
fstat, err := os.Stat(fname)
|
||||
if err != nil {
|
||||
fileReady <- err
|
||||
return
|
||||
}
|
||||
// get new size
|
||||
fstats.Size = fstat.Size()
|
||||
}
|
||||
|
||||
// open the file
|
||||
f, err = os.Open(fname)
|
||||
if err != nil {
|
||||
fileReady <- err
|
||||
return
|
||||
}
|
||||
fileReady <- nil
|
||||
|
||||
}()
|
||||
|
||||
// send pake data
|
||||
log.Debugf("[%d] first, P sends u to Q", step)
|
||||
c.WriteMessage(websocket.BinaryMessage, P.Bytes())
|
||||
// start PAKE spinnner
|
||||
spin.Suffix = " performing PAKE..."
|
||||
spin.Start()
|
||||
case 2:
|
||||
// P recieves H(k),v from Q
|
||||
log.Debugf("[%d] P computes k, H(k), sends H(k) to Q", step)
|
||||
if err := P.Update(message); err != nil {
|
||||
return err
|
||||
}
|
||||
c.WriteMessage(websocket.BinaryMessage, P.Bytes())
|
||||
sessionKey, _ = P.SessionKey()
|
||||
// check(err)
|
||||
log.Debugf("%x\n", sessionKey)
|
||||
|
||||
// wait for readiness
|
||||
spin.Stop()
|
||||
spin.Suffix = " waiting for recipient ok..."
|
||||
spin.Start()
|
||||
case 3:
|
||||
log.Debugf("[%d] recipient declares readiness for file info", step)
|
||||
if !bytes.HasPrefix(message, []byte("ready")) {
|
||||
return errors.New("recipient refused file")
|
||||
}
|
||||
|
||||
// connect to TCP in background
|
||||
tcpConnections = make([]comm.Comm, len(tcpPorts))
|
||||
go func() {
|
||||
if !useWebsockets {
|
||||
log.Debugf("connecting to server")
|
||||
for i, tcpPort := range tcpPorts {
|
||||
log.Debugf("connecting to %s on connection %d", tcpPort, i)
|
||||
tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
isConnectedIfUsingTCP <- true
|
||||
}()
|
||||
|
||||
err = <-fileReady // block until file is ready
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fstatsBytes, err := json.Marshal(fstats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// encrypt the file meta data
|
||||
enc := crypt.Encrypt(fstatsBytes, sessionKey)
|
||||
// send the file meta data
|
||||
c.WriteMessage(websocket.BinaryMessage, enc.Bytes())
|
||||
case 4:
|
||||
log.Debugf("[%d] recipient declares gives blocks", step)
|
||||
// recipient sends blocks, and sender does not send anything back
|
||||
// determine if any blocks were sent to skip
|
||||
enc, err := crypt.FromBytes(message)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
decrypted, err := enc.Decrypt(sessionKey)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "could not decrypt blocks with session key")
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
var blocks []string
|
||||
errBlocks := json.Unmarshal(decrypted, &blocks)
|
||||
if errBlocks == nil {
|
||||
for _, block := range blocks {
|
||||
blockInt64, errBlock := strconv.Atoi(block)
|
||||
if errBlock == nil {
|
||||
blocksToSkip[int64(blockInt64)] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Debugf("found blocks: %+v", blocksToSkip)
|
||||
|
||||
// start loading the file into memory
|
||||
// start streaming encryption/compression
|
||||
if fstats.IsDir {
|
||||
// remove file if zipped
|
||||
defer os.Remove(fstats.SentName)
|
||||
}
|
||||
go func(dataChan chan DataChan) {
|
||||
var buffer []byte
|
||||
if useWebsockets {
|
||||
buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8)
|
||||
} else {
|
||||
buffer = make([]byte, models.TCP_BUFFER_SIZE/2)
|
||||
}
|
||||
|
||||
currentPostition := int64(0)
|
||||
for {
|
||||
bytesread, err := f.Read(buffer)
|
||||
if bytesread > 0 {
|
||||
if _, ok := blocksToSkip[currentPostition]; ok {
|
||||
log.Debugf("skipping the sending of block %d", currentPostition)
|
||||
currentPostition += int64(bytesread)
|
||||
continue
|
||||
}
|
||||
|
||||
// do compression
|
||||
var compressedBytes []byte
|
||||
if useCompression && !fstats.IsDir {
|
||||
compressedBytes = compress.Compress(buffer[:bytesread])
|
||||
} else {
|
||||
compressedBytes = buffer[:bytesread]
|
||||
}
|
||||
|
||||
// if using TCP, prepend the location to write the data to in the resulting file
|
||||
if !useWebsockets {
|
||||
compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...)
|
||||
}
|
||||
|
||||
// do encryption
|
||||
enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption)
|
||||
encBytes, err := json.Marshal(enc)
|
||||
if err != nil {
|
||||
dataChan <- DataChan{
|
||||
b: nil,
|
||||
bytesRead: 0,
|
||||
err: err,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
dataChan <- DataChan{
|
||||
b: encBytes,
|
||||
bytesRead: bytesread,
|
||||
err: nil,
|
||||
}
|
||||
currentPostition += int64(bytesread)
|
||||
}
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
log.Error(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
// finish
|
||||
log.Debug("sending magic")
|
||||
dataChan <- DataChan{
|
||||
b: []byte("magic"),
|
||||
bytesRead: 0,
|
||||
err: nil,
|
||||
}
|
||||
if !useWebsockets {
|
||||
log.Debug("sending extra magic to %d others", len(tcpPorts)-1)
|
||||
for i := 0; i < len(tcpPorts)-1; i++ {
|
||||
log.Debug("sending magic")
|
||||
dataChan <- DataChan{
|
||||
b: []byte("magic"),
|
||||
bytesRead: 0,
|
||||
err: nil,
|
||||
}
|
||||
}
|
||||
}
|
||||
}(dataChan)
|
||||
|
||||
case 5:
|
||||
spin.Stop()
|
||||
|
||||
log.Debugf("[%d] recipient declares readiness for file data", step)
|
||||
if !bytes.HasPrefix(message, []byte("ready")) {
|
||||
return errors.New("recipient refused file")
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP)
|
||||
// send file, compure hash simultaneously
|
||||
startTransfer = time.Now()
|
||||
|
||||
blockSize := 0
|
||||
if useWebsockets {
|
||||
blockSize = models.WEBSOCKET_BUFFER_SIZE / 8
|
||||
} else {
|
||||
blockSize = models.TCP_BUFFER_SIZE / 2
|
||||
}
|
||||
bar := progressbar.NewOptions(
|
||||
int(fstats.Size),
|
||||
progressbar.OptionSetRenderBlankState(true),
|
||||
progressbar.OptionSetBytes(int(fstats.Size)),
|
||||
progressbar.OptionSetWriter(os.Stderr),
|
||||
)
|
||||
bar.Add(blockSize * len(blocksToSkip))
|
||||
|
||||
if useWebsockets {
|
||||
for {
|
||||
data := <-dataChan
|
||||
if data.err != nil {
|
||||
return data.err
|
||||
}
|
||||
bar.Add(data.bytesRead)
|
||||
// write data to websockets
|
||||
err = c.WriteMessage(websocket.BinaryMessage, data.b)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "problem writing message")
|
||||
return err
|
||||
}
|
||||
if bytes.Equal(data.b, []byte("magic")) {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_ = <-isConnectedIfUsingTCP
|
||||
log.Debug("connected and ready to send on tcp")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(tcpConnections))
|
||||
for i := range tcpConnections {
|
||||
defer func(i int) {
|
||||
log.Debugf("closing connection %d", i)
|
||||
tcpConnections[i].Close()
|
||||
}(i)
|
||||
go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan) {
|
||||
defer wg.Done()
|
||||
for data := range dataChan {
|
||||
if data.err != nil {
|
||||
log.Error(data.err)
|
||||
return
|
||||
}
|
||||
bar.Add(data.bytesRead)
|
||||
// write data to tcp connection
|
||||
_, err = tcpConnections[i].Write(data.b)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "problem writing message")
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if bytes.Equal(data.b, []byte("magic")) {
|
||||
log.Debugf("%d got magic", i)
|
||||
return
|
||||
}
|
||||
}
|
||||
}(i, &wg, dataChan)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
bar.Finish()
|
||||
log.Debug("send hash to finish file")
|
||||
fileHash, err = utils.HashFile(fname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case 6:
|
||||
// recevied something, maybe the file hash
|
||||
transferTime := time.Since(startTransfer)
|
||||
if !bytes.HasPrefix(message, []byte("hash:")) {
|
||||
log.Debugf("%s", message)
|
||||
continue
|
||||
}
|
||||
c.WriteMessage(websocket.BinaryMessage, fileHash)
|
||||
message = bytes.TrimPrefix(message, []byte("hash:"))
|
||||
log.Debugf("[%d] determing whether it went ok", step)
|
||||
if bytes.Equal(message, fileHash) {
|
||||
log.Debug("file transfered successfully")
|
||||
transferRate := float64(fstats.Size) / 1000000.0 / transferTime.Seconds()
|
||||
transferType := "MB/s"
|
||||
if transferRate < 1 {
|
||||
transferRate = float64(fstats.Size) / 1000.0 / transferTime.Seconds()
|
||||
transferType = "kB/s"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "\nTransfer complete (%2.1f %s)", transferRate, transferType)
|
||||
return nil
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "\nTransfer corrupted")
|
||||
return errors.New("file not transfered succesfully")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown step")
|
||||
}
|
||||
step++
|
||||
}
|
||||
}
|
||||
|
||||
func connectToTCPServer(room string, address string) (com comm.Comm, err error) {
|
||||
connection, err := net.DialTimeout("tcp", address, 3*time.Hour)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
connection.SetReadDeadline(time.Now().Add(3 * time.Hour))
|
||||
connection.SetDeadline(time.Now().Add(3 * time.Hour))
|
||||
connection.SetWriteDeadline(time.Now().Add(3 * time.Hour))
|
||||
|
||||
com = comm.New(connection)
|
||||
ok, err := com.Receive()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Debugf("server says: %s", ok)
|
||||
|
||||
err = com.Send(room)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ok, err = com.Receive()
|
||||
log.Debugf("server says: %s", ok)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if ok != "sender" {
|
||||
err = errors.New(ok)
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue