diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-07-15 04:55:20 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-07-15 04:55:20 +0200 |
commit | 50b3bb1acf202f7dc858618b61b9d0a608176796 (patch) | |
tree | c5ec2c0c832ff5b28f6a4a340c65ec9945f05358 /src/hub | |
parent | interface shutdown almost clean now (diff) |
pipe client connections can now be killed on shutdown
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 70 |
1 files changed, 46 insertions, 24 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 46a83cc..2a6a200 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -38,49 +38,71 @@ import ( "net" "strings" "sync" + "time" ) -func (srv *Server) pipeHandle(conn net.Conn) { +func (srv *Server) pipeRead(dec Decoder, updateCH chan<- *UpdateFull, errorCH chan<- error) { + for { + update, err := dec.Decode() + if err != nil { + if err == io.EOF { + close(updateCH) + return + } + opErr, isOpErr := err.(*net.OpError) + if isOpErr && opErr.Temporary() { + continue + } + errorCH <- err + return + } + updateCH <- update + } +} + +func (srv *Server) pipeHandle(conn net.Conn, quit <-chan bool) { defer conn.Close() + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) dec, err := NewStatefulDecoder(conn) if err != nil { s5l.Printf("srv|pipe: read(init) failed: %v\n", err) return } + conn.SetReadDeadline(time.Time{}) slug := dec.Slug() s5l.Printf("srv|pipe: new connection: %s\n", slug) defer s5l.Printf("srv|pipe(%s): connection closed\n", slug) + updateCH := make(chan *UpdateFull) + errorCH := make(chan error) + go srv.pipeRead(dec, updateCH, errorCH) + for { - update, err := dec.Decode() - if err != nil { - if err == io.EOF { - break + select { + case update := <-updateCH: + if update == nil { + return } - // TODO: send NACK? - - opErr, isOpErr := err.(*net.OpError) - if isOpErr && opErr.Temporary() { - s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) - } else { - s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) - break + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) + // TODO: send NACK + return } + // TODO: send ACK + case err := <-errorCH: + s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) + return + case <-quit: + return } - - if err = srv.Ingest(update); err != nil { - s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) - // TODO: send NACK? - break - } - // TODO: send ACK? } } func (srv *Server) pipeRun() { - var wgClients sync.WaitGroup + wgClients := &sync.WaitGroup{} + quit := make(chan bool) for { conn, err := srv.interfaces.pipe.Accept() if err != nil { @@ -88,17 +110,17 @@ func (srv *Server) pipeRun() { break } s5l.Printf("srv|pipe: accept() failed: %v", err) - // ignore + // TODO: ignore ... or is this permanent? continue } wgClients.Add(1) go func() { defer wgClients.Done() - srv.pipeHandle(conn) + srv.pipeHandle(conn, quit) }() } s5l.Println("srv|pipe: interface stopped listening") - // TODO: tell all clients to disconnect!!! + close(quit) wgClients.Wait() } |