summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srv.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go137
1 files changed, 60 insertions, 77 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6742b6f..e45db69 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -48,18 +48,20 @@ type ingestToken struct {
}
type Server struct {
- cfg SrvConfig
- store *Store
- numWorker int
- anonymization AnonymizationAlgo
- geoip GeoIPLookup
- wgWorker *sync.WaitGroup
- ingestChan chan ingestToken
- interfaces struct {
+ cfg SrvConfig
+ store *Store
+ wgInterfaces sync.WaitGroup
+ interfaces struct {
pipe net.Listener
pipegram net.PacketConn
web *http.Server
}
+ numWorker int
+ anonymization AnonymizationAlgo
+ geoip GeoIPLookup
+ wgWorker sync.WaitGroup
+ ingestChan chan ingestToken
+ wgForwarder sync.WaitGroup
}
func (srv *Server) transform(update *UpdateFull) *UpdateFull {
@@ -134,95 +136,77 @@ func (srv *Server) IngestMany(updates []*UpdateFull) error {
return <-token.response
}
-func (srv *Server) shutdownInterfaces() (errors int) {
- ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
- c := make(chan error)
- go func() { c <- srv.webStop(ctx) }()
- go func() { c <- srv.pipeStop(ctx) }()
- go func() { c <- srv.pipegramStop(ctx) }()
-
- errors = 0
- for i := 0; i < 3; i++ {
- if err := <-c; err != nil {
- s5l.Printf("srv: interface shutdown failed failed: %v", err)
- errors++
- }
- }
- close(c) // closing channel here in the hopes that this leads to a panic
- // in case the number of channel reads (for loop above) doesn't match the
- // number of interfaces to be stopped
- cancel()
- return
-}
-
-func (srv *Server) Start() (wg sync.WaitGroup, err error) {
+func (srv *Server) Start() (wg *sync.WaitGroup, err error) {
if srv.cfg.Interfaces.Pipe.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipe(srv.cfg.Interfaces.Pipe)
- }()
+ srv.ServePipe(srv.cfg.Interfaces.Pipe)
}
-
if srv.cfg.Interfaces.Pipegram.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
- }()
+ srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
}
-
if srv.cfg.Interfaces.Web.ListenAddr != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServeWeb(srv.cfg.Interfaces.Web)
- }()
+ srv.ServeWeb(srv.cfg.Interfaces.Web)
}
+ // TODO: forwarder need a clean shutdown as well!!!
if srv.cfg.Forwards.SFive.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwarding(srv.cfg.Forwards.SFive)
- }()
+ go srv.RunForwarding(srv.cfg.Forwards.SFive)
}
-
if srv.cfg.Forwards.Elasticsearch.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
- }()
+ go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
}
-
if srv.cfg.Forwards.Graphite.Host != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
- }()
+ go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
}
-
if srv.cfg.Forwards.Piwik.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
- }()
+ go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
}
- return wg, nil
+
+ return &srv.wgInterfaces, nil
+}
+
+func (srv *Server) shutdownInterfaces() {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ defer cancel()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.webStop(ctx); err != nil {
+ s5l.Printf("srv|web: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipeStop(ctx); err != nil {
+ s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipegramStop(ctx); err != nil {
+ s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ s5l.Printf("srv: waiting for all clients to disconnect")
+ defer s5l.Printf("srv: all clients are now disconnected")
+ srv.wgInterfaces.Wait()
+ return
}
+// must not be called before Start()
func (srv *Server) Shutdown() {
s5l.Printf("srv: shutting down")
- if errors := srv.shutdownInterfaces(); errors != 0 {
- s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!")
- }
+ srv.shutdownInterfaces()
- close(srv.ingestChan) // close ingest channel to tell worker to stop
- srv.wgWorker.Wait() // wait for worker to finish up
+ s5l.Printf("srv: shutting down worker")
+ close(srv.ingestChan)
+ srv.wgWorker.Wait()
s5l.Printf("srv: all worker stopped")
srv.store.Close()
@@ -256,7 +240,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
srv.numWorker = cfg.Workers
}
- srv.wgWorker = &sync.WaitGroup{}
srv.ingestChan = make(chan ingestToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
srv.wgWorker.Add(1)