diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-11-25 17:48:54 +0100 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-11-25 18:31:46 +0100 |
commit | 22448bda4081d9a1d1a4aa97837a483e611dcd31 (patch) | |
tree | 73c1a3c794fe5ff532defba1760a8c8f9964f868 /src | |
parent | added some scripts for forwarding tests (diff) |
clean shutdown for forwarder (not finished)
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 70 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go (renamed from src/hub/src/spreadspace.org/sfive/s5srvForward.go) | 34 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 4 |
5 files changed, 85 insertions, 31 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e45db69..e63b214 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,20 +48,25 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - wgInterfaces sync.WaitGroup - interfaces struct { + cfg SrvConfig + store *Store + wgShutdown sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + forwarder struct { + sfive chan chan bool + es chan chan bool + graphite chan chan bool + piwik chan chan bool + } numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup wgWorker sync.WaitGroup ingestChan chan ingestToken - wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -147,54 +152,73 @@ func (srv *Server) Start() (wg *sync.WaitGroup, err error) { srv.ServeWeb(srv.cfg.Interfaces.Web) } - // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - go srv.RunForwarding(srv.cfg.Forwards.SFive) + srv.ForwardSFive(srv.cfg.Forwards.SFive) } if srv.cfg.Forwards.Elasticsearch.URL != "" { - go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) // TODO: clean shutdown } if srv.cfg.Forwards.Graphite.Host != "" { - go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) // TODO: clean shutdown } if srv.cfg.Forwards.Piwik.URL != "" { - go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) // TODO: clean shutdown } - return &srv.wgInterfaces, nil + return &srv.wgShutdown, nil } func (srv *Server) shutdownInterfaces() { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value defer cancel() - srv.wgInterfaces.Add(1) + var wg sync.WaitGroup + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.webStop(ctx); err != nil { - s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + s5l.Printf("srv|web: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipeStop(ctx); err != nil { - s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pipe: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipegramStop(ctx); err != nil { - s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pgram: interface shutdown failed: %v", err) } }() s5l.Printf("srv: waiting for all clients to disconnect") defer s5l.Printf("srv: all clients are now disconnected") - srv.wgInterfaces.Wait() + wg.Wait() + return +} + +func (srv *Server) shutdownForwarder() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := srv.forwardStop(ctx); err != nil { + s5l.Printf("srv|fwd: forwarder shutdown failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all forwarder to stop") + defer s5l.Printf("srv: all forwarder are stopped") + wg.Wait() return } @@ -209,6 +233,8 @@ func (srv *Server) Shutdown() { srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") + srv.shutdownForwarder() + srv.store.Close() s5l.Printf("srv: finished") } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go index 9e395be..d09c500 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go @@ -33,6 +33,7 @@ package sfive import ( + "context" "encoding/json" "errors" "io" @@ -124,6 +125,13 @@ tryResync: nextBatch: for { + select { + case ch := <-srv.forwarder.sfive: + close(ch) + return + default: + } + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { s5l.Printf("srv|fwd: reading updates failed: %v", err) @@ -151,9 +159,29 @@ tryResync: } } -func (srv *Server) RunForwarding(cfg SFiveForwardConfig) { +func (srv *Server) forwardStop(ctx context.Context) (err error) { + if srv.forwarder.sfive == nil { + return nil + } + s5l.Printf("srv|fwd: shutting down") + stopCH := make(chan bool) + srv.forwarder.sfive <- stopCH + select { + case <-stopCH: + case <-ctx.Done(): + } + return nil +} + +func (srv *Server) ForwardSFive(cfg SFiveForwardConfig) { s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) - defer s5l.Println("srv|fwd: forwarder stopped") - srv.forwardRun(cfg.URL, http.DefaultClient) + srv.wgShutdown.Add(1) + go func() { + defer srv.wgShutdown.Done() + defer s5l.Println("srv|fwd: forwarder stopped") + + srv.forwarder.sfive = make(chan chan bool) + srv.forwardRun(cfg.URL, http.DefaultClient) + }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 2a6a200..5668735 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -140,9 +140,9 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() srv.pipeRun() }() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go index 65ff0e0..b72185d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -84,9 +84,9 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() defer s5l.Println("srv|pgram: interface stopped listening") srv.pipegramRun() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 54f1d18..7094dc4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -434,9 +434,9 @@ func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error { srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() defer s5l.Println("srv|web: interface stopped listening") srv.webRun(ln.(*net.TCPListener)) |