summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srv.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go70
1 files changed, 48 insertions, 22 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index e45db69..e63b214 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -48,20 +48,25 @@ type ingestToken struct {
}
type Server struct {
- cfg SrvConfig
- store *Store
- wgInterfaces sync.WaitGroup
- interfaces struct {
+ cfg SrvConfig
+ store *Store
+ wgShutdown sync.WaitGroup
+ interfaces struct {
pipe net.Listener
pipegram net.PacketConn
web *http.Server
}
+ forwarder struct {
+ sfive chan chan bool
+ es chan chan bool
+ graphite chan chan bool
+ piwik chan chan bool
+ }
numWorker int
anonymization AnonymizationAlgo
geoip GeoIPLookup
wgWorker sync.WaitGroup
ingestChan chan ingestToken
- wgForwarder sync.WaitGroup
}
func (srv *Server) transform(update *UpdateFull) *UpdateFull {
@@ -147,54 +152,73 @@ func (srv *Server) Start() (wg *sync.WaitGroup, err error) {
srv.ServeWeb(srv.cfg.Interfaces.Web)
}
- // TODO: forwarder need a clean shutdown as well!!!
if srv.cfg.Forwards.SFive.URL != "" {
- go srv.RunForwarding(srv.cfg.Forwards.SFive)
+ srv.ForwardSFive(srv.cfg.Forwards.SFive)
}
if srv.cfg.Forwards.Elasticsearch.URL != "" {
- go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
+ go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) // TODO: clean shutdown
}
if srv.cfg.Forwards.Graphite.Host != "" {
- go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
+ go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) // TODO: clean shutdown
}
if srv.cfg.Forwards.Piwik.URL != "" {
- go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
+ go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) // TODO: clean shutdown
}
- return &srv.wgInterfaces, nil
+ return &srv.wgShutdown, nil
}
func (srv *Server) shutdownInterfaces() {
- ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value
defer cancel()
- srv.wgInterfaces.Add(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.webStop(ctx); err != nil {
- s5l.Printf("srv|web: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|web: interface shutdown failed: %v", err)
}
}()
- srv.wgInterfaces.Add(1)
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.pipeStop(ctx); err != nil {
- s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|pipe: interface shutdown failed: %v", err)
}
}()
- srv.wgInterfaces.Add(1)
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.pipegramStop(ctx); err != nil {
- s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|pgram: interface shutdown failed: %v", err)
}
}()
s5l.Printf("srv: waiting for all clients to disconnect")
defer s5l.Printf("srv: all clients are now disconnected")
- srv.wgInterfaces.Wait()
+ wg.Wait()
+ return
+}
+
+func (srv *Server) shutdownForwarder() {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value
+ defer cancel()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := srv.forwardStop(ctx); err != nil {
+ s5l.Printf("srv|fwd: forwarder shutdown failed: %v", err)
+ }
+ }()
+
+ s5l.Printf("srv: waiting for all forwarder to stop")
+ defer s5l.Printf("srv: all forwarder are stopped")
+ wg.Wait()
return
}
@@ -209,6 +233,8 @@ func (srv *Server) Shutdown() {
srv.wgWorker.Wait()
s5l.Printf("srv: all worker stopped")
+ srv.shutdownForwarder()
+
srv.store.Close()
s5l.Printf("srv: finished")
}