diff options
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 7 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 42 |
2 files changed, 34 insertions, 15 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e8778df..6742b6f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -56,8 +56,9 @@ type Server struct { wgWorker *sync.WaitGroup ingestChan chan ingestToken interfaces struct { - web *http.Server + pipe net.Listener pipegram net.PacketConn + web *http.Server } } @@ -137,11 +138,11 @@ func (srv *Server) shutdownInterfaces() (errors int) { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) c := make(chan error) go func() { c <- srv.webStop(ctx) }() - //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped + go func() { c <- srv.pipeStop(ctx) }() go func() { c <- srv.pipegramStop(ctx) }() errors = 0 - for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled + for i := 0; i < 3; i++ { if err := <-c; err != nil { s5l.Printf("srv: interface shutdown failed failed: %v", err) errors++ diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 20e3085..c291513 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,8 +33,10 @@ package sfive import ( + "context" "io" "net" + "strings" ) func (srv *Server) pipeHandle(conn net.Conn) { @@ -76,20 +78,13 @@ func (srv *Server) pipeHandle(conn net.Conn) { } } -func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { - ln, err := net.Listen("unix", cfg.ListenPath) - if err != nil { - s5l.Printf("srv|pipe: listen() failed: %v", err) - return - } - defer ln.Close() - - s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pipe: interface stopped") - +func (srv *Server) pipeRun() { for { - conn, err := ln.Accept() + conn, err := srv.interfaces.pipe.Accept() if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue @@ -97,3 +92,26 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { go srv.pipeHandle(conn) } } + +func (srv *Server) pipeStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to pipeRun and pipeStop... + if srv.interfaces.pipe == nil { + return nil + } + s5l.Printf("srv|pipe: shutting down") + return srv.interfaces.pipe.Close() + // TODO: also close alle open client file descriptors +} + +func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { + if srv.interfaces.pipe, err = net.Listen("unix", cfg.ListenPath); err != nil { + s5l.Printf("srv|pipe: listen() failed: %v", err) + return + } + + s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pipe: interface stopped") + + srv.pipeRun() + return +} |