0
0
Fork 0
mirror of https://github.com/schollz/croc.git synced 2025-10-11 13:21:00 +02:00

add finishing

This commit is contained in:
Zack Scholl 2019-04-04 09:29:42 -07:00
parent 21b81fc67c
commit b6e3679c54

View file

@ -49,7 +49,7 @@ type Client struct {
Step2FileInfoTransfered bool
Step3RecipientRequestFile bool
Step4FileTransfer bool
Step5RecipientCheckFile bool // TODO: Step5 should close files and reset things
Step5CloseChannels bool // TODO: Step5 should close files and reset things
// send / receive information of all files
FilesToTransfer []FileInfo
@ -258,6 +258,12 @@ func (c *Client) transfer(options TransferOptions) (err error) {
if err != nil {
return
}
if m.Type == "finished" {
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "finished",
}.String()).Err()
return err
}
err = c.processMessage(m)
if err != nil {
return
@ -378,12 +384,20 @@ func (c *Client) processMessage(m Message) (err error) {
answer := util.Decode(m.Message)
// Apply the answer as the remote description
err = c.peerConnection.SetRemoteDescription(answer)
case "finished-transfer":
c.Step3RecipientRequestFile = false
case "close-sender":
c.peerConnection.Close()
c.peerConnection = nil
c.dataChannel = nil
c.Step4FileTransfer = false
c.Step3RecipientRequestFile = false
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "thanks",
Type: "close-recipient",
}.String()).Err()
case "close-recipient":
c.peerConnection.Close()
c.peerConnection = nil
c.Step4FileTransfer = false
c.Step3RecipientRequestFile = false
}
if err != nil {
return
@ -424,7 +438,12 @@ func (c *Client) updateState() (err error) {
if finished {
// TODO: do the last finishing stuff
log.Debug("finished")
os.Exit(1)
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "finished",
}.String()).Err()
if err != nil {
panic(err)
}
}
// start initiating the process to receive a new file
@ -454,13 +473,14 @@ func (c *Client) updateState() (err error) {
c.CurrentFileChunks = []int64{}
}
if overwrite {
os.Remove(pathToFile)
c.CurrentFile, err = os.Create(pathToFile)
if err != nil {
return
}
err = c.CurrentFile.Truncate(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size)
} else {
c.CurrentFile, err = os.Open(pathToFile)
c.CurrentFile, err = os.OpenFile(pathToFile, os.O_RDWR|os.O_CREATE, 0755)
}
if err != nil {
return
@ -528,7 +548,7 @@ func (c *Client) dataChannelReceive() (err error) {
sendBytes := make(chan []byte, 1024)
// Register channel opening handling
d.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label, d.ID)
log.Debugf("Data channel '%s'-'%d' open", d.Label, d.ID)
for {
data := <-sendBytes
err := d.Send(datachannel.PayloadBinary{Data: data})
@ -546,6 +566,13 @@ func (c *Client) dataChannelReceive() (err error) {
piecesToDo[i] = true
}
// Register message handling
bar := progressbar.NewOptions64(
c.FilesToTransfer[c.FilesToTransferCurrentNum].Size,
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes64(c.FilesToTransfer[c.FilesToTransferCurrentNum].Size),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
d.OnMessage(func(payload datachannel.Payload) {
switch p := payload.(type) {
@ -555,10 +582,8 @@ func (c *Client) dataChannelReceive() (err error) {
c.CurrentFile.Close()
c.log.Debug(time.Since(timer))
c.log.Debug("telling transfer is over")
c.Step4FileTransfer = false
c.Step3RecipientRequestFile = false
err = c.redisdb.Publish(c.nameOutChannel, Message{
Type: "finished-transfer",
Type: "close-sender",
}.String()).Err()
if err != nil {
panic(err)
@ -581,27 +606,8 @@ func (c *Client) dataChannelReceive() (err error) {
if err != nil {
panic(err)
}
c.log.Debugf("wrote %d bytes to %d", n, chunk.Location)
mutex.Lock()
piecesToDo[chunk.Location] = false
mutex.Unlock()
go func() {
numToDo := 0
thingsToDo := make([]int64, len(piecesToDo))
mutex.Lock()
for k := range piecesToDo {
if piecesToDo[k] {
thingsToDo[numToDo] = k
numToDo++
}
}
mutex.Unlock()
thingsToDo = thingsToDo[:numToDo]
c.log.Debug("num to do: ", len(thingsToDo))
if len(thingsToDo) < 10 {
c.log.Debug(thingsToDo)
}
}()
// c.log.Debugf("wrote %d bytes to %d", n, chunk.Location)
bar.Add(n)
default:
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
}
@ -658,6 +664,15 @@ func (c *Client) dataChannelSend() (err error) {
}
defer file.Close()
fstats, _ := file.Stat()
bar := progressbar.NewOptions64(
fstats.Size(),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetBytes64(fstats.Size()),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(1/60*time.Second),
)
buffer := make([]byte, BufferSize)
var location int64
for {
@ -675,7 +690,7 @@ func (c *Client) dataChannelSend() (err error) {
}
dataToSend, _ := json.Marshal(mSend)
c.log.Debugf("sending %d bytes at %d", bytesread, location)
bar.Add(bytesread)
err = c.dataChannel.Send(datachannel.PayloadBinary{Data: dataToSend})
if err != nil {
c.log.Debug("Could not send on data channel", err.Error())
@ -684,11 +699,24 @@ func (c *Client) dataChannelSend() (err error) {
location += int64(bytesread)
time.Sleep(100 * time.Microsecond)
}
c.log.Debug("sending done signal")
err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")})
if err != nil {
c.log.Debug(err)
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Debug("Recovered in f", r)
}
}()
for {
c.log.Debug("sending done signal")
if c.dataChannel == nil {
return
}
err = c.dataChannel.Send(datachannel.PayloadString{Data: []byte("done")})
if err != nil {
c.log.Debug(err)
}
time.Sleep(1 * time.Second)
}
}()
})
// Register the OnMessage to handle incoming messages
@ -753,5 +781,10 @@ func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) {
}
currentLocation += int64(bytesread)
}
if chunkNum == 0 {
chunks = []int64{}
} else {
chunks = chunks[:chunkNum]
}
return
}