From f5001c58096b83fa81c6582047b94aeb427a77c3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 11 Jul 2017 14:09:03 +0200 Subject: first steps towards clean shutdown --- src/hub/src/spreadspace.org/sfive/s5srv.go | 59 +++++++++++++--------- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 4 +- .../src/spreadspace.org/sfive/s5srvForwardEs.go | 4 +- .../spreadspace.org/sfive/s5srvForwardGraphite.go | 4 +- .../src/spreadspace.org/sfive/s5srvForwardPiwik.go | 4 +- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 8 +-- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 22 +++++--- 7 files changed, 64 insertions(+), 41 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 220a7ae..b91d5c5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,16 +33,14 @@ package sfive import ( + "context" "errors" + "net/http" "runtime" "sync" "time" ) -var ( - ErrShutdownInProgress = errors.New("shutdown in progress") -) - type ingestToken struct { updates []*UpdateFull response chan error @@ -53,12 +51,14 @@ type Server struct { numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup - quit chan bool wgWorker *sync.WaitGroup ingestChan chan ingestToken + interfaces struct { + web *http.Server + } } -func (srv Server) transform(update *UpdateFull) *UpdateFull { +func (srv *Server) transform(update *UpdateFull) *UpdateFull { bytesSentTotal := uint(0) clients := []Client{} for _, client := range update.Data.Clients { @@ -100,13 +100,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { return update } -func (srv Server) transformMany(updates []*UpdateFull) { +func (srv *Server) transformMany(updates []*UpdateFull) { for _, update := range updates { srv.transform(update) } } -func (srv Server) ingestWorker(idx int) { +func (srv *Server) ingestWorker(idx int) { for { select { case token, ok := <-srv.ingestChan: @@ -119,31 +119,44 @@ func (srv Server) ingestWorker(idx int) { } } -func (srv Server) Ingest(update *UpdateFull) error { +func (srv *Server) Ingest(update *UpdateFull) error { return srv.IngestMany([]*UpdateFull{update}) } -func (srv Server) IngestMany(updates []*UpdateFull) error { - if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it - return ErrShutdownInProgress - } - +func (srv *Server) IngestMany(updates []*UpdateFull) error { token := ingestToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.ingestChan <- token return <-token.response } -func (srv Server) Close() { - s5l.Printf("srv: shutting down") +func (srv *Server) shutdownInterfaces() (errors int) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + c := make(chan error) + go func() { c <- srv.webStop(ctx) }() + //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped + //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped + + errors = 0 + for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled + if err := <-c; err != nil { + s5l.Printf("srv: interface shutdown failed failed: %v", err) + errors++ + } + } + close(c) // closing channel here in the hopes that this leads to a panic + // in case the number of channel reads (for loop above) is) doesn't match the + // number of interfaces + cancel() + return +} - // TODO: enable this as soon as interfaces can be told to stop accepting data - // and yield wgInterfaces once the last client connection is done - // close(srv.quit) // tell interfaces to don't accept new data - // srv.wgInterfaces.Wait() // wait for interfaces to finish up +func (srv *Server) Close() { + s5l.Printf("srv: shutting down") - srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it - time.Sleep(time.Second) // this is quite ugly but must be good enough for now + if errors := srv.shutdownInterfaces(); errors != 0 { + s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") + } close(srv.ingestChan) // close ingest channel to tell worker to stop srv.wgWorker.Wait() // wait for worker to finish up @@ -178,7 +191,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { if cfg.Workers > 0 { srv.numWorker = cfg.Workers } - srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works) + srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 094a1a8..904dc18 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -109,7 +109,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er return result.NumUpdates, nil } -func (srv Server) forwardRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardRun(baseUrl string, client *http.Client) { url := baseUrl + "/updates/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -151,7 +151,7 @@ tryResync: } } -func (srv Server) RunForwarding(cfg SFiveForwardConfig) { +func (srv *Server) RunForwarding(cfg SFiveForwardConfig) { s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 02a6470..fb56cbd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -98,7 +98,7 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( return } -func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardEsRun(baseUrl string, client *http.Client) { url := baseUrl + "/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -159,7 +159,7 @@ tryResync: } } -func (srv Server) RunForwardingEs(cfg ESForwardConfig) { +func (srv *Server) RunForwardingEs(cfg ESForwardConfig) { s5l.Printf("srv|fwd-es: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-es: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index d0c07d5..104c005 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -39,7 +39,7 @@ import ( "github.com/equinox0815/graphite-golang" ) -func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) { +func (srv *Server) forwardGraphiteRun(forwardHost string, basePath string) { tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) @@ -96,7 +96,7 @@ tryResync: } } -func (srv Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { +func (srv *Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { s5l.Printf("srv|fwd-graphite: forwarding to '%s'", cfg.Host) defer s5l.Println("srv|fwd-graphite: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index f3f71e7..93b717b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -55,7 +55,7 @@ func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string return } -func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +func (srv *Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { // hubUuid := srv.store.GetHubUuid() tryResync: for { @@ -138,7 +138,7 @@ tryResync: } } -func (srv Server) RunForwardingPiwik(cfg PiwikForwardConfig) { +func (srv *Server) RunForwardingPiwik(cfg PiwikForwardConfig) { s5l.Printf("srv|fwd-piwik: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-piwik: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 4755c35..7d0fbe4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -46,7 +46,7 @@ const ( // Unix Socket Interface (streams) // -func (srv Server) pipeHandle(conn net.Conn) { +func (srv *Server) pipeHandle(conn net.Conn) { defer conn.Close() dec, err := NewStatefulDecoder(conn) @@ -85,7 +85,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } } -func (srv Server) ServePipe(cfg PipeInterfaceConfig) { +func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { s5l.Printf("server|pipe: listen() failed: %v", err) @@ -111,7 +111,7 @@ func (srv Server) ServePipe(cfg PipeInterfaceConfig) { // Unix Socket Interface (datagrams) // -func (srv Server) pipegramHandle(pconn net.PacketConn) { +func (srv *Server) pipegramHandle(pconn net.PacketConn) { buffer := make([]byte, PipegramMessageSizeLimit) for { @@ -134,7 +134,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { } } -func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) { pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) if err != nil { s5l.Printf("srv|pgram: listen() failed: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 9e0b474..a112585 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -33,6 +33,7 @@ package sfive import ( + "context" "encoding/json" "fmt" "io" @@ -392,11 +393,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { return } tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(180 * time.Second) + tc.SetKeepAlivePeriod(30 * time.Second) return tc, nil } -func webRun(listener *net.TCPListener, srv *Server) (err error) { +func (srv *Server) webRun(listener *net.TCPListener) (err error) { mux := http.NewServeMux() mux.Handle("/healthz", webHandler{srv, webHealthz}) mux.Handle("/hubs", webHandler{srv, webHubs}) @@ -410,11 +411,20 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) mux.Handle("/", webHandler{srv, webNotFound}) - server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second} - return server.Serve(tcpKeepAliveListener{listener}) + srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} + return srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) } -func (srv Server) ServeWeb(cfg WebInterfaceConfig) { +func (srv *Server) webStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to webRun and webStop... + if srv.interfaces.web == nil { + return nil + } + s5l.Printf("srv|web: shutting down") + return srv.interfaces.web.Shutdown(ctx) +} + +func (srv *Server) ServeWeb(cfg WebInterfaceConfig) { ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { s5l.Printf("srv|web: listen() failed: %v", err) @@ -424,5 +434,5 @@ func (srv Server) ServeWeb(cfg WebInterfaceConfig) { s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) defer s5l.Println("srv|web: interface stopped") - webRun(ln.(*net.TCPListener), &srv) + srv.webRun(ln.(*net.TCPListener)) } -- cgit v1.2.3