diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/proxy.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 71 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 118 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 151 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 95 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 47 | ||||
-rwxr-xr-x | src/hub/test-srv-single | 27 |
11 files changed, 335 insertions, 192 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go index 3bb0afb..f994982 100644 --- a/src/daq/s5proxy/src/s5proxy/proxy.go +++ b/src/daq/s5proxy/src/s5proxy/proxy.go @@ -212,7 +212,7 @@ func (p *Proxy) Run() error { httpsL := m.Match(cmux.Any()) go p.RunHTTPS(httpsL) - if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { + if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? return err } return nil diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index ba34697..9b28270 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -37,7 +37,7 @@ import ( "log" "os" "os/signal" - "sync" + "syscall" "spreadspace.org/sfive" ) @@ -100,64 +100,10 @@ func main() { if err != nil { s5hl.Fatalf(err.Error()) } - defer srv.Close() - var wg sync.WaitGroup - - if cfg.Interfaces.Pipe.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipe(cfg.Interfaces.Pipe) - }() - } - - if cfg.Interfaces.Pipegram.ListenPath != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServePipegram(cfg.Interfaces.Pipegram) - }() - } - - if cfg.Interfaces.Web.ListenAddr != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.ServeWeb(cfg.Interfaces.Web) - }() - } - - if cfg.Forwards.SFive.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwarding(cfg.Forwards.SFive) - }() - } - - if cfg.Forwards.Elasticsearch.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingEs(cfg.Forwards.Elasticsearch) - }() - } - - if cfg.Forwards.Graphite.Host != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingGraphite(cfg.Forwards.Graphite) - }() - } - - if cfg.Forwards.Piwik.URL != "" { - wg.Add(1) - go func() { - defer wg.Done() - srv.RunForwardingPiwik(cfg.Forwards.Piwik) - }() + wg, err := srv.Start() + if err != nil { + s5hl.Fatalf(err.Error()) } alldone := make(chan bool) @@ -167,13 +113,12 @@ func main() { }() c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) select { - case <-c: - s5hl.Println("received interrupt, shutdown") - return + case sig := <-c: + s5hl.Printf("signal(%v) received, shutting down", sig) case <-alldone: - return } + srv.Shutdown() } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 220a7ae..e45db69 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,32 +33,38 @@ package sfive import ( + "context" "errors" + "net" + "net/http" "runtime" "sync" "time" ) -var ( - ErrShutdownInProgress = errors.New("shutdown in progress") -) - type ingestToken struct { updates []*UpdateFull response chan error } type Server struct { - store *Store + cfg SrvConfig + store *Store + wgInterfaces sync.WaitGroup + interfaces struct { + pipe net.Listener + pipegram net.PacketConn + web *http.Server + } numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup - quit chan bool - wgWorker *sync.WaitGroup + wgWorker sync.WaitGroup ingestChan chan ingestToken + wgForwarder sync.WaitGroup } -func (srv Server) transform(update *UpdateFull) *UpdateFull { +func (srv *Server) transform(update *UpdateFull) *UpdateFull { bytesSentTotal := uint(0) clients := []Client{} for _, client := range update.Data.Clients { @@ -100,13 +106,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { return update } -func (srv Server) transformMany(updates []*UpdateFull) { +func (srv *Server) transformMany(updates []*UpdateFull) { for _, update := range updates { srv.transform(update) } } -func (srv Server) ingestWorker(idx int) { +func (srv *Server) ingestWorker(idx int) { for { select { case token, ok := <-srv.ingestChan: @@ -119,41 +125,96 @@ func (srv Server) ingestWorker(idx int) { } } -func (srv Server) Ingest(update *UpdateFull) error { +func (srv *Server) Ingest(update *UpdateFull) error { return srv.IngestMany([]*UpdateFull{update}) } -func (srv Server) IngestMany(updates []*UpdateFull) error { - if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it - return ErrShutdownInProgress - } - +func (srv *Server) IngestMany(updates []*UpdateFull) error { token := ingestToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.ingestChan <- token return <-token.response } -func (srv Server) Close() { - s5l.Printf("srv: shutting down") +func (srv *Server) Start() (wg *sync.WaitGroup, err error) { + if srv.cfg.Interfaces.Pipe.ListenPath != "" { + srv.ServePipe(srv.cfg.Interfaces.Pipe) + } + if srv.cfg.Interfaces.Pipegram.ListenPath != "" { + srv.ServePipegram(srv.cfg.Interfaces.Pipegram) + } + if srv.cfg.Interfaces.Web.ListenAddr != "" { + srv.ServeWeb(srv.cfg.Interfaces.Web) + } + + // TODO: forwarder need a clean shutdown as well!!! + if srv.cfg.Forwards.SFive.URL != "" { + go srv.RunForwarding(srv.cfg.Forwards.SFive) + } + if srv.cfg.Forwards.Elasticsearch.URL != "" { + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + } + if srv.cfg.Forwards.Graphite.Host != "" { + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + } + if srv.cfg.Forwards.Piwik.URL != "" { + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + } - // TODO: enable this as soon as interfaces can be told to stop accepting data - // and yield wgInterfaces once the last client connection is done - // close(srv.quit) // tell interfaces to don't accept new data - // srv.wgInterfaces.Wait() // wait for interfaces to finish up + return &srv.wgInterfaces, nil +} + +func (srv *Server) shutdownInterfaces() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.webStop(ctx); err != nil { + s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipeStop(ctx); err != nil { + s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + } + }() + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + if err := srv.pipegramStop(ctx); err != nil { + s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + } + }() - srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it - time.Sleep(time.Second) // this is quite ugly but must be good enough for now + s5l.Printf("srv: waiting for all clients to disconnect") + defer s5l.Printf("srv: all clients are now disconnected") + srv.wgInterfaces.Wait() + return +} - close(srv.ingestChan) // close ingest channel to tell worker to stop - srv.wgWorker.Wait() // wait for worker to finish up +// must not be called before Start() +func (srv *Server) Shutdown() { + s5l.Printf("srv: shutting down") + + srv.shutdownInterfaces() + + s5l.Printf("srv: shutting down worker") + close(srv.ingestChan) + srv.wgWorker.Wait() + s5l.Printf("srv: all worker stopped") srv.store.Close() s5l.Printf("srv: finished") } func NewServer(cfg SrvConfig) (srv *Server, err error) { - srv = &Server{} + srv = &Server{cfg: cfg} if srv.store, err = NewStore(cfg.Store); err != nil { return } @@ -178,8 +239,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { if cfg.Workers > 0 { srv.numWorker = cfg.Workers } - srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works) - srv.wgWorker = &sync.WaitGroup{} + srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { srv.wgWorker.Add(1) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 094a1a8..904dc18 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -109,7 +109,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er return result.NumUpdates, nil } -func (srv Server) forwardRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardRun(baseUrl string, client *http.Client) { url := baseUrl + "/updates/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -151,7 +151,7 @@ tryResync: } } -func (srv Server) RunForwarding(cfg SFiveForwardConfig) { +func (srv *Server) RunForwarding(cfg SFiveForwardConfig) { s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 02a6470..fb56cbd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -98,7 +98,7 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( return } -func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { +func (srv *Server) forwardEsRun(baseUrl string, client *http.Client) { url := baseUrl + "/_bulk" hubUUID := srv.store.GetHubUUID() tryResync: @@ -159,7 +159,7 @@ tryResync: } } -func (srv Server) RunForwardingEs(cfg ESForwardConfig) { +func (srv *Server) RunForwardingEs(cfg ESForwardConfig) { s5l.Printf("srv|fwd-es: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-es: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index d0c07d5..104c005 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -39,7 +39,7 @@ import ( "github.com/equinox0815/graphite-golang" ) -func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) { +func (srv *Server) forwardGraphiteRun(forwardHost string, basePath string) { tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) @@ -96,7 +96,7 @@ tryResync: } } -func (srv Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { +func (srv *Server) RunForwardingGraphite(cfg GraphiteForwardConfig) { s5l.Printf("srv|fwd-graphite: forwarding to '%s'", cfg.Host) defer s5l.Println("srv|fwd-graphite: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index f3f71e7..93b717b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -55,7 +55,7 @@ func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string return } -func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +func (srv *Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { // hubUuid := srv.store.GetHubUuid() tryResync: for { @@ -138,7 +138,7 @@ tryResync: } } -func (srv Server) RunForwardingPiwik(cfg PiwikForwardConfig) { +func (srv *Server) RunForwardingPiwik(cfg PiwikForwardConfig) { s5l.Printf("srv|fwd-piwik: forwarding to '%s'", cfg.URL) defer s5l.Println("srv|fwd-piwik: forwarder stopped") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 4755c35..2a6a200 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -33,117 +33,118 @@ package sfive import ( - "bytes" + "context" "io" "net" + "strings" + "sync" + "time" ) -const ( - PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed? -) - -// -// Unix Socket Interface (streams) -// - -func (srv Server) pipeHandle(conn net.Conn) { - defer conn.Close() - - dec, err := NewStatefulDecoder(conn) - if err != nil { - s5l.Printf("server|pipe: read(init) failed: %v\n", err) - return - } - - slug := dec.Slug() - s5l.Printf("server|pipe: new connection: %s\n", slug) - defer s5l.Printf("server|pipe(%s): connection closed\n", slug) - +func (srv *Server) pipeRead(dec Decoder, updateCH chan<- *UpdateFull, errorCH chan<- error) { for { update, err := dec.Decode() if err != nil { if err == io.EOF { - break + close(updateCH) + return } - // TODO: send NACK? - opErr, isOpErr := err.(*net.OpError) if isOpErr && opErr.Temporary() { - s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err) - } else { - s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err) - break + continue } + errorCH <- err + return } - - if err = srv.Ingest(update); err != nil { - s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err) - // TODO: send NACK? - break - } - // TODO: send ACK? + updateCH <- update } } -func (srv Server) ServePipe(cfg PipeInterfaceConfig) { - ln, err := net.Listen("unix", cfg.ListenPath) +func (srv *Server) pipeHandle(conn net.Conn, quit <-chan bool) { + defer conn.Close() + + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + dec, err := NewStatefulDecoder(conn) if err != nil { - s5l.Printf("server|pipe: listen() failed: %v", err) + s5l.Printf("srv|pipe: read(init) failed: %v\n", err) return } - defer ln.Close() + conn.SetReadDeadline(time.Time{}) - s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pipe: interface stopped") + slug := dec.Slug() + s5l.Printf("srv|pipe: new connection: %s\n", slug) + defer s5l.Printf("srv|pipe(%s): connection closed\n", slug) + + updateCH := make(chan *UpdateFull) + errorCH := make(chan error) + go srv.pipeRead(dec, updateCH, errorCH) for { - conn, err := ln.Accept() - if err != nil { - s5l.Printf("server|pipe: accept() failed: %v", err) - // ignore - continue + select { + case update := <-updateCH: + if update == nil { + return + } + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err) + // TODO: send NACK + return + } + // TODO: send ACK + case err := <-errorCH: + s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err) + return + case <-quit: + return } - go srv.pipeHandle(conn) } } -// -// Unix Socket Interface (datagrams) -// - -func (srv Server) pipegramHandle(pconn net.PacketConn) { - buffer := make([]byte, PipegramMessageSizeLimit) - +func (srv *Server) pipeRun() { + wgClients := &sync.WaitGroup{} + quit := make(chan bool) for { - n, _, err := pconn.ReadFrom(buffer) + conn, err := srv.interfaces.pipe.Accept() if err != nil { - s5l.Printf("srv|pgram: read() failed: %v", err) - continue - } - data := buffer[0:n] - - update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() - if err != nil { - s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + break + } + s5l.Printf("srv|pipe: accept() failed: %v", err) + // TODO: ignore ... or is this permanent? continue } + wgClients.Add(1) + go func() { + defer wgClients.Done() + srv.pipeHandle(conn, quit) + }() + } + s5l.Println("srv|pipe: interface stopped listening") + close(quit) + wgClients.Wait() +} - if err = srv.Ingest(update); err != nil { - s5l.Printf("srv|pgram: storing data failed: %v\n", err) - } +func (srv *Server) pipeStop(ctx context.Context) (err error) { + if srv.interfaces.pipe == nil { + return nil } + s5l.Printf("srv|pipe: shutting down") + return srv.interfaces.pipe.Close() } -func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) { - pconn, err := net.ListenPacket("unixgram", cfg.ListenPath) - if err != nil { - s5l.Printf("srv|pgram: listen() failed: %v", err) +func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { + if srv.interfaces.pipe, err = net.Listen("unix", cfg.ListenPath); err != nil { + s5l.Printf("srv|pipe: listen() failed: %v", err) return } - defer pconn.Close() - s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath) - defer s5l.Println("server|pgram: interface stopped") + s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() - srv.pipegramHandle(pconn) + srv.pipeRun() + }() + return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go new file mode 100644 index 0000000..65ff0e0 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -0,0 +1,95 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + +package sfive + +import ( + "bytes" + "context" + "net" + "strings" +) + +const ( + PipegramMessageSizeLimit = 1024 * 1024 +) + +func (srv *Server) pipegramRun() { + buffer := make([]byte, PipegramMessageSizeLimit) + for { + n, _, err := srv.interfaces.pipegram.ReadFrom(buffer) + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this? + return + } + s5l.Printf("srv|pgram: read() failed: %v", err) + continue + } + data := buffer[0:n] + + update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() + if err != nil { + s5l.Printf("srv|pgram: decoding data message failed: %v\n", err) + continue + } + + if err = srv.Ingest(update); err != nil { + s5l.Printf("srv|pgram: storing data failed: %v\n", err) + } + } +} + +func (srv *Server) pipegramStop(ctx context.Context) (err error) { + if srv.interfaces.pipegram == nil { + return nil + } + s5l.Printf("srv|pgram: shutting down") + return srv.interfaces.pipegram.Close() +} + +func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { + if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil { + s5l.Printf("srv|pgram: listen() failed: %v", err) + return + } + + s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) + + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|pgram: interface stopped listening") + + srv.pipegramRun() + }() + return +} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 9e0b474..65e677a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -33,6 +33,7 @@ package sfive import ( + "context" "encoding/json" "fmt" "io" @@ -392,11 +393,31 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { return } tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(180 * time.Second) + tc.SetKeepAlivePeriod(30 * time.Second) return tc, nil } -func webRun(listener *net.TCPListener, srv *Server) (err error) { +func (srv *Server) webRun(listener *net.TCPListener) { + srv.interfaces.web.Serve(tcpKeepAliveListener{listener}) +} + +func (srv *Server) webStop(ctx context.Context) (err error) { + if srv.interfaces.web == nil { + return nil + } + s5l.Printf("srv|web: shutting down") + return srv.interfaces.web.Shutdown(ctx) +} + +func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error { + ln, err := net.Listen("tcp", cfg.ListenAddr) + if err != nil { + s5l.Printf("srv|web: listen() failed: %v", err) + return err + } + + s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) + mux := http.NewServeMux() mux.Handle("/healthz", webHandler{srv, webHealthz}) mux.Handle("/hubs", webHandler{srv, webHubs}) @@ -406,23 +427,17 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { mux.Handle("/updates", webHandler{srv, webUpdates}) mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) - // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) mux.Handle("/", webHandler{srv, webNotFound}) - server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second} - return server.Serve(tcpKeepAliveListener{listener}) -} - -func (srv Server) ServeWeb(cfg WebInterfaceConfig) { - ln, err := net.Listen("tcp", cfg.ListenAddr) - if err != nil { - s5l.Printf("srv|web: listen() failed: %v", err) - return - } + srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} - s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr) - defer s5l.Println("srv|web: interface stopped") + srv.wgInterfaces.Add(1) + go func() { + defer srv.wgInterfaces.Done() + defer s5l.Println("srv|web: interface stopped listening") - webRun(ln.(*net.TCPListener), &srv) + srv.webRun(ln.(*net.TCPListener)) + }() + return nil } diff --git a/src/hub/test-srv-single b/src/hub/test-srv-single new file mode 100755 index 0000000..902c78b --- /dev/null +++ b/src/hub/test-srv-single @@ -0,0 +1,27 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> <interface>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +case "$2" in + pipe) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server=false -start-web-server=false + ;; + pipegram) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server=false + ;; + web) + exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server -web ":8000" + ;; + *) + "unknown interface $2" + return 1 + ;; +esac |