diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 137 |
1 files changed, 60 insertions, 77 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 6742b6f..e45db69 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,18 +48,20 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - numWorker int - anonymization AnonymizationAlgo - geoip GeoIPLookup - wgWorker *sync.WaitGroup - ingestChan chan ingestToken - interfaces struct { + cfg SrvConfig + store *Store + wgInterfaces sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + numWorker int + anonymization AnonymizationAlgo + geoip GeoIPLookup + wgWorker sync.WaitGroup + ingestChan chan ingestToken + wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -134,95 +136,77 @@ func (srv *Server) IngestMany(updates []*UpdateFull) error { return <-token.response } -func (srv *Server) shutdownInterfaces() (errors int) { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - c := make(chan error) - go func() { c <- srv.webStop(ctx) }() - go func() { c <- srv.pipeStop(ctx) }() - go func() { c <- srv.pipegramStop(ctx) }() - - errors = 0 - for i := 0; i < 3; i++ { - if err := <-c; err != nil { - s5l.Printf("srv: interface shutdown failed failed: %v", err) - errors++ - } - } - close(c) // closing channel here in the hopes that this leads to a panic - // in case the number of channel reads (for loop above) doesn't match the - // number of interfaces to be stopped - cancel() - return -} - -func (srv *Server) Start() (wg sync.WaitGroup, err error) { +func (srv *Server) Start() (wg *sync.WaitGroup, err error) { if srv.cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(srv.cfg.Interfaces.Pipe) - }() + srv.ServePipe(srv.cfg.Interfaces.Pipe) } - if srv.cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(srv.cfg.Interfaces.Pipegram) - }() + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) } - if srv.cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(srv.cfg.Interfaces.Web) - }() + srv.ServeWeb(srv.cfg.Interfaces.Web) } + // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(srv.cfg.Forwards.SFive) - }() + go srv.RunForwarding(srv.cfg.Forwards.SFive) } - if srv.cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) - }() + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) } - if srv.cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) - }() + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) } - if srv.cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) - }() + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) } - return wg, nil + + return &srv.wgInterfaces, nil +} + +func (srv *Server) shutdownInterfaces() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.webStop(ctx); err != nil { + s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipeStop(ctx); err != nil { + s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipegramStop(ctx); err != nil { + s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all clients to disconnect") + defer s5l.Printf("srv: all clients are now disconnected") + srv.wgInterfaces.Wait() + return } +// must not be called before Start() func (srv *Server) Shutdown() { s5l.Printf("srv: shutting down") - if errors := srv.shutdownInterfaces(); errors != 0 { - s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") - } + srv.shutdownInterfaces() - close(srv.ingestChan) // close ingest channel to tell worker to stop - srv.wgWorker.Wait() // wait for worker to finish up + s5l.Printf("srv: shutting down worker") + close(srv.ingestChan) + srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") srv.store.Close() @@ -256,7 +240,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { srv.numWorker = cfg.Workers } - srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { srv.wgWorker.Add(1) |