diff options
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 137 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 23 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 13 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 47 | ||||
-rwxr-xr-x | src/hub/test-srv-single | 27 |
5 files changed, 138 insertions, 109 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 6742b6f..e45db69 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,18 +48,20 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - numWorker int - anonymization AnonymizationAlgo - geoip GeoIPLookup - wgWorker *sync.WaitGroup - ingestChan chan ingestToken - interfaces struct { + cfg SrvConfig + store *Store + wgInterfaces sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + numWorker int + anonymization AnonymizationAlgo + geoip GeoIPLookup + wgWorker sync.WaitGroup + ingestChan chan ingestToken + wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -134,95 +136,77 @@ func (srv *Server) IngestMany(updates []*UpdateFull) error { return <-token.response } -func (srv *Server) shutdownInterfaces() (errors int) { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - c := make(chan error) - go func() { c <- srv.webStop(ctx) }() - go func() { c <- srv.pipeStop(ctx) }() - go func() { c <- srv.pipegramStop(ctx) }() - - errors = 0 - for i := 0; i < 3; i++ { - if err := <-c; err != nil { - s5l.Printf("srv: interface shutdown failed failed: %v", err) - errors++ - } - } - close(c) // closing channel here in the hopes that this leads to a panic - // in case the number of channel reads (for loop above) doesn't match the - // number of interfaces to be stopped - cancel() - return -} - -func (srv *Server) Start() (wg sync.WaitGroup, err error) { +func (srv *Server) Start() (wg *sync.WaitGroup, err error) { if srv.cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(srv.cfg.Interfaces.Pipe) - }() + srv.ServePipe(srv.cfg.Interfaces.Pipe) } - if srv.cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(srv.cfg.Interfaces.Pipegram) - }() + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) } - if srv.cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(srv.cfg.Interfaces.Web) - }() + srv.ServeWeb(srv.cfg.Interfaces.Web) } + // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(srv.cfg.Forwards.SFive) - }() + go srv.RunForwarding(srv.cfg.Forwards.SFive) } - if srv.cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) - }() + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) } - if srv.cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) - }() + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) } - if srv.cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) - }() + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) } - return wg, nil + + return &srv.wgInterfaces, nil +} + +func (srv *Server) shutdownInterfaces() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.webStop(ctx); err != nil { + s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipeStop(ctx); err != nil { + s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipegramStop(ctx); err != nil { + s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all clients to disconnect") + defer s5l.Printf("srv: all clients are now disconnected") + srv.wgInterfaces.Wait() + return } +// must not be called before Start() func (srv *Server) Shutdown() { s5l.Printf("srv: shutting down") - if errors := srv.shutdownInterfaces(); errors != 0 { - s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") - } + srv.shutdownInterfaces() - close(srv.ingestChan) // close ingest channel to tell worker to stop - srv.wgWorker.Wait() // wait for worker to finish up + s5l.Printf("srv: shutting down worker") + close(srv.ingestChan) + srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") srv.store.Close() @@ -256,7 +240,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { srv.numWorker = cfg.Workers } - srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { srv.wgWorker.Add(1) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index c291513..46a83cc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -37,6 +37,7 @@ import ( "io" "net" "strings" + "sync" ) func (srv *Server) pipeHandle(conn net.Conn) { @@ -79,28 +80,34 @@ func (srv *Server) pipeHandle(conn net.Conn) { } func (srv *Server) pipeRun() { + var wgClients sync.WaitGroup for { conn, err := srv.interfaces.pipe.Accept() if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? - return + break } s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue } - go srv.pipeHandle(conn) + wgClients.Add(1) + go func() { + defer wgClients.Done() + srv.pipeHandle(conn) + }() } + s5l.Println("srv|pipe: interface stopped listening") + // TODO: tell all clients to disconnect!!! + wgClients.Wait() } func (srv *Server) pipeStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to pipeRun and pipeStop... if srv.interfaces.pipe == nil { return nil } s5l.Printf("srv|pipe: shutting down") return srv.interfaces.pipe.Close() - // TODO: also close alle open client file descriptors } func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { @@ -110,8 +117,12 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { } s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pipe: interface stopped") - srv.pipeRun() + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + + srv.pipeRun() + }() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go index b2807ed..65ff0e0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -40,12 +40,11 @@ import ( ) const ( - PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? + PipegramMessageSizeLimit = 1024 * 1024 ) func (srv *Server) pipegramRun() { buffer := make([]byte, PipegramMessageSizeLimit) - for { n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) if err != nil { @@ -70,7 +69,6 @@ func (srv *Server) pipegramRun() { } func (srv *Server) pipegramStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to pipegramRun and pipegramStop... if srv.interfaces.pipegram == nil { return nil } @@ -85,8 +83,13 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { } s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pgram: interface stopped") - srv.pipegramRun() + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|pgram: interface stopped listening") + + srv.pipegramRun() + }() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index a112585..65e677a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -397,26 +397,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { return tc, nil } -func (srv *Server) webRun(listener *net.TCPListener) (err error) { - mux := http.NewServeMux() - mux.Handle("/healthz", webHandler{srv, webHealthz}) - mux.Handle("/hubs", webHandler{srv, webHubs}) - mux.Handle("/sources", webHandler{srv, webSources}) - mux.Handle("/clients", webHandler{srv, webClients}) - mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) - mux.Handle("/updates", webHandler{srv, webUpdates}) - mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) - mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) - - // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) - mux.Handle("/", webHandler{srv, webNotFound}) - - srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} - return srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) +func (srv *Server) webRun(listener *net.TCPListener) { + srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) } func (srv *Server) webStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to webRun and webStop... if srv.interfaces.web == nil { return nil } @@ -424,15 +409,35 @@ func (srv *Server) webStop(ctx context.Context) (err error) { return srv.interfaces.web.Shutdown(ctx) } -func (srv *Server) ServeWeb(cfg WebInterfaceConfig) { +func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error { ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { s5l.Printf("srv|web: listen() failed: %v", err) - return + return err } s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) - defer s5l.Println("srv|web: interface stopped") - srv.webRun(ln.(*net.TCPListener)) + mux := http.NewServeMux() + mux.Handle("/healthz", webHandler{srv, webHealthz}) + mux.Handle("/hubs", webHandler{srv, webHubs}) + mux.Handle("/sources", webHandler{srv, webSources}) + mux.Handle("/clients", webHandler{srv, webClients}) + mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) + mux.Handle("/updates", webHandler{srv, webUpdates}) + mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) + mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) + // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) + mux.Handle("/", webHandler{srv, webNotFound}) + + srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|web: interface stopped listening") + + srv.webRun(ln.(*net.TCPListener)) + }() + return nil } diff --git a/src/hub/test-srv-single b/src/hub/test-srv-single new file mode 100755 index 0000000..902c78b --- /dev/null +++ b/src/hub/test-srv-single @@ -0,0 +1,27 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> <interface>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +case "$2" in + pipe) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server=false -start-web-server=false + ;; + pipegram) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server=false + ;; + web) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server -web ":8000" + ;; + *) + "unknown interface $2" + return 1 + ;; +esac |