summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-11-25 17:48:54 +0100
committerChristian Pointner <equinox@spreadspace.org>2017-11-25 18:31:46 +0100
commit22448bda4081d9a1d1a4aa97837a483e611dcd31 (patch)
tree73c1a3c794fe5ff532defba1760a8c8f9964f868 /src
parentadded some scripts for forwarding tests (diff)
clean shutdown for forwarder (not finished)
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go70
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go (renamed from src/hub/src/spreadspace.org/sfive/s5srvForward.go)34
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipegram.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
5 files changed, 85 insertions, 31 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index e45db69..e63b214 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -48,20 +48,25 @@ type ingestToken struct {
}
type Server struct {
- cfg SrvConfig
- store *Store
- wgInterfaces sync.WaitGroup
- interfaces struct {
+ cfg SrvConfig
+ store *Store
+ wgShutdown sync.WaitGroup
+ interfaces struct {
pipe net.Listener
pipegram net.PacketConn
web *http.Server
}
+ forwarder struct {
+ sfive chan chan bool
+ es chan chan bool
+ graphite chan chan bool
+ piwik chan chan bool
+ }
numWorker int
anonymization AnonymizationAlgo
geoip GeoIPLookup
wgWorker sync.WaitGroup
ingestChan chan ingestToken
- wgForwarder sync.WaitGroup
}
func (srv *Server) transform(update *UpdateFull) *UpdateFull {
@@ -147,54 +152,73 @@ func (srv *Server) Start() (wg *sync.WaitGroup, err error) {
srv.ServeWeb(srv.cfg.Interfaces.Web)
}
- // TODO: forwarder need a clean shutdown as well!!!
if srv.cfg.Forwards.SFive.URL != "" {
- go srv.RunForwarding(srv.cfg.Forwards.SFive)
+ srv.ForwardSFive(srv.cfg.Forwards.SFive)
}
if srv.cfg.Forwards.Elasticsearch.URL != "" {
- go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
+ go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) // TODO: clean shutdown
}
if srv.cfg.Forwards.Graphite.Host != "" {
- go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
+ go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) // TODO: clean shutdown
}
if srv.cfg.Forwards.Piwik.URL != "" {
- go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
+ go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) // TODO: clean shutdown
}
- return &srv.wgInterfaces, nil
+ return &srv.wgShutdown, nil
}
func (srv *Server) shutdownInterfaces() {
- ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value
defer cancel()
- srv.wgInterfaces.Add(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.webStop(ctx); err != nil {
- s5l.Printf("srv|web: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|web: interface shutdown failed: %v", err)
}
}()
- srv.wgInterfaces.Add(1)
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.pipeStop(ctx); err != nil {
- s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|pipe: interface shutdown failed: %v", err)
}
}()
- srv.wgInterfaces.Add(1)
+ wg.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer wg.Done()
if err := srv.pipegramStop(ctx); err != nil {
- s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err)
+ s5l.Printf("srv|pgram: interface shutdown failed: %v", err)
}
}()
s5l.Printf("srv: waiting for all clients to disconnect")
defer s5l.Printf("srv: all clients are now disconnected")
- srv.wgInterfaces.Wait()
+ wg.Wait()
+ return
+}
+
+func (srv *Server) shutdownForwarder() {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value
+ defer cancel()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := srv.forwardStop(ctx); err != nil {
+ s5l.Printf("srv|fwd: forwarder shutdown failed: %v", err)
+ }
+ }()
+
+ s5l.Printf("srv: waiting for all forwarder to stop")
+ defer s5l.Printf("srv: all forwarder are stopped")
+ wg.Wait()
return
}
@@ -209,6 +233,8 @@ func (srv *Server) Shutdown() {
srv.wgWorker.Wait()
s5l.Printf("srv: all worker stopped")
+ srv.shutdownForwarder()
+
srv.store.Close()
s5l.Printf("srv: finished")
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go
index 9e395be..d09c500 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go
@@ -33,6 +33,7 @@
package sfive
import (
+ "context"
"encoding/json"
"errors"
"io"
@@ -124,6 +125,13 @@ tryResync:
nextBatch:
for {
+ select {
+ case ch := <-srv.forwarder.sfive:
+ close(ch)
+ return
+ default:
+ }
+
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
s5l.Printf("srv|fwd: reading updates failed: %v", err)
@@ -151,9 +159,29 @@ tryResync:
}
}
-func (srv *Server) RunForwarding(cfg SFiveForwardConfig) {
+func (srv *Server) forwardStop(ctx context.Context) (err error) {
+ if srv.forwarder.sfive == nil {
+ return nil
+ }
+ s5l.Printf("srv|fwd: shutting down")
+ stopCH := make(chan bool)
+ srv.forwarder.sfive <- stopCH
+ select {
+ case <-stopCH:
+ case <-ctx.Done():
+ }
+ return nil
+}
+
+func (srv *Server) ForwardSFive(cfg SFiveForwardConfig) {
s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL)
- defer s5l.Println("srv|fwd: forwarder stopped")
- srv.forwardRun(cfg.URL, http.DefaultClient)
+ srv.wgShutdown.Add(1)
+ go func() {
+ defer srv.wgShutdown.Done()
+ defer s5l.Println("srv|fwd: forwarder stopped")
+
+ srv.forwarder.sfive = make(chan chan bool)
+ srv.forwardRun(cfg.URL, http.DefaultClient)
+ }()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 2a6a200..5668735 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -140,9 +140,9 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) {
s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
- srv.wgInterfaces.Add(1)
+ srv.wgShutdown.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer srv.wgShutdown.Done()
srv.pipeRun()
}()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
index 65ff0e0..b72185d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
@@ -84,9 +84,9 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
- srv.wgInterfaces.Add(1)
+ srv.wgShutdown.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer srv.wgShutdown.Done()
defer s5l.Println("srv|pgram: interface stopped listening")
srv.pipegramRun()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 54f1d18..7094dc4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -434,9 +434,9 @@ func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error {
srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
- srv.wgInterfaces.Add(1)
+ srv.wgShutdown.Add(1)
go func() {
- defer srv.wgInterfaces.Done()
+ defer srv.wgShutdown.Done()
defer s5l.Println("srv|web: interface stopped listening")
srv.webRun(ln.(*net.TCPListener))