From 22cd0afd11e0c8630c9a3f82f2f770d9d076beff Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Thu, 28 Jun 2018 18:47:34 -0700 Subject: [PATCH] websockets working --- main.go | 16 +++++- src/api.go | 2 +- src/models.go | 8 ++- src/sender.go | 73 +++++++++++++++++++++++++++ src/server.go | 133 +++++++++++++++++++++++++++++++++++--------------- 5 files changed, 187 insertions(+), 45 deletions(-) create mode 100644 src/sender.go diff --git a/main.go b/main.go index e21119ad..bddde771 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,22 @@ package main -import croc "github.com/schollz/croc/src" +import ( + "flag" + + croc "github.com/schollz/croc/src" +) func main() { + var err error + role := flag.Int("role", 0, "role number") + flag.Parse() + c := croc.Init() - err := c.Relay() + if *role == 0 { + err = c.Relay() + } else if *role == 1 { + err = c.Send("foo") + } if err != nil { panic(err) } diff --git a/src/api.go b/src/api.go index 35893ae3..c1fb7312 100644 --- a/src/api.go +++ b/src/api.go @@ -44,7 +44,7 @@ func (c *Croc) Relay() error { // Send will take an existing file or folder and send it through the croc relay func (c *Croc) Send(fname string) (err error) { - + err = c.send(fname) return } diff --git a/src/models.go b/src/models.go index fa455eac..9dfb5f9c 100644 --- a/src/models.go +++ b/src/models.go @@ -36,6 +36,9 @@ type channelData struct { // Ports returns which TCP ports to connect to Ports []string `json:"ports"` + // UUID is sent out only to one person at a time + UUID string `json:"uuid"` + // Private // isopen determine whether or not the channel has been opened isopen bool @@ -74,7 +77,8 @@ type payload struct { Curve string `json:"curve"` // Update set to true when updating - Update bool `json:"update"` + Update bool `json:"update"` + UUID string `json:"uuid"` // State is the state information to be updated State map[string][]byte `json:"state"` @@ -84,7 +88,7 @@ type payload struct { func newChannelData(name string) (cd *channelData) { cd = new(channelData) - cd.Name = name + cd.Channel = name cd.State = make(map[string][]byte) for _, state := range availableStates { cd.State[state] = []byte{} diff --git a/src/sender.go b/src/sender.go new file mode 100644 index 00000000..0a54be6b --- /dev/null +++ b/src/sender.go @@ -0,0 +1,73 @@ +package croc + +import ( + "net/url" + "os" + "os/signal" + "time" + + log "github.com/cihub/seelog" + "github.com/gorilla/websocket" +) + +func (c *Croc) send(fname string) (err error) { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + u := url.URL{Scheme: "ws", Host: "localhost:8003", Path: "/"} + log.Debugf("connecting to %s", u.String()) + + ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Error("dial:", err) + return + } + defer ws.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + var cd channelData + err := ws.ReadJSON(&cd) + if err != nil { + log.Debugf("sender read error:", err) + return + } + log.Debugf("recv: %+v", cd) + } + }() + + // initialize + err = ws.WriteJSON(payload{ + Open: true, + }) + if err != nil { + log.Errorf("problem opening: %s", err.Error()) + return + } + + for { + select { + case <-done: + return + case <-interrupt: + log.Debugf("interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + errWrite := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if errWrite != nil { + log.Debugf("write close:", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } + return +} diff --git a/src/server.go b/src/server.go index 1ed66b3e..0c58e5ca 100644 --- a/src/server.go +++ b/src/server.go @@ -2,20 +2,48 @@ package croc import ( "crypto/elliptic" - "encoding/json" - "fmt" + "net/http" "time" log "github.com/cihub/seelog" "github.com/frankenbeanies/uuid4" - "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" "github.com/pkg/errors" ) -func (c *Croc) updateChannel(p payloadChannel) (r response, err error) { +func (c *Croc) startServer(tcpPorts []string, port string) (err error) { + // start cleanup on dangling channels + go c.channelCleanup() + + var upgrader = websocket.Upgrader{} // use default options + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error("upgrade:", err) + return + } + defer ws.Close() + for { + var p payload + err := ws.ReadJSON(&p) + if err != nil { + log.Debugf("read:", err) + break + } + err = c.processPayload(ws, p) + if err != nil { + log.Warn("problem processing payload %+v: %s", err.Error()) + } + } + }) + log.Debugf("listening on port %s", c.ServerPort) + err = http.ListenAndServe(":"+c.ServerPort, nil) + return +} + +func (c *Croc) updateChannel(p payload) (err error) { c.rs.Lock() defer c.rs.Unlock() - r.Success = true // determine if channel is invalid if _, ok := c.rs.channel[p.Channel]; !ok { @@ -30,13 +58,6 @@ func (c *Croc) updateChannel(p payloadChannel) (r response, err error) { return } - // check if the action is to close the channel - if p.Close { - delete(c.rs.channel, p.Channel) - r.Message = "deleted " + p.Channel - return - } - // assign each key provided assignedKeys := []string{} for key := range p.State { @@ -50,17 +71,13 @@ func (c *Croc) updateChannel(p payloadChannel) (r response, err error) { } } - // return the current state - r.Data = c.rs.channel[p.Channel] - - r.Message = fmt.Sprintf("assigned %d keys: %v", len(assignedKeys), assignedKeys) + log.Debugf("assigned %d keys: %v", len(assignedKeys), assignedKeys) return } -func (c *Croc) joinChannel(p payloadChannel) (r response, err error) { +func (c *Croc) joinChannel(ws *websocket.Conn, p payload) (channel string, err error) { c.rs.Lock() defer c.rs.Unlock() - r.Success = true // determine if sender or recipient if p.Role != 0 && p.Role != 1 { @@ -81,50 +98,86 @@ func (c *Croc) joinChannel(p payloadChannel) (r response, err error) { return } } - r.Channel = p.Channel - if _, ok := c.rs.channel[r.Channel]; !ok { - c.rs.channel[r.Channel] = newChannelData(r.Channel) + if _, ok := c.rs.channel[p.Channel]; !ok { + c.rs.channel[p.Channel] = newChannelData(p.Channel) } + channel = p.Channel // assign UUID for the role in the channel - c.rs.channel[r.Channel].uuids[p.Role] = uuid4.New().String() - r.UUID = c.rs.channel[r.Channel].uuids[p.Role] - log.Debugf("(%s) %s has joined as role %d", r.Channel, r.UUID, p.Role) + c.rs.channel[p.Channel].uuids[p.Role] = uuid4.New().String() + log.Debugf("(%s) %s has joined as role %d", p.Channel, c.rs.channel[p.Channel].uuids[p.Role], p.Role) + // send Channel+UUID back to the current person + err = ws.WriteJSON(channelData{ + Channel: p.Channel, + UUID: c.rs.channel[p.Channel].uuids[p.Role], + }) + if err != nil { + return + } // if channel is not open, set initial parameters - if !c.rs.channel[r.Channel].isopen { - c.rs.channel[r.Channel].isopen = true - c.rs.channel[r.Channel].Ports = tcpPorts - c.rs.channel[r.Channel].startTime = time.Now() + if !c.rs.channel[p.Channel].isopen { + c.rs.channel[p.Channel].isopen = true + c.rs.channel[p.Channel].Ports = c.TcpPorts + c.rs.channel[p.Channel].startTime = time.Now() switch curve := p.Curve; curve { case "p224": - c.rs.channel[r.Channel].curve = elliptic.P224() + c.rs.channel[p.Channel].curve = elliptic.P224() case "p256": - c.rs.channel[r.Channel].curve = elliptic.P256() + c.rs.channel[p.Channel].curve = elliptic.P256() case "p384": - c.rs.channel[r.Channel].curve = elliptic.P384() + c.rs.channel[p.Channel].curve = elliptic.P384() case "p521": - c.rs.channel[r.Channel].curve = elliptic.P521() + c.rs.channel[p.Channel].curve = elliptic.P521() default: // TODO: // add SIEC p.Curve = "p256" - c.rs.channel[r.Channel].curve = elliptic.P256() + c.rs.channel[p.Channel].curve = elliptic.P256() } - log.Debugf("(%s) using curve '%s'", r.Channel, p.Curve) - c.rs.channel[r.Channel].State["curve"] = []byte(p.Curve) + log.Debugf("(%s) using curve '%s'", p.Channel, p.Curve) + c.rs.channel[p.Channel].State["curve"] = []byte(p.Curve) } + c.rs.channel[p.Channel].websocketConn[p.Role] = ws - r.Message = fmt.Sprintf("assigned role %d in channel '%s'", p.Role, r.Channel) + log.Debugf("assigned role %d in channel '%s'", p.Role, p.Channel) return } -func (c *Croc) startServer(tcpPorts []string, port string) (err error) { - // start cleanup on dangling channels - go c.channelCleanup() +func (c *Croc) processPayload(ws *websocket.Conn, p payload) (err error) { + if p.Close { + c.rs.Lock() + delete(c.rs.channel, p.Channel) + c.rs.Unlock() + return + } + + channel := p.Channel + if p.Open { + channel, err = c.joinChannel(ws, p) + } else if p.Update { + // update + } // TODO: - // insert websockets here + // relay state logic here + + // send out the data to both sender + receiver each time + c.rs.Lock() + if _, ok := c.rs.channel[channel]; ok { + for role, wsConn := range c.rs.channel[channel].websocketConn { + if wsConn == nil { + continue + } + log.Debugf("writing latest data %+v to %d", c.rs.channel[channel], role) + err = wsConn.WriteJSON(c.rs.channel[channel]) + if err != nil { + log.Debugf("problem writing to role %d: %s", role, err.Error()) + } + } + } + c.rs.Lock() + return } func (c *Croc) channelCleanup() {