From fdfa7f209d7b48a95a2cc7ef182720dadb9d890f Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Wed, 26 Sep 2018 14:31:45 -0700 Subject: [PATCH] revert to da2b22f --- src/comm/comm.go | 47 ++++++++++++--------------------- src/compress/compress.go | 2 +- src/models/constants.go | 2 +- src/recipient/recipient.go | 22 +++++----------- src/sender/sender.go | 19 +++----------- src/tcp/tcp.go | 53 ++++++++++++++------------------------ 6 files changed, 48 insertions(+), 97 deletions(-) diff --git a/src/comm/comm.go b/src/comm/comm.go index 1db603ea..b68e7ecc 100644 --- a/src/comm/comm.go +++ b/src/comm/comm.go @@ -1,7 +1,6 @@ package comm import ( - "bufio" "bytes" "fmt" "net" @@ -13,52 +12,40 @@ import ( // Comm is some basic TCP communication type Comm struct { connection net.Conn - writer *bufio.Writer - reader *bufio.Reader } // New returns a new comm -func New(n net.Conn) *Comm { - c := new(Comm) - c.connection = n - c.connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) - c.connection.SetDeadline(time.Now().Add(3 * time.Hour)) - c.connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) - c.writer = bufio.NewWriter(n) - // c.connection = bufio.NewReader(n) - return c +func New(c net.Conn) Comm { + c.SetReadDeadline(time.Now().Add(3 * time.Hour)) + c.SetDeadline(time.Now().Add(3 * time.Hour)) + c.SetWriteDeadline(time.Now().Add(3 * time.Hour)) + return Comm{c} } -// Connection returns the net.TCPConn connection -func (c *Comm) Connection() net.Conn { +// Connection returns the net.Conn connection +func (c Comm) Connection() net.Conn { return c.connection } // Close closes the connection -func (c *Comm) Close() { +func (c Comm) Close() { c.connection.Close() } -func (c *Comm) Write(b []byte) (int, error) { - c.connection.Write([]byte(fmt.Sprintf("%0.6d", len(b)))) + +func (c Comm) Write(b []byte) (int, error) { + c.connection.Write([]byte(fmt.Sprintf("%0.5d", len(b)))) n, err := c.connection.Write(b) if n != len(b) { err = fmt.Errorf("wanted to write %d but wrote %d", n, len(b)) } - // if err == nil { - // c.writer.Flush() - // } // log.Printf("wanted to write %d but wrote %d", n, len(b)) return n, err } -// func (c *Comm) Flush() { -// c.connection.Flush() -// } - -func (c *Comm) Read() (buf []byte, numBytes int, bs []byte, err error) { - // read until we get 6 bytes - tmp := make([]byte, 6) +func (c Comm) Read() (buf []byte, numBytes int, bs []byte, err error) { + // read until we get 5 bytes + tmp := make([]byte, 5) n, err := c.connection.Read(tmp) if err != nil { return @@ -72,7 +59,7 @@ func (c *Comm) Read() (buf []byte, numBytes int, bs []byte, err error) { for { // see if we have enough bytes bs = bytes.Trim(bs, "\x00") - if len(bs) == 6 { + if len(bs) == 5 { break } n, err := c.connection.Read(tmp) @@ -112,13 +99,13 @@ func (c *Comm) Read() (buf []byte, numBytes int, bs []byte, err error) { } // Send a message -func (c *Comm) Send(message string) (err error) { +func (c Comm) Send(message string) (err error) { _, err = c.Write([]byte(message)) return } // Receive a message -func (c *Comm) Receive() (s string, err error) { +func (c Comm) Receive() (s string, err error) { b, _, _, err := c.Read() s = string(b) return diff --git a/src/compress/compress.go b/src/compress/compress.go index 86aded68..bd1a1b77 100644 --- a/src/compress/compress.go +++ b/src/compress/compress.go @@ -9,7 +9,7 @@ import ( // Compress returns a compressed byte slice. func Compress(src []byte) []byte { compressedData := new(bytes.Buffer) - compress(src, compressedData, 1) + compress(src, compressedData, 9) return compressedData.Bytes() } diff --git a/src/models/constants.go b/src/models/constants.go index 1f228a1c..0c23ef98 100644 --- a/src/models/constants.go +++ b/src/models/constants.go @@ -1,4 +1,4 @@ package models const WEBSOCKET_BUFFER_SIZE = 1024 * 1024 * 32 -const TCP_BUFFER_SIZE = 993280 +const TCP_BUFFER_SIZE = 1024 * 64 diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 7b515a81..5d283b28 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -3,6 +3,7 @@ package recipient import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net" @@ -13,7 +14,6 @@ import ( "time" humanize "github.com/dustin/go-humanize" - "github.com/pkg/errors" log "github.com/cihub/seelog" "github.com/gorilla/websocket" @@ -50,7 +50,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var transferTime time.Duration var hash256 []byte var otherIP string - var tcpConnections []*comm.Comm + var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) useWebsockets := true @@ -176,7 +176,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") - tcpConnections = make([]*comm.Comm, len(tcpPorts)) + tcpConnections = make([]comm.Comm, len(tcpPorts)) for i, tcpPort := range tcpPorts { tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) if err != nil { @@ -300,7 +300,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var wg sync.WaitGroup wg.Add(len(tcpConnections)) for i := range tcpConnections { - go func(wg *sync.WaitGroup, tcpConnection *comm.Comm) { + go func(wg *sync.WaitGroup, tcpConnection comm.Comm) { defer wg.Done() for { // read from TCP connection @@ -405,26 +405,16 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } } -func connectToTCPServer(room string, address string) (com *comm.Comm, err error) { +func connectToTCPServer(room string, address string) (com comm.Comm, err error) { log.Debugf("recipient connecting to %s", address) - // rAddr, err := net.ResolveTCPAddr("tcp", address) - // if err != nil { - // return - // } - // connection, err := net.DialTCP("tcp", nil, rAddr) - // if err != nil { - // err = errors.Wrap(err, "bad connection to tcp") - // return - // } - // connection.SetNoDelay(true) connection, err := net.Dial("tcp", address) if err != nil { - err = errors.Wrap(err, "bad connection to tcp") return } connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) connection.SetDeadline(time.Now().Add(3 * time.Hour)) connection.SetWriteDeadline(time.Now().Add(3 * time.Hour)) + com = comm.New(connection) log.Debug("waiting for server contact") ok, err := com.Receive() diff --git a/src/sender/sender.go b/src/sender/sender.go index 6ea30fd6..caed83d3 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -51,7 +51,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var fileHash []byte var otherIP string var startTransfer time.Time - var tcpConnections []*comm.Comm + var tcpConnections []comm.Comm type DataChan struct { b []byte @@ -302,7 +302,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") - tcpConnections = make([]*comm.Comm, len(tcpPorts)) + tcpConnections = make([]comm.Comm, len(tcpPorts)) for i, tcpPort := range tcpPorts { log.Debug(tcpPort) tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) @@ -346,7 +346,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var wg sync.WaitGroup wg.Add(len(tcpConnections)) for i := range tcpConnections { - go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan, tcpConnection *comm.Comm) { + go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan, tcpConnection comm.Comm) { defer wg.Done() for data := range dataChan { if data.err != nil { @@ -407,20 +407,9 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } -func connectToTCPServer(room string, address string) (com *comm.Comm, err error) { - // rAddr, err := net.ResolveTCPAddr("tcp", address) - // if err != nil { - // return - // } - // connection, err := net.DialTCP("tcp", nil, rAddr) - // if err != nil { - // err = errors.Wrap(err, "bad connection to tcp") - // return - // } - // connection.SetNoDelay(true) +func connectToTCPServer(room string, address string) (com comm.Comm, err error) { connection, err := net.Dial("tcp", address) if err != nil { - err = errors.Wrap(err, "bad connection to tcp") return } connection.SetReadDeadline(time.Now().Add(3 * time.Hour)) diff --git a/src/tcp/tcp.go b/src/tcp/tcp.go index d22cef68..4177f77e 100644 --- a/src/tcp/tcp.go +++ b/src/tcp/tcp.go @@ -13,7 +13,7 @@ import ( ) type roomInfo struct { - receiver *comm.Comm + receiver comm.Comm opened time.Time } @@ -41,22 +41,13 @@ func Run(debugLevel, port string) { func run(port string) (err error) { log.Debugf("starting TCP server on " + port) - // rAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:"+port) - // if err != nil { - // panic(err) - // } - // server, err := net.ListenTCP("tcp", rAddr) - // if err != nil { - // return errors.Wrap(err, "Error listening on :"+port) - // } - server, err := net.Listen("tcp", ":"+port) + server, err := net.Listen("tcp", "0.0.0.0:"+port) if err != nil { return errors.Wrap(err, "Error listening on :"+port) } defer server.Close() // spawn a new goroutine whenever a client connects for { - // connection, err := server.AcceptTCP() connection, err := server.Accept() if err != nil { return errors.Wrap(err, "problem accepting connection") @@ -71,7 +62,7 @@ func run(port string) (err error) { } } -func clientCommuncation(port string, c *comm.Comm) (err error) { +func clientCommuncation(port string, c comm.Comm) (err error) { // send ok to tell client they are connected err = c.Send("ok") if err != nil { @@ -107,7 +98,7 @@ func clientCommuncation(port string, c *comm.Comm) (err error) { wg.Add(1) // start piping - go func(com1, com2 *comm.Comm, wg *sync.WaitGroup) { + go func(com1, com2 comm.Comm, wg *sync.WaitGroup) { log.Debug("starting pipes") pipe(com1.Connection(), com2.Connection()) wg.Done() @@ -133,14 +124,13 @@ func clientCommuncation(port string, c *comm.Comm) (err error) { // Read()s from the socket to the channel. func chanFromConn(conn net.Conn) chan []byte { c := make(chan []byte) - // reader := bufio.NewReader(conn) go func() { + b := make([]byte, models.TCP_BUFFER_SIZE) + for { - b := make([]byte, models.TCP_BUFFER_SIZE) n, err := conn.Read(b) if n > 0 { - // c <- b[:n] res := make([]byte, n) // Copy the buffer so it doesn't get changed while read by the recipient. copy(res, b[:n]) @@ -160,26 +150,21 @@ func chanFromConn(conn net.Conn) chan []byte { // transfers data from one to the other. func pipe(conn1 net.Conn, conn2 net.Conn) { chan1 := chanFromConn(conn1) - // chan2 := chanFromConn(conn2) - // writer1 := bufio.NewWriter(conn1) - // writer2 := bufio.NewWriter(conn2) + chan2 := chanFromConn(conn2) for { - b1 := <-chan1 - if b1 == nil { - return + select { + case b1 := <-chan1: + if b1 == nil { + return + } + conn2.Write(b1) + + case b2 := <-chan2: + if b2 == nil { + return + } + conn1.Write(b2) } - conn2.Write(b1) - // writer2.Write(b1) - // writer2.Flush() - - // case b2 := <-chan2: - // if b2 == nil { - // return - // } - // writer1.Write(b2) - // writer1.Flush() - // } - } }