diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 70 |
1 files changed, 48 insertions, 22 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e45db69..e63b214 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,20 +48,25 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - wgInterfaces sync.WaitGroup - interfaces struct { + cfg SrvConfig + store *Store + wgShutdown sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + forwarder struct { + sfive chan chan bool + es chan chan bool + graphite chan chan bool + piwik chan chan bool + } numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup wgWorker sync.WaitGroup ingestChan chan ingestToken - wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -147,54 +152,73 @@ func (srv *Server) Start() (wg *sync.WaitGroup, err error) { srv.ServeWeb(srv.cfg.Interfaces.Web) } - // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - go srv.RunForwarding(srv.cfg.Forwards.SFive) + srv.ForwardSFive(srv.cfg.Forwards.SFive) } if srv.cfg.Forwards.Elasticsearch.URL != "" { - go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) // TODO: clean shutdown } if srv.cfg.Forwards.Graphite.Host != "" { - go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) // TODO: clean shutdown } if srv.cfg.Forwards.Piwik.URL != "" { - go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) // TODO: clean shutdown } - return &srv.wgInterfaces, nil + return &srv.wgShutdown, nil } func (srv *Server) shutdownInterfaces() { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value defer cancel() - srv.wgInterfaces.Add(1) + var wg sync.WaitGroup + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.webStop(ctx); err != nil { - s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + s5l.Printf("srv|web: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipeStop(ctx); err != nil { - s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pipe: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipegramStop(ctx); err != nil { - s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pgram: interface shutdown failed: %v", err) } }() s5l.Printf("srv: waiting for all clients to disconnect") defer s5l.Printf("srv: all clients are now disconnected") - srv.wgInterfaces.Wait() + wg.Wait() + return +} + +func (srv *Server) shutdownForwarder() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := srv.forwardStop(ctx); err != nil { + s5l.Printf("srv|fwd: forwarder shutdown failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all forwarder to stop") + defer s5l.Printf("srv: all forwarder are stopped") + wg.Wait() return } @@ -209,6 +233,8 @@ func (srv *Server) Shutdown() { srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") + srv.shutdownForwarder() + srv.store.Close() s5l.Printf("srv: finished") } |