From f5001c58096b83fa81c6582047b94aeb427a77c3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 11 Jul 2017 14:09:03 +0200 Subject: first steps towards clean shutdown --- src/hub/src/spreadspace.org/sfive/s5srv.go | 59 +++++++++++++--------- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 4 +- .../src/spreadspace.org/sfive/s5srvForwardEs.go | 4 +- .../spreadspace.org/sfive/s5srvForwardGraphite.go | 4 +- .../src/spreadspace.org/sfive/s5srvForwardPiwik.go | 4 +- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 8 +-- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 22 +++++--- 7 files changed, 64 insertions(+), 41 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 220a7ae..b91d5c5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,16 +33,14 @@ package sfive import ( + "context" "errors" + "net/http" "runtime" "sync" "time" ) -var ( - ErrShutdownInProgress = errors.New("shutdown in progress") -) - type ingestToken struct { updates []*UpdateFull response chan error @@ -53,12 +51,14 @@ type Server struct { numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup - quit chan bool wgWorker *sync.WaitGroup ingestChan chan ingestToken + interfaces struct { + web *http.Server + } } -func (srv Server) transform(update *UpdateFull) *UpdateFull { +func (srv *Server) transform(update *UpdateFull) *UpdateFull { bytesSentTotal := uint(0) clients := []Client{} for _, client := range update.Data.Clients { @@ -100,13 +100,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { return update } -func (srv Server) transformMany(updates []*UpdateFull) { +func (srv *Server) transformMany(updates []*UpdateFull) { for _, update := range updates { srv.transform(update) } } -func (srv Server) ingestWorker(idx int) { +func (srv *Server) ingestWorker(idx int) { for { select { case token, ok := <-srv.ingestChan: @@ -119,31 +119,44 @@ func (srv Server) ingestWorker(idx int) { } } -func (srv Server) Ingest(update *UpdateFull) error { +func (srv *Server) Ingest(update *UpdateFull) error { return srv.IngestMany([]*UpdateFull{update}) } -func (srv Server) IngestMany(updates []*UpdateFull) error { - if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it - return ErrShutdownInProgress - } - +func (srv *Server) IngestMany(updates []*UpdateFull) error { token := ingestToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.ingestChan <- token return <-token.response } -func (srv Server) Close() { - s5l.Printf("srv: shutting down") +func (srv *Server) shutdownInterfaces() (errors int) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + c := make(chan error) + go func() { c <- srv.webStop(ctx) }() + //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped + //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped + + errors = 0 + for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled + if err := <-c; err != nil { + s5l.Printf("srv: interface shutdown failed failed: %v", err) + errors++ + } + } + close(c) // closing channel here in the hopes that this leads to a panic + // in case the number of channel reads (for loop above) is) doesn't match the + // number of interfaces + cancel() + return +} - // TODO: enable this as soon as interfaces can be told to stop accepting data - // and yield wgInterfaces once the last client connection is done - // close(srv.quit) // tell interfaces to don't accept new data - // srv.wgInterfaces.Wait() // wait for interfaces to finish up +func (srv *Server) Close() { + s5l.Printf("srv: shutting down") - srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it - time.Sleep(time.Second) // this is quite ugly but must be good enough for now + if errors := srv.shutdownInterfaces(); errors != 0 { + s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") + } close(srv.ingestChan) // close ingest channel to tell worker to stop srv.wgWorker.Wait() // wait for worker to finish up @@ -178,7 +191,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { if cfg.Workers > 0 { srv.numWorker = cfg.Workers } - srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works) + srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 094a1a8..904dc18 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -109,7 +109,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er return result.NumUpdates, nil } -func (srv Server) forwardRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardRun(baseUrl string, client *http.Client) { url := baseUrl + "/updates/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -151,7 +151,7 @@ tryResync: } } -func (srv Server) RunForwarding(cfg SFiveForwardConfig) { +func (srv *Server) RunForwarding(cfg SFiveForwardConfig) { s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 02a6470..fb56cbd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -98,7 +98,7 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( return } -func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardEsRun(baseUrl string, client *http.Client) { url := baseUrl + "/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -159,7 +159,7 @@ tryResync: } } -func (srv Server) RunForwardingEs(cfg ESForwardConfig) { +func (srv *Server) RunForwardingEs(cfg ESForwardConfig) { s5l.Printf("srv|fwd-es: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-es: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index d0c07d5..104c005 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -39,7 +39,7 @@ import ( "github.com/equinox0815/graphite-golang" ) -func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) { +func (srv *Server) forwardGraphiteRun(forwardHost string, basePath string) { tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) @@ -96,7 +96,7 @@ tryResync: } } -func (srv Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { +func (srv *Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { s5l.Printf("srv|fwd-graphite: forwarding to '%s'", cfg.Host) defer s5l.Println("srv|fwd-graphite: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index f3f71e7..93b717b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -55,7 +55,7 @@ func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string return } -func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +func (srv *Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { // hubUuid := srv.store.GetHubUuid() tryResync: for { @@ -138,7 +138,7 @@ tryResync: } } -func (srv Server) RunForwardingPiwik(cfg PiwikForwardConfig) { +func (srv *Server) RunForwardingPiwik(cfg PiwikForwardConfig) { s5l.Printf("srv|fwd-piwik: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-piwik: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 4755c35..7d0fbe4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -46,7 +46,7 @@ const ( // Unix Socket Interface (streams) // -func (srv Server) pipeHandle(conn net.Conn) { +func (srv *Server) pipeHandle(conn net.Conn) { defer conn.Close() dec, err := NewStatefulDecoder(conn) @@ -85,7 +85,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } } -func (srv Server) ServePipe(cfg PipeInterfaceConfig) { +func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { s5l.Printf("server|pipe: listen() failed: %v", err) @@ -111,7 +111,7 @@ func (srv Server) ServePipe(cfg PipeInterfaceConfig) { // Unix Socket Interface (datagrams) // -func (srv Server) pipegramHandle(pconn net.PacketConn) { +func (srv *Server) pipegramHandle(pconn net.PacketConn) { buffer := make([]byte, PipegramMessageSizeLimit) for { @@ -134,7 +134,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { } } -func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) { pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) if err != nil { s5l.Printf("srv|pgram: listen() failed: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 9e0b474..a112585 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -33,6 +33,7 @@ package sfive import ( + "context" "encoding/json" "fmt" "io" @@ -392,11 +393,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { return } tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(180 * time.Second) + tc.SetKeepAlivePeriod(30 * time.Second) return tc, nil } -func webRun(listener *net.TCPListener, srv *Server) (err error) { +func (srv *Server) webRun(listener *net.TCPListener) (err error) { mux := http.NewServeMux() mux.Handle("/healthz", webHandler{srv, webHealthz}) mux.Handle("/hubs", webHandler{srv, webHubs}) @@ -410,11 +411,20 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) mux.Handle("/", webHandler{srv, webNotFound}) - server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second} - return server.Serve(tcpKeepAliveListener{listener}) + srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} + return srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) } -func (srv Server) ServeWeb(cfg WebInterfaceConfig) { +func (srv *Server) webStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to webRun and webStop... + if srv.interfaces.web == nil { + return nil + } + s5l.Printf("srv|web: shutting down") + return srv.interfaces.web.Shutdown(ctx) +} + +func (srv *Server) ServeWeb(cfg WebInterfaceConfig) { ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { s5l.Printf("srv|web: listen() failed: %v", err) @@ -424,5 +434,5 @@ func (srv Server) ServeWeb(cfg WebInterfaceConfig) { s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) defer s5l.Println("srv|web: interface stopped") - webRun(ln.(*net.TCPListener), &srv) + srv.webRun(ln.(*net.TCPListener)) } -- cgit v1.2.3 From 92d690733dee358f9c74e4ef1b6afdd3176a4dad Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 13 Jul 2017 12:07:32 +0200 Subject: cleaner shutdown for pipgram interface --- src/daq/s5proxy/src/s5proxy/proxy.go | 2 +- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 71 +++-------------------- src/hub/src/spreadspace.org/sfive/s5srv.go | 79 +++++++++++++++++++++++--- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 51 ++++++++++------- 4 files changed, 112 insertions(+), 91 deletions(-) diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 3bb0afb..f994982 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -212,7 +212,7 @@ func (p *Proxy) Run() error { httpsL := m.Match(cmux.Any()) go p.RunHTTPS(httpsL) - if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { + if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? return err } return nil diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index ba34697..9b28270 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -37,7 +37,7 @@ import ( "log" "os" "os/signal" - "sync" + "syscall" "spreadspace.org/sfive" ) @@ -100,64 +100,10 @@ func main() { if err != nil { s5hl.Fatalf(err.Error()) } - defer srv.Close() - var wg sync.WaitGroup - - if cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(cfg.Interfaces.Pipe) - }() - } - - if cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(cfg.Interfaces.Pipegram) - }() - } - - if cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(cfg.Interfaces.Web) - }() - } - - if cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(cfg.Forwards.SFive) - }() - } - - if cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(cfg.Forwards.Elasticsearch) - }() - } - - if cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(cfg.Forwards.Graphite) - }() - } - - if cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(cfg.Forwards.Piwik) - }() + wg, err := srv.Start() + if err != nil { + s5hl.Fatalf(err.Error()) } alldone := make(chan bool) @@ -167,13 +113,12 @@ func main() { }() c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) select { - case <-c: - s5hl.Println("received interrupt, shutdown") - return + case sig := <-c: + s5hl.Printf("signal(%v) received, shutting down", sig) case <-alldone: - return } + srv.Shutdown() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index b91d5c5..e8778df 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -35,6 +35,7 @@ package sfive import ( "context" "errors" + "net" "net/http" "runtime" "sync" @@ -47,6 +48,7 @@ type ingestToken struct { } type Server struct { + cfg SrvConfig store *Store numWorker int anonymization AnonymizationAlgo @@ -54,7 +56,8 @@ type Server struct { wgWorker *sync.WaitGroup ingestChan chan ingestToken interfaces struct { - web *http.Server + web *http.Server + pipegram net.PacketConn } } @@ -134,24 +137,83 @@ func (srv *Server) shutdownInterfaces() (errors int) { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) c := make(chan error) go func() { c <- srv.webStop(ctx) }() - //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped - //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped + //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped + go func() { c <- srv.pipegramStop(ctx) }() errors = 0 - for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled + for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled if err := <-c; err != nil { s5l.Printf("srv: interface shutdown failed failed: %v", err) errors++ } } close(c) // closing channel here in the hopes that this leads to a panic - // in case the number of channel reads (for loop above) is) doesn't match the - // number of interfaces + // in case the number of channel reads (for loop above) doesn't match the + // number of interfaces to be stopped cancel() return } -func (srv *Server) Close() { +func (srv *Server) Start() (wg sync.WaitGroup, err error) { + if srv.cfg.Interfaces.Pipe.ListenPath != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServePipe(srv.cfg.Interfaces.Pipe) + }() + } + + if srv.cfg.Interfaces.Pipegram.ListenPath != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) + }() + } + + if srv.cfg.Interfaces.Web.ListenAddr != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.ServeWeb(srv.cfg.Interfaces.Web) + }() + } + + if srv.cfg.Forwards.SFive.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwarding(srv.cfg.Forwards.SFive) + }() + } + + if srv.cfg.Forwards.Elasticsearch.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + }() + } + + if srv.cfg.Forwards.Graphite.Host != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + }() + } + + if srv.cfg.Forwards.Piwik.URL != "" { + wg.Add(1) + go func() { + defer wg.Done() + srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + }() + } + return wg, nil +} + +func (srv *Server) Shutdown() { s5l.Printf("srv: shutting down") if errors := srv.shutdownInterfaces(); errors != 0 { @@ -160,13 +222,14 @@ func (srv *Server) Close() { close(srv.ingestChan) // close ingest channel to tell worker to stop srv.wgWorker.Wait() // wait for worker to finish up + s5l.Printf("srv: all worker stopped") srv.store.Close() s5l.Printf("srv: finished") } func NewServer(cfg SrvConfig) (srv *Server, err error) { - srv = &Server{} + srv = &Server{cfg: cfg} if srv.store, err = NewStore(cfg.Store); err != nil { return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 7d0fbe4..6cf2bc4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -34,8 +34,10 @@ package sfive import ( "bytes" + "context" "io" "net" + "strings" ) const ( @@ -51,13 +53,13 @@ func (srv *Server) pipeHandle(conn net.Conn) { dec, err := NewStatefulDecoder(conn) if err != nil { - s5l.Printf("server|pipe: read(init) failed: %v\n", err) + s5l.Printf("srv|pipe: read(init) failed: %v\n", err) return } slug := dec.Slug() - s5l.Printf("server|pipe: new connection: %s\n", slug) - defer s5l.Printf("server|pipe(%s): connection closed\n", slug) + s5l.Printf("srv|pipe: new connection: %s\n", slug) + defer s5l.Printf("srv|pipe(%s): connection closed\n", slug) for { update, err := dec.Decode() @@ -69,15 +71,15 @@ func (srv *Server) pipeHandle(conn net.Conn) { opErr, isOpErr := err.(*net.OpError) if isOpErr && opErr.Temporary() { - s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) + s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) } else { - s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err) + s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) break } } if err = srv.Ingest(update); err != nil { - s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err) + s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) // TODO: send NACK? break } @@ -88,18 +90,18 @@ func (srv *Server) pipeHandle(conn net.Conn) { func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { ln, err := net.Listen("unix", cfg.ListenPath) if err != nil { - s5l.Printf("server|pipe: listen() failed: %v", err) + s5l.Printf("srv|pipe: listen() failed: %v", err) return } defer ln.Close() - s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pipe: interface stopped") + s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pipe: interface stopped") for { conn, err := ln.Accept() if err != nil { - s5l.Printf("server|pipe: accept() failed: %v", err) + s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue } @@ -111,12 +113,15 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { // Unix Socket Interface (datagrams) // -func (srv *Server) pipegramHandle(pconn net.PacketConn) { +func (srv *Server) pipegramRun() { buffer := make([]byte, PipegramMessageSizeLimit) for { - n, _, err := pconn.ReadFrom(buffer) + n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } s5l.Printf("srv|pgram: read() failed: %v", err) continue } @@ -134,16 +139,24 @@ func (srv *Server) pipegramHandle(pconn net.PacketConn) { } } -func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) { - pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) - if err != nil { +func (srv *Server) pipegramStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to webRun and webStop... + if srv.interfaces.pipegram == nil { + return nil + } + s5l.Printf("srv|pgram: shutting down") + return srv.interfaces.pipegram.Close() +} + +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { + if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { s5l.Printf("srv|pgram: listen() failed: %v", err) return } - defer pconn.Close() - s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pgram: interface stopped") + s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pgram: interface stopped") - srv.pipegramHandle(pconn) + srv.pipegramRun() + return } -- cgit v1.2.3 From 6e69590369f2410127a0061902361025b7aa7c12 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 13 Jul 2017 22:19:36 +0200 Subject: move pipgram into separate file --- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 63 --------------- src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 92 ++++++++++++++++++++++ 2 files changed, 92 insertions(+), 63 deletions(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvPipegram.go diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 6cf2bc4..20e3085 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,21 +33,10 @@ package sfive import ( - "bytes" - "context" "io" "net" - "strings" ) -const ( - PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? -) - -// -// Unix Socket Interface (streams) -// - func (srv *Server) pipeHandle(conn net.Conn) { defer conn.Close() @@ -108,55 +97,3 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { go srv.pipeHandle(conn) } } - -// -// Unix Socket Interface (datagrams) -// - -func (srv *Server) pipegramRun() { - buffer := make([]byte, PipegramMessageSizeLimit) - - for { - n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) - if err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? - return - } - s5l.Printf("srv|pgram: read() failed: %v", err) - continue - } - data := buffer[0:n] - - update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() - if err != nil { - s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) - continue - } - - if err = srv.Ingest(update); err != nil { - s5l.Printf("srv|pgram: storing data failed: %v\n", err) - } - } -} - -func (srv *Server) pipegramStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to webRun and webStop... - if srv.interfaces.pipegram == nil { - return nil - } - s5l.Printf("srv|pgram: shutting down") - return srv.interfaces.pipegram.Close() -} - -func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { - if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { - s5l.Printf("srv|pgram: listen() failed: %v", err) - return - } - - s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pgram: interface stopped") - - srv.pipegramRun() - return -} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go new file mode 100644 index 0000000..b2807ed --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -0,0 +1,92 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner +// Markus Grüneis +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see . +// + +package sfive + +import ( + "bytes" + "context" + "net" + "strings" +) + +const ( + PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? +) + +func (srv *Server) pipegramRun() { + buffer := make([]byte, PipegramMessageSizeLimit) + + for { + n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } + s5l.Printf("srv|pgram: read() failed: %v", err) + continue + } + data := buffer[0:n] + + update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() + if err != nil { + s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) + continue + } + + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pgram: storing data failed: %v\n", err) + } + } +} + +func (srv *Server) pipegramStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to pipegramRun and pipegramStop... + if srv.interfaces.pipegram == nil { + return nil + } + s5l.Printf("srv|pgram: shutting down") + return srv.interfaces.pipegram.Close() +} + +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { + if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { + s5l.Printf("srv|pgram: listen() failed: %v", err) + return + } + + s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pgram: interface stopped") + + srv.pipegramRun() + return +} -- cgit v1.2.3 From a4146ac1faf5937741c2eb09ca2cc35be3b7b1f9 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 13 Jul 2017 22:52:53 +0200 Subject: added (stupid) shutdown for pipe interface --- src/hub/src/spreadspace.org/sfive/s5srv.go | 7 +++-- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 42 ++++++++++++++++++-------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e8778df..6742b6f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -56,8 +56,9 @@ type Server struct { wgWorker *sync.WaitGroup ingestChan chan ingestToken interfaces struct { - web *http.Server + pipe net.Listener pipegram net.PacketConn + web *http.Server } } @@ -137,11 +138,11 @@ func (srv *Server) shutdownInterfaces() (errors int) { ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) c := make(chan error) go func() { c <- srv.webStop(ctx) }() - //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped + go func() { c <- srv.pipeStop(ctx) }() go func() { c <- srv.pipegramStop(ctx) }() errors = 0 - for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled + for i := 0; i < 3; i++ { if err := <-c; err != nil { s5l.Printf("srv: interface shutdown failed failed: %v", err) errors++ diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 20e3085..c291513 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,8 +33,10 @@ package sfive import ( + "context" "io" "net" + "strings" ) func (srv *Server) pipeHandle(conn net.Conn) { @@ -76,20 +78,13 @@ func (srv *Server) pipeHandle(conn net.Conn) { } } -func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { - ln, err := net.Listen("unix", cfg.ListenPath) - if err != nil { - s5l.Printf("srv|pipe: listen() failed: %v", err) - return - } - defer ln.Close() - - s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pipe: interface stopped") - +func (srv *Server) pipeRun() { for { - conn, err := ln.Accept() + conn, err := srv.interfaces.pipe.Accept() if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue @@ -97,3 +92,26 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) { go srv.pipeHandle(conn) } } + +func (srv *Server) pipeStop(ctx context.Context) (err error) { + // TODO: this is a race condition between a call to pipeRun and pipeStop... + if srv.interfaces.pipe == nil { + return nil + } + s5l.Printf("srv|pipe: shutting down") + return srv.interfaces.pipe.Close() + // TODO: also close alle open client file descriptors +} + +func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { + if srv.interfaces.pipe, err = net.Listen("unix", cfg.ListenPath); err != nil { + s5l.Printf("srv|pipe: listen() failed: %v", err) + return + } + + s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) + defer s5l.Println("srv|pipe: interface stopped") + + srv.pipeRun() + return +} -- cgit v1.2.3 From f67e238d3a0f96affad921d19d8f469153287b70 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 14 Jul 2017 01:19:07 +0200 Subject: interface shutdown almost clean now --- src/hub/src/spreadspace.org/sfive/s5srv.go | 137 +++++++++------------ src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 23 +++- src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 13 +- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 47 +++---- src/hub/test-srv-single | 27 ++++ 5 files changed, 138 insertions(+), 109 deletions(-) create mode 100755 src/hub/test-srv-single diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 6742b6f..e45db69 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,18 +48,20 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - numWorker int - anonymization AnonymizationAlgo - geoip GeoIPLookup - wgWorker *sync.WaitGroup - ingestChan chan ingestToken - interfaces struct { + cfg SrvConfig + store *Store + wgInterfaces sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + numWorker int + anonymization AnonymizationAlgo + geoip GeoIPLookup + wgWorker sync.WaitGroup + ingestChan chan ingestToken + wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -134,95 +136,77 @@ func (srv *Server) IngestMany(updates []*UpdateFull) error { return <-token.response } -func (srv *Server) shutdownInterfaces() (errors int) { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) - c := make(chan error) - go func() { c <- srv.webStop(ctx) }() - go func() { c <- srv.pipeStop(ctx) }() - go func() { c <- srv.pipegramStop(ctx) }() - - errors = 0 - for i := 0; i < 3; i++ { - if err := <-c; err != nil { - s5l.Printf("srv: interface shutdown failed failed: %v", err) - errors++ - } - } - close(c) // closing channel here in the hopes that this leads to a panic - // in case the number of channel reads (for loop above) doesn't match the - // number of interfaces to be stopped - cancel() - return -} - -func (srv *Server) Start() (wg sync.WaitGroup, err error) { +func (srv *Server) Start() (wg *sync.WaitGroup, err error) { if srv.cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(srv.cfg.Interfaces.Pipe) - }() + srv.ServePipe(srv.cfg.Interfaces.Pipe) } - if srv.cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(srv.cfg.Interfaces.Pipegram) - }() + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) } - if srv.cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(srv.cfg.Interfaces.Web) - }() + srv.ServeWeb(srv.cfg.Interfaces.Web) } + // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(srv.cfg.Forwards.SFive) - }() + go srv.RunForwarding(srv.cfg.Forwards.SFive) } - if srv.cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) - }() + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) } - if srv.cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) - }() + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) } - if srv.cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) - }() + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) } - return wg, nil + + return &srv.wgInterfaces, nil +} + +func (srv *Server) shutdownInterfaces() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.webStop(ctx); err != nil { + s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipeStop(ctx); err != nil { + s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipegramStop(ctx); err != nil { + s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all clients to disconnect") + defer s5l.Printf("srv: all clients are now disconnected") + srv.wgInterfaces.Wait() + return } +// must not be called before Start() func (srv *Server) Shutdown() { s5l.Printf("srv: shutting down") - if errors := srv.shutdownInterfaces(); errors != 0 { - s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") - } + srv.shutdownInterfaces() - close(srv.ingestChan) // close ingest channel to tell worker to stop - srv.wgWorker.Wait() // wait for worker to finish up + s5l.Printf("srv: shutting down worker") + close(srv.ingestChan) + srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") srv.store.Close() @@ -256,7 +240,6 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { srv.numWorker = cfg.Workers } - srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { srv.wgWorker.Add(1) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index c291513..46a83cc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -37,6 +37,7 @@ import ( "io" "net" "strings" + "sync" ) func (srv *Server) pipeHandle(conn net.Conn) { @@ -79,28 +80,34 @@ func (srv *Server) pipeHandle(conn net.Conn) { } func (srv *Server) pipeRun() { + var wgClients sync.WaitGroup for { conn, err := srv.interfaces.pipe.Accept() if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? - return + break } s5l.Printf("srv|pipe: accept() failed: %v", err) // ignore continue } - go srv.pipeHandle(conn) + wgClients.Add(1) + go func() { + defer wgClients.Done() + srv.pipeHandle(conn) + }() } + s5l.Println("srv|pipe: interface stopped listening") + // TODO: tell all clients to disconnect!!! + wgClients.Wait() } func (srv *Server) pipeStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to pipeRun and pipeStop... if srv.interfaces.pipe == nil { return nil } s5l.Printf("srv|pipe: shutting down") return srv.interfaces.pipe.Close() - // TODO: also close alle open client file descriptors } func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { @@ -110,8 +117,12 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { } s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pipe: interface stopped") - srv.pipeRun() + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + + srv.pipeRun() + }() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go index b2807ed..65ff0e0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -40,12 +40,11 @@ import ( ) const ( - PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? + PipegramMessageSizeLimit = 1024 * 1024 ) func (srv *Server) pipegramRun() { buffer := make([]byte, PipegramMessageSizeLimit) - for { n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) if err != nil { @@ -70,7 +69,6 @@ func (srv *Server) pipegramRun() { } func (srv *Server) pipegramStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to pipegramRun and pipegramStop... if srv.interfaces.pipegram == nil { return nil } @@ -85,8 +83,13 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { } s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("srv|pgram: interface stopped") - srv.pipegramRun() + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|pgram: interface stopped listening") + + srv.pipegramRun() + }() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index a112585..65e677a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -397,26 +397,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { return tc, nil } -func (srv *Server) webRun(listener *net.TCPListener) (err error) { - mux := http.NewServeMux() - mux.Handle("/healthz", webHandler{srv, webHealthz}) - mux.Handle("/hubs", webHandler{srv, webHubs}) - mux.Handle("/sources", webHandler{srv, webSources}) - mux.Handle("/clients", webHandler{srv, webClients}) - mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) - mux.Handle("/updates", webHandler{srv, webUpdates}) - mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) - mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) - - // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) - mux.Handle("/", webHandler{srv, webNotFound}) - - srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} - return srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) +func (srv *Server) webRun(listener *net.TCPListener) { + srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) } func (srv *Server) webStop(ctx context.Context) (err error) { - // TODO: this is a race condition between a call to webRun and webStop... if srv.interfaces.web == nil { return nil } @@ -424,15 +409,35 @@ func (srv *Server) webStop(ctx context.Context) (err error) { return srv.interfaces.web.Shutdown(ctx) } -func (srv *Server) ServeWeb(cfg WebInterfaceConfig) { +func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error { ln, err := net.Listen("tcp", cfg.ListenAddr) if err != nil { s5l.Printf("srv|web: listen() failed: %v", err) - return + return err } s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) - defer s5l.Println("srv|web: interface stopped") - srv.webRun(ln.(*net.TCPListener)) + mux := http.NewServeMux() + mux.Handle("/healthz", webHandler{srv, webHealthz}) + mux.Handle("/hubs", webHandler{srv, webHubs}) + mux.Handle("/sources", webHandler{srv, webSources}) + mux.Handle("/clients", webHandler{srv, webClients}) + mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) + mux.Handle("/updates", webHandler{srv, webUpdates}) + mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) + mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) + // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) + mux.Handle("/", webHandler{srv, webNotFound}) + + srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|web: interface stopped listening") + + srv.webRun(ln.(*net.TCPListener)) + }() + return nil } diff --git a/src/hub/test-srv-single b/src/hub/test-srv-single new file mode 100755 index 0000000..902c78b --- /dev/null +++ b/src/hub/test-srv-single @@ -0,0 +1,27 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +case "$2" in + pipe) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server=false -start-web-server=false + ;; + pipegram) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server=false + ;; + web) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server -web ":8000" + ;; + *) + "unknown interface $2" + return 1 + ;; +esac -- cgit v1.2.3 From 50b3bb1acf202f7dc858618b61b9d0a608176796 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 15 Jul 2017 04:55:20 +0200 Subject: pipe client connections can now be killed on shutdown --- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 70 +++++++++++++++++--------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 46a83cc..2a6a200 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -38,49 +38,71 @@ import ( "net" "strings" "sync" + "time" ) -func (srv *Server) pipeHandle(conn net.Conn) { +func (srv *Server) pipeRead(dec Decoder, updateCH chan<- *UpdateFull, errorCH chan<- error) { + for { + update, err := dec.Decode() + if err != nil { + if err == io.EOF { + close(updateCH) + return + } + opErr, isOpErr := err.(*net.OpError) + if isOpErr && opErr.Temporary() { + continue + } + errorCH <- err + return + } + updateCH <- update + } +} + +func (srv *Server) pipeHandle(conn net.Conn, quit <-chan bool) { defer conn.Close() + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) dec, err := NewStatefulDecoder(conn) if err != nil { s5l.Printf("srv|pipe: read(init) failed: %v\n", err) return } + conn.SetReadDeadline(time.Time{}) slug := dec.Slug() s5l.Printf("srv|pipe: new connection: %s\n", slug) defer s5l.Printf("srv|pipe(%s): connection closed\n", slug) + updateCH := make(chan *UpdateFull) + errorCH := make(chan error) + go srv.pipeRead(dec, updateCH, errorCH) + for { - update, err := dec.Decode() - if err != nil { - if err == io.EOF { - break + select { + case update := <-updateCH: + if update == nil { + return } - // TODO: send NACK? - - opErr, isOpErr := err.(*net.OpError) - if isOpErr && opErr.Temporary() { - s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) - } else { - s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) - break + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) + // TODO: send NACK + return } + // TODO: send ACK + case err := <-errorCH: + s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) + return + case <-quit: + return } - - if err = srv.Ingest(update); err != nil { - s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) - // TODO: send NACK? - break - } - // TODO: send ACK? } } func (srv *Server) pipeRun() { - var wgClients sync.WaitGroup + wgClients := &sync.WaitGroup{} + quit := make(chan bool) for { conn, err := srv.interfaces.pipe.Accept() if err != nil { @@ -88,17 +110,17 @@ func (srv *Server) pipeRun() { break } s5l.Printf("srv|pipe: accept() failed: %v", err) - // ignore + // TODO: ignore ... or is this permanent? continue } wgClients.Add(1) go func() { defer wgClients.Done() - srv.pipeHandle(conn) + srv.pipeHandle(conn, quit) }() } s5l.Println("srv|pipe: interface stopped listening") - // TODO: tell all clients to disconnect!!! + close(quit) wgClients.Wait() } -- cgit v1.2.3