summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-14 01:19:07 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-14 01:19:07 +0200
commitf67e238d3a0f96affad921d19d8f469153287b70 (patch)
treef8e298864fc05b042adf97c3f416bd849abd02a0 /src
parentadded (stupid) shutdown for pipe interface (diff)
interface shutdown almost clean now
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go137
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go23
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipegram.go13
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go47
-rwxr-xr-xsrc/hub/test-srv-single27
5 files changed, 138 insertions, 109 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6742b6f..e45db69 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -48,18 +48,20 @@ type ingestToken struct {
}
type Server struct {
- cfg SrvConfig
- store *Store
- numWorker int
- anonymization AnonymizationAlgo
- geoip GeoIPLookup
- wgWorker *sync.WaitGroup
- ingestChan chan ingestToken
- interfaces struct {
+ cfg SrvConfig
+ store *Store
+ wgInterfaces sync.WaitGroup
+ interfaces struct {
pipe net.Listener
pipegram net.PacketConn
web *http.Server
}
+ numWorker int
+ anonymization AnonymizationAlgo
+ geoip GeoIPLookup
+ wgWorker sync.WaitGroup
+ ingestChan chan ingestToken
+ wgForwarder sync.WaitGroup
}
func (srv *Server) transform(update *UpdateFull) *UpdateFull {
@@ -134,95 +136,77 @@ func (srv *Server) IngestMany(updates []*UpdateFull) error {
return <-token.response
}
-func (srv *Server) shutdownInterfaces() (errors int) {
- ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
- c := make(chan error)
- go func() { c <- srv.webStop(ctx) }()
- go func() { c <- srv.pipeStop(ctx) }()
- go func() { c <- srv.pipegramStop(ctx) }()
-
- errors = 0
- for i := 0; i < 3; i++ {
- if err := <-c; err != nil {
- s5l.Printf("srv: interface shutdown failed failed: %v", err)
- errors++
- }
- }
- close(c) // closing channel here in the hopes that this leads to a panic
- // in case the number of channel reads (for loop above) doesn't match the
- // number of interfaces to be stopped
- cancel()
- return
-}
-
-func (srv *Server) Start() (wg sync.WaitGroup, err error) {
+func (srv *Server) Start() (wg *sync.WaitGroup, err error) {
if srv.cfg.Interfaces.Pipe.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipe(srv.cfg.Interfaces.Pipe)
- }()
+ srv.ServePipe(srv.cfg.Interfaces.Pipe)
}
-
if srv.cfg.Interfaces.Pipegram.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
- }()
+ srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
}
-
if srv.cfg.Interfaces.Web.ListenAddr != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServeWeb(srv.cfg.Interfaces.Web)
- }()
+ srv.ServeWeb(srv.cfg.Interfaces.Web)
}
+ // TODO: forwarder need a clean shutdown as well!!!
if srv.cfg.Forwards.SFive.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwarding(srv.cfg.Forwards.SFive)
- }()
+ go srv.RunForwarding(srv.cfg.Forwards.SFive)
}
-
if srv.cfg.Forwards.Elasticsearch.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
- }()
+ go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
}
-
if srv.cfg.Forwards.Graphite.Host != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
- }()
+ go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
}
-
if srv.cfg.Forwards.Piwik.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
- }()
+ go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
}
- return wg, nil
+
+ return &srv.wgInterfaces, nil
+}
+
+func (srv *Server) shutdownInterfaces() {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ defer cancel()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.webStop(ctx); err != nil {
+ s5l.Printf("srv|web: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipeStop(ctx); err != nil {
+ s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipegramStop(ctx); err != nil {
+ s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ s5l.Printf("srv: waiting for all clients to disconnect")
+ defer s5l.Printf("srv: all clients are now disconnected")
+ srv.wgInterfaces.Wait()
+ return
}
+// must not be called before Start()
func (srv *Server) Shutdown() {
s5l.Printf("srv: shutting down")
- if errors := srv.shutdownInterfaces(); errors != 0 {
- s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!")
- }
+ srv.shutdownInterfaces()
- close(srv.ingestChan) // close ingest channel to tell worker to stop
- srv.wgWorker.Wait() // wait for worker to finish up
+ s5l.Printf("srv: shutting down worker")
+ close(srv.ingestChan)
+ srv.wgWorker.Wait()
s5l.Printf("srv: all worker stopped")
srv.store.Close()
@@ -256,7 +240,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
srv.numWorker = cfg.Workers
}
- srv.wgWorker = &sync.WaitGroup{}
srv.ingestChan = make(chan ingestToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
srv.wgWorker.Add(1)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index c291513..46a83cc 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -37,6 +37,7 @@ import (
"io"
"net"
"strings"
+ "sync"
)
func (srv *Server) pipeHandle(conn net.Conn) {
@@ -79,28 +80,34 @@ func (srv *Server) pipeHandle(conn net.Conn) {
}
func (srv *Server) pipeRun() {
+ var wgClients sync.WaitGroup
for {
conn, err := srv.interfaces.pipe.Accept()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
- return
+ break
}
s5l.Printf("srv|pipe: accept() failed: %v", err)
// ignore
continue
}
- go srv.pipeHandle(conn)
+ wgClients.Add(1)
+ go func() {
+ defer wgClients.Done()
+ srv.pipeHandle(conn)
+ }()
}
+ s5l.Println("srv|pipe: interface stopped listening")
+ // TODO: tell all clients to disconnect!!!
+ wgClients.Wait()
}
func (srv *Server) pipeStop(ctx context.Context) (err error) {
- // TODO: this is a race condition between a call to pipeRun and pipeStop...
if srv.interfaces.pipe == nil {
return nil
}
s5l.Printf("srv|pipe: shutting down")
return srv.interfaces.pipe.Close()
- // TODO: also close alle open client file descriptors
}
func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) {
@@ -110,8 +117,12 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) {
}
s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("srv|pipe: interface stopped")
- srv.pipeRun()
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+
+ srv.pipeRun()
+ }()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
index b2807ed..65ff0e0 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
@@ -40,12 +40,11 @@ import (
)
const (
- PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed?
+ PipegramMessageSizeLimit = 1024 * 1024
)
func (srv *Server) pipegramRun() {
buffer := make([]byte, PipegramMessageSizeLimit)
-
for {
n, _, err := srv.interfaces.pipegram.ReadFrom(buffer)
if err != nil {
@@ -70,7 +69,6 @@ func (srv *Server) pipegramRun() {
}
func (srv *Server) pipegramStop(ctx context.Context) (err error) {
- // TODO: this is a race condition between a call to pipegramRun and pipegramStop...
if srv.interfaces.pipegram == nil {
return nil
}
@@ -85,8 +83,13 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
}
s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("srv|pgram: interface stopped")
- srv.pipegramRun()
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ defer s5l.Println("srv|pgram: interface stopped listening")
+
+ srv.pipegramRun()
+ }()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index a112585..65e677a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -397,26 +397,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
return tc, nil
}
-func (srv *Server) webRun(listener *net.TCPListener) (err error) {
- mux := http.NewServeMux()
- mux.Handle("/healthz", webHandler{srv, webHealthz})
- mux.Handle("/hubs", webHandler{srv, webHubs})
- mux.Handle("/sources", webHandler{srv, webSources})
- mux.Handle("/clients", webHandler{srv, webClients})
- mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam})
- mux.Handle("/updates", webHandler{srv, webUpdates})
- mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID})
- mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID})
-
- // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. ))))
- mux.Handle("/", webHandler{srv, webNotFound})
-
- srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
- return srv.interfaces.web.Serve(tcpKeepAliveListener{listener})
+func (srv *Server) webRun(listener *net.TCPListener) {
+ srv.interfaces.web.Serve(tcpKeepAliveListener{listener})
}
func (srv *Server) webStop(ctx context.Context) (err error) {
- // TODO: this is a race condition between a call to webRun and webStop...
if srv.interfaces.web == nil {
return nil
}
@@ -424,15 +409,35 @@ func (srv *Server) webStop(ctx context.Context) (err error) {
return srv.interfaces.web.Shutdown(ctx)
}
-func (srv *Server) ServeWeb(cfg WebInterfaceConfig) {
+func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error {
ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
s5l.Printf("srv|web: listen() failed: %v", err)
- return
+ return err
}
s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr)
- defer s5l.Println("srv|web: interface stopped")
- srv.webRun(ln.(*net.TCPListener))
+ mux := http.NewServeMux()
+ mux.Handle("/healthz", webHandler{srv, webHealthz})
+ mux.Handle("/hubs", webHandler{srv, webHubs})
+ mux.Handle("/sources", webHandler{srv, webSources})
+ mux.Handle("/clients", webHandler{srv, webClients})
+ mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam})
+ mux.Handle("/updates", webHandler{srv, webUpdates})
+ mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID})
+ mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID})
+ // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. ))))
+ mux.Handle("/", webHandler{srv, webNotFound})
+
+ srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ defer s5l.Println("srv|web: interface stopped listening")
+
+ srv.webRun(ln.(*net.TCPListener))
+ }()
+ return nil
}
diff --git a/src/hub/test-srv-single b/src/hub/test-srv-single
new file mode 100755
index 0000000..902c78b
--- /dev/null
+++ b/src/hub/test-srv-single
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> <interface>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+mkdir -p "$TEST_D"
+rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
+case "$2" in
+ pipe)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server=false -start-web-server=false
+ ;;
+ pipegram)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server=false
+ ;;
+ web)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server -web ":8000"
+ ;;
+ *)
+ "unknown interface $2"
+ return 1
+ ;;
+esac