summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-13 22:52:53 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-13 22:52:53 +0200
commita4146ac1faf5937741c2eb09ca2cc35be3b7b1f9 (patch)
tree42fc924470ebdf073782b169b86e655145fcd496 /src/hub
parentmove pipgram into separate file (diff)
added (stupid) shutdown for pipe interface
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go7
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go42
2 files changed, 34 insertions, 15 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index e8778df..6742b6f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -56,8 +56,9 @@ type Server struct {
wgWorker *sync.WaitGroup
ingestChan chan ingestToken
interfaces struct {
- web *http.Server
+ pipe net.Listener
pipegram net.PacketConn
+ web *http.Server
}
}
@@ -137,11 +138,11 @@ func (srv *Server) shutdownInterfaces() (errors int) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
c := make(chan error)
go func() { c <- srv.webStop(ctx) }()
- //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped
+ go func() { c <- srv.pipeStop(ctx) }()
go func() { c <- srv.pipegramStop(ctx) }()
errors = 0
- for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled
+ for i := 0; i < 3; i++ {
if err := <-c; err != nil {
s5l.Printf("srv: interface shutdown failed failed: %v", err)
errors++
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 20e3085..c291513 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -33,8 +33,10 @@
package sfive
import (
+ "context"
"io"
"net"
+ "strings"
)
func (srv *Server) pipeHandle(conn net.Conn) {
@@ -76,20 +78,13 @@ func (srv *Server) pipeHandle(conn net.Conn) {
}
}
-func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
- ln, err := net.Listen("unix", cfg.ListenPath)
- if err != nil {
- s5l.Printf("srv|pipe: listen() failed: %v", err)
- return
- }
- defer ln.Close()
-
- s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("srv|pipe: interface stopped")
-
+func (srv *Server) pipeRun() {
for {
- conn, err := ln.Accept()
+ conn, err := srv.interfaces.pipe.Accept()
if err != nil {
+ if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
+ return
+ }
s5l.Printf("srv|pipe: accept() failed: %v", err)
// ignore
continue
@@ -97,3 +92,26 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
go srv.pipeHandle(conn)
}
}
+
+func (srv *Server) pipeStop(ctx context.Context) (err error) {
+ // TODO: this is a race condition between a call to pipeRun and pipeStop...
+ if srv.interfaces.pipe == nil {
+ return nil
+ }
+ s5l.Printf("srv|pipe: shutting down")
+ return srv.interfaces.pipe.Close()
+ // TODO: also close alle open client file descriptors
+}
+
+func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) {
+ if srv.interfaces.pipe, err = net.Listen("unix", cfg.ListenPath); err != nil {
+ s5l.Printf("srv|pipe: listen() failed: %v", err)
+ return
+ }
+
+ s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("srv|pipe: interface stopped")
+
+ srv.pipeRun()
+ return
+}