summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-15 04:55:20 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-15 04:55:20 +0200
commit50b3bb1acf202f7dc858618b61b9d0a608176796 (patch)
treec5ec2c0c832ff5b28f6a4a340c65ec9945f05358
parentinterface shutdown almost clean now (diff)
pipe client connections can now be killed on shutdown
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go70
1 files changed, 46 insertions, 24 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 46a83cc..2a6a200 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -38,49 +38,71 @@ import (
"net"
"strings"
"sync"
+ "time"
)
-func (srv *Server) pipeHandle(conn net.Conn) {
+func (srv *Server) pipeRead(dec Decoder, updateCH chan<- *UpdateFull, errorCH chan<- error) {
+ for {
+ update, err := dec.Decode()
+ if err != nil {
+ if err == io.EOF {
+ close(updateCH)
+ return
+ }
+ opErr, isOpErr := err.(*net.OpError)
+ if isOpErr && opErr.Temporary() {
+ continue
+ }
+ errorCH <- err
+ return
+ }
+ updateCH <- update
+ }
+}
+
+func (srv *Server) pipeHandle(conn net.Conn, quit <-chan bool) {
defer conn.Close()
+ conn.SetReadDeadline(time.Now().Add(10 * time.Second))
dec, err := NewStatefulDecoder(conn)
if err != nil {
s5l.Printf("srv|pipe: read(init) failed: %v\n", err)
return
}
+ conn.SetReadDeadline(time.Time{})
slug := dec.Slug()
s5l.Printf("srv|pipe: new connection: %s\n", slug)
defer s5l.Printf("srv|pipe(%s): connection closed\n", slug)
+ updateCH := make(chan *UpdateFull)
+ errorCH := make(chan error)
+ go srv.pipeRead(dec, updateCH, errorCH)
+
for {
- update, err := dec.Decode()
- if err != nil {
- if err == io.EOF {
- break
+ select {
+ case update := <-updateCH:
+ if update == nil {
+ return
}
- // TODO: send NACK?
-
- opErr, isOpErr := err.(*net.OpError)
- if isOpErr && opErr.Temporary() {
- s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
- } else {
- s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err)
- break
+ if err = srv.Ingest(update); err != nil {
+ s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err)
+ // TODO: send NACK
+ return
}
+ // TODO: send ACK
+ case err := <-errorCH:
+ s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err)
+ return
+ case <-quit:
+ return
}
-
- if err = srv.Ingest(update); err != nil {
- s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err)
- // TODO: send NACK?
- break
- }
- // TODO: send ACK?
}
}
func (srv *Server) pipeRun() {
- var wgClients sync.WaitGroup
+ wgClients := &sync.WaitGroup{}
+ quit := make(chan bool)
for {
conn, err := srv.interfaces.pipe.Accept()
if err != nil {
@@ -88,17 +110,17 @@ func (srv *Server) pipeRun() {
break
}
s5l.Printf("srv|pipe: accept() failed: %v", err)
- // ignore
+ // TODO: ignore ... or is this permanent?
continue
}
wgClients.Add(1)
go func() {
defer wgClients.Done()
- srv.pipeHandle(conn)
+ srv.pipeHandle(conn, quit)
}()
}
s5l.Println("srv|pipe: interface stopped listening")
- // TODO: tell all clients to disconnect!!!
+ close(quit)
wgClients.Wait()
}