diff options
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/proxy.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 71 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 79 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 51 |
4 files changed, 112 insertions, 91 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 3bb0afb..f994982 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -212,7 +212,7 @@ func (p *Proxy) Run() error { httpsL := m.Match(cmux.Any()) go p.RunHTTPS(httpsL) - if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { + if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? return err } return nil diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index ba34697..9b28270 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -37,7 +37,7 @@ import ( "log" "os" "os/signal" - "sync" + "syscall" "spreadspace.org/sfive" ) @@ -100,64 +100,10 @@ func main() { if err != nil { s5hl.Fatalf(err.Error()) } - defer srv.Close() - var wg sync.WaitGroup - - if cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(cfg.Interfaces.Pipe) - }() - } - - if cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(cfg.Interfaces.Pipegram) - }() - } - - if cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(cfg.Interfaces.Web) - }() - } - - if cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(cfg.Forwards.SFive) - }() - } - - if cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(cfg.Forwards.Elasticsearch) - }() - } - - if cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(cfg.Forwards.Graphite) - }() - } - - if cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(cfg.Forwards.Piwik) - }() + wg, err := srv.Start() + if err != nil { + s5hl.Fatalf(err.Error()) } alldone := make(chan bool) @@ -167,13 +113,12 @@ func main() { }() c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) select { - case <-c: - s5hl.Println("received interrupt, shutdown") - return + case sig := <-c: + s5hl.Printf("signal(%v) received, shutting down", sig) case <-alldone: - return } + srv.Shutdown() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index b91d5c5..e8778df 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -35,6 +35,7 @@ package sfive import ( "context" "errors" + "net" "net/http" "runtime" "sync" @@ -47,6 +48,7 @@ type ingestToken struct { } type Server struct { + cfg SrvConfig store *Store numWorker int anonymization AnonymizationAlgo @@ -54,7 +56,8 @@ type Server struct { wgWorker *sync.WaitGroup ingestChan chan ingestToken interfaces struct { - web *http.Server + web *http.Server + pipegram net.PacketConn } } @@ -134,24 +137,83 @@ func (srv *Server) shutdownInterfaces() (errors int) { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) c := make(chan error) go func() { c <- srv.webStop(ctx) }() - //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped - //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped + //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped + go func() { c <- srv.pipegramStop(ctx) }() errors = 0 - for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled + for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled if err := <-c; err != nil { s5l.Printf("srv: interface shutdown failed failed: %v", err) errors++ } } close(c) // closing channel here in the hopes that this leads to a panic - // in case the number of channel reads (for loop above) is) doesn't match the - // number of interfaces + // in case the number of channel reads (for loop above) doesn't match the + // number of interfaces to be stopped cancel() return } -func (srv *Server) Close() { +func (srv *Server) Start() (wg sync.WaitGroup, err error) { + if srv.cfg.Interfaces.Pipe.ListenPath != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServePipe(srv.cfg.Interfaces.Pipe) + }() + } + + if srv.cfg.Interfaces.Pipegram.ListenPath != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) + }() + } + + if srv.cfg.Interfaces.Web.ListenAddr != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServeWeb(srv.cfg.Interfaces.Web) + }() + } + + if srv.cfg.Forwards.SFive.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwarding(srv.cfg.Forwards.SFive) + }() + } + + if srv.cfg.Forwards.Elasticsearch.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + }() + } + + if srv.cfg.Forwards.Graphite.Host != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + }() + } + + if srv.cfg.Forwards.Piwik.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + }() + } + return wg, nil +} + +func (srv *Server) Shutdown() { s5l.Printf("srv: shutting down") if errors := srv.shutdownInterfaces(); errors != 0 { @@ -160,13 +222,14 @@ func (srv *Server) Close() { close(srv.ingestChan) // close ingest channel to tell worker to stop srv.wgWorker.Wait() // wait for worker to finish up + s5l.Printf("srv: all worker stopped") srv.store.Close() s5l.Printf("srv: finished") } func NewServer(cfg SrvConfig) (srv *Server, err error) { - srv = &Server{} + srv = &Server{cfg: cfg} if srv.store, err = NewStore(cfg.Store); err != nil { return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 7d0fbe4..6cf2bc4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -34,8 +34,10 @@ package sfive import ( "bytes" + "context" "io" "net" + "strings" ) const ( @@ -51,13 +53,13 @@ func (srv *Server) pipeHandle(conn net.Conn) { dec, err := NewStatefulDecoder(conn) if err != nil { - s5l.Printf("server|pipe: read(init) failed: %v\n", err) + s5l.Printf("srv|pipe: read(init) failed: %v\n", err) return } slug := dec.Slug() - s5l.Printf("server|pipe: new connection: %s\n", slug) - defer s5l.Printf("server|pipe(%s): connection closed\n", slug) + s5l.Printf("srv|pipe: new connection: %s\n", slug) + defer s5l.Printf("srv|pipe(%s): connection closed\n", slug) for { update, err := dec.Decode() @@ -69,15 +71,15 @@ func (srv *Server) pipeHandle(conn net.Conn) { opErr, isOpErr := err.(*net.OpError) if isOpErr && opErr.Temporary() { - s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) + s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) } else { - s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err) + s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) break } } if err = srv.Ingest(update); err != nil { - s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err) + s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) // TODO: send NACK? break } @@ -88,18 +90,18 @@ func (srv *Server) pipeHandle(conn net.Conn) { func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { - s5l.Printf("server|pipe: listen() failed: %v", err) + s5l.Printf("srv|pipe: listen() failed: %v", err) return } defer ln.Close() - s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pipe: interface stopped") + s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pipe: interface stopped") for { conn, err := ln.Accept() if err != nil { - s5l.Printf("server|pipe: accept() failed: %v", err) + s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue } @@ -111,12 +113,15 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { // Unix Socket Interface (datagrams) // -func (srv *Server) pipegramHandle(pconn net.PacketConn) { +func (srv *Server) pipegramRun() { buffer := make([]byte, PipegramMessageSizeLimit) for { - n, _, err := pconn.ReadFrom(buffer) + n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } s5l.Printf("srv|pgram: read() failed: %v", err) continue } @@ -134,16 +139,24 @@ func (srv *Server) pipegramHandle(pconn net.PacketConn) { } } -func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) { - pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) - if err != nil { +func (srv *Server) pipegramStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to webRun and webStop... + if srv.interfaces.pipegram == nil { + return nil + } + s5l.Printf("srv|pgram: shutting down") + return srv.interfaces.pipegram.Close() +} + +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { + if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { s5l.Printf("srv|pgram: listen() failed: %v", err) return } - defer pconn.Close() - s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pgram: interface stopped") + s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pgram: interface stopped") - srv.pipegramHandle(pconn) + srv.pipegramRun() + return } |