summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-11-16 22:18:29 +0100
committerChristian Pointner <equinox@spreadspace.org>2017-11-16 22:18:29 +0100
commite095541652336559edb76854950f474f8aa86999 (patch)
treec5ec2c0c832ff5b28f6a4a340c65ec9945f05358
parentserver shutdown is a little better now (still needs some thoughts though) (diff)
parentpipe client connections can now be killed on shutdown (diff)
Merge branch 'clean-shutdown'
-rw-r--r--src/daq/s5proxy/src/s5proxy/proxy.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go71
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go118
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go151
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipegram.go95
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go47
-rwxr-xr-xsrc/hub/test-srv-single27
11 files changed, 335 insertions, 192 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go
index 3bb0afb..f994982 100644
--- a/src/daq/s5proxy/src/s5proxy/proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/proxy.go
@@ -212,7 +212,7 @@ func (p *Proxy) Run() error {
httpsL := m.Match(cmux.Any())
go p.RunHTTPS(httpsL)
- if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") {
+ if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
return err
}
return nil
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index ba34697..9b28270 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -37,7 +37,7 @@ import (
"log"
"os"
"os/signal"
- "sync"
+ "syscall"
"spreadspace.org/sfive"
)
@@ -100,64 +100,10 @@ func main() {
if err != nil {
s5hl.Fatalf(err.Error())
}
- defer srv.Close()
- var wg sync.WaitGroup
-
- if cfg.Interfaces.Pipe.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipe(cfg.Interfaces.Pipe)
- }()
- }
-
- if cfg.Interfaces.Pipegram.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipegram(cfg.Interfaces.Pipegram)
- }()
- }
-
- if cfg.Interfaces.Web.ListenAddr != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServeWeb(cfg.Interfaces.Web)
- }()
- }
-
- if cfg.Forwards.SFive.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwarding(cfg.Forwards.SFive)
- }()
- }
-
- if cfg.Forwards.Elasticsearch.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingEs(cfg.Forwards.Elasticsearch)
- }()
- }
-
- if cfg.Forwards.Graphite.Host != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingGraphite(cfg.Forwards.Graphite)
- }()
- }
-
- if cfg.Forwards.Piwik.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingPiwik(cfg.Forwards.Piwik)
- }()
+ wg, err := srv.Start()
+ if err != nil {
+ s5hl.Fatalf(err.Error())
}
alldone := make(chan bool)
@@ -167,13 +113,12 @@ func main() {
}()
c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt)
+ signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
select {
- case <-c:
- s5hl.Println("received interrupt, shutdown")
- return
+ case sig := <-c:
+ s5hl.Printf("signal(%v) received, shutting down", sig)
case <-alldone:
- return
}
+ srv.Shutdown()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 220a7ae..e45db69 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -33,32 +33,38 @@
package sfive
import (
+ "context"
"errors"
+ "net"
+ "net/http"
"runtime"
"sync"
"time"
)
-var (
- ErrShutdownInProgress = errors.New("shutdown in progress")
-)
-
type ingestToken struct {
updates []*UpdateFull
response chan error
}
type Server struct {
- store *Store
+ cfg SrvConfig
+ store *Store
+ wgInterfaces sync.WaitGroup
+ interfaces struct {
+ pipe net.Listener
+ pipegram net.PacketConn
+ web *http.Server
+ }
numWorker int
anonymization AnonymizationAlgo
geoip GeoIPLookup
- quit chan bool
- wgWorker *sync.WaitGroup
+ wgWorker sync.WaitGroup
ingestChan chan ingestToken
+ wgForwarder sync.WaitGroup
}
-func (srv Server) transform(update *UpdateFull) *UpdateFull {
+func (srv *Server) transform(update *UpdateFull) *UpdateFull {
bytesSentTotal := uint(0)
clients := []Client{}
for _, client := range update.Data.Clients {
@@ -100,13 +106,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
return update
}
-func (srv Server) transformMany(updates []*UpdateFull) {
+func (srv *Server) transformMany(updates []*UpdateFull) {
for _, update := range updates {
srv.transform(update)
}
}
-func (srv Server) ingestWorker(idx int) {
+func (srv *Server) ingestWorker(idx int) {
for {
select {
case token, ok := <-srv.ingestChan:
@@ -119,41 +125,96 @@ func (srv Server) ingestWorker(idx int) {
}
}
-func (srv Server) Ingest(update *UpdateFull) error {
+func (srv *Server) Ingest(update *UpdateFull) error {
return srv.IngestMany([]*UpdateFull{update})
}
-func (srv Server) IngestMany(updates []*UpdateFull) error {
- if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it
- return ErrShutdownInProgress
- }
-
+func (srv *Server) IngestMany(updates []*UpdateFull) error {
token := ingestToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
srv.ingestChan <- token
return <-token.response
}
-func (srv Server) Close() {
- s5l.Printf("srv: shutting down")
+func (srv *Server) Start() (wg *sync.WaitGroup, err error) {
+ if srv.cfg.Interfaces.Pipe.ListenPath != "" {
+ srv.ServePipe(srv.cfg.Interfaces.Pipe)
+ }
+ if srv.cfg.Interfaces.Pipegram.ListenPath != "" {
+ srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
+ }
+ if srv.cfg.Interfaces.Web.ListenAddr != "" {
+ srv.ServeWeb(srv.cfg.Interfaces.Web)
+ }
+
+ // TODO: forwarder need a clean shutdown as well!!!
+ if srv.cfg.Forwards.SFive.URL != "" {
+ go srv.RunForwarding(srv.cfg.Forwards.SFive)
+ }
+ if srv.cfg.Forwards.Elasticsearch.URL != "" {
+ go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
+ }
+ if srv.cfg.Forwards.Graphite.Host != "" {
+ go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
+ }
+ if srv.cfg.Forwards.Piwik.URL != "" {
+ go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
+ }
- // TODO: enable this as soon as interfaces can be told to stop accepting data
- // and yield wgInterfaces once the last client connection is done
- // close(srv.quit) // tell interfaces to don't accept new data
- // srv.wgInterfaces.Wait() // wait for interfaces to finish up
+ return &srv.wgInterfaces, nil
+}
+
+func (srv *Server) shutdownInterfaces() {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ defer cancel()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.webStop(ctx); err != nil {
+ s5l.Printf("srv|web: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipeStop(ctx); err != nil {
+ s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err)
+ }
+ }()
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ if err := srv.pipegramStop(ctx); err != nil {
+ s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err)
+ }
+ }()
- srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it
- time.Sleep(time.Second) // this is quite ugly but must be good enough for now
+ s5l.Printf("srv: waiting for all clients to disconnect")
+ defer s5l.Printf("srv: all clients are now disconnected")
+ srv.wgInterfaces.Wait()
+ return
+}
- close(srv.ingestChan) // close ingest channel to tell worker to stop
- srv.wgWorker.Wait() // wait for worker to finish up
+// must not be called before Start()
+func (srv *Server) Shutdown() {
+ s5l.Printf("srv: shutting down")
+
+ srv.shutdownInterfaces()
+
+ s5l.Printf("srv: shutting down worker")
+ close(srv.ingestChan)
+ srv.wgWorker.Wait()
+ s5l.Printf("srv: all worker stopped")
srv.store.Close()
s5l.Printf("srv: finished")
}
func NewServer(cfg SrvConfig) (srv *Server, err error) {
- srv = &Server{}
+ srv = &Server{cfg: cfg}
if srv.store, err = NewStore(cfg.Store); err != nil {
return
}
@@ -178,8 +239,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
if cfg.Workers > 0 {
srv.numWorker = cfg.Workers
}
- srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works)
- srv.wgWorker = &sync.WaitGroup{}
+
srv.ingestChan = make(chan ingestToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
srv.wgWorker.Add(1)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 094a1a8..904dc18 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -109,7 +109,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er
return result.NumUpdates, nil
}
-func (srv Server) forwardRun(baseUrl string, client *http.Client) {
+func (srv *Server) forwardRun(baseUrl string, client *http.Client) {
url := baseUrl + "/updates/_bulk"
hubUUID := srv.store.GetHubUUID()
tryResync:
@@ -151,7 +151,7 @@ tryResync:
}
}
-func (srv Server) RunForwarding(cfg SFiveForwardConfig) {
+func (srv *Server) RunForwarding(cfg SFiveForwardConfig) {
s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 02a6470..fb56cbd 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -98,7 +98,7 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
return
}
-func (srv Server) forwardEsRun(baseUrl string, client *http.Client) {
+func (srv *Server) forwardEsRun(baseUrl string, client *http.Client) {
url := baseUrl + "/_bulk"
hubUUID := srv.store.GetHubUUID()
tryResync:
@@ -159,7 +159,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingEs(cfg ESForwardConfig) {
+func (srv *Server) RunForwardingEs(cfg ESForwardConfig) {
s5l.Printf("srv|fwd-es: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd-es: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index d0c07d5..104c005 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -39,7 +39,7 @@ import (
"github.com/equinox0815/graphite-golang"
)
-func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) {
+func (srv *Server) forwardGraphiteRun(forwardHost string, basePath string) {
tryResync:
for {
client, err := graphite.NewGraphiteFromAddress(forwardHost)
@@ -96,7 +96,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingGraphite(cfg GraphiteForwardConfig) {
+func (srv *Server) RunForwardingGraphite(cfg GraphiteForwardConfig) {
s5l.Printf("srv|fwd-graphite: forwarding to '%s'", cfg.Host)
defer s5l.Println("srv|fwd-graphite: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index f3f71e7..93b717b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -55,7 +55,7 @@ func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string
return
}
-func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
+func (srv *Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
// hubUuid := srv.store.GetHubUuid()
tryResync:
for {
@@ -138,7 +138,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingPiwik(cfg PiwikForwardConfig) {
+func (srv *Server) RunForwardingPiwik(cfg PiwikForwardConfig) {
s5l.Printf("srv|fwd-piwik: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd-piwik: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 4755c35..2a6a200 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -33,117 +33,118 @@
package sfive
import (
- "bytes"
+ "context"
"io"
"net"
+ "strings"
+ "sync"
+ "time"
)
-const (
- PipegramMessageSizeLimit = 1024 * 1024 // TODO: is this really needed?
-)
-
-//
-// Unix Socket Interface (streams)
-//
-
-func (srv Server) pipeHandle(conn net.Conn) {
- defer conn.Close()
-
- dec, err := NewStatefulDecoder(conn)
- if err != nil {
- s5l.Printf("server|pipe: read(init) failed: %v\n", err)
- return
- }
-
- slug := dec.Slug()
- s5l.Printf("server|pipe: new connection: %s\n", slug)
- defer s5l.Printf("server|pipe(%s): connection closed\n", slug)
-
+func (srv *Server) pipeRead(dec Decoder, updateCH chan<- *UpdateFull, errorCH chan<- error) {
for {
update, err := dec.Decode()
if err != nil {
if err == io.EOF {
- break
+ close(updateCH)
+ return
}
- // TODO: send NACK?
-
opErr, isOpErr := err.(*net.OpError)
if isOpErr && opErr.Temporary() {
- s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
- } else {
- s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err)
- break
+ continue
}
+ errorCH <- err
+ return
}
-
- if err = srv.Ingest(update); err != nil {
- s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err)
- // TODO: send NACK?
- break
- }
- // TODO: send ACK?
+ updateCH <- update
}
}
-func (srv Server) ServePipe(cfg PipeInterfaceConfig) {
- ln, err := net.Listen("unix", cfg.ListenPath)
+func (srv *Server) pipeHandle(conn net.Conn, quit <-chan bool) {
+ defer conn.Close()
+
+ conn.SetReadDeadline(time.Now().Add(10 * time.Second))
+ dec, err := NewStatefulDecoder(conn)
if err != nil {
- s5l.Printf("server|pipe: listen() failed: %v", err)
+ s5l.Printf("srv|pipe: read(init) failed: %v\n", err)
return
}
- defer ln.Close()
+ conn.SetReadDeadline(time.Time{})
- s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("server|pipe: interface stopped")
+ slug := dec.Slug()
+ s5l.Printf("srv|pipe: new connection: %s\n", slug)
+ defer s5l.Printf("srv|pipe(%s): connection closed\n", slug)
+
+ updateCH := make(chan *UpdateFull)
+ errorCH := make(chan error)
+ go srv.pipeRead(dec, updateCH, errorCH)
for {
- conn, err := ln.Accept()
- if err != nil {
- s5l.Printf("server|pipe: accept() failed: %v", err)
- // ignore
- continue
+ select {
+ case update := <-updateCH:
+ if update == nil {
+ return
+ }
+ if err = srv.Ingest(update); err != nil {
+ s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err)
+ // TODO: send NACK
+ return
+ }
+ // TODO: send ACK
+ case err := <-errorCH:
+ s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err)
+ return
+ case <-quit:
+ return
}
- go srv.pipeHandle(conn)
}
}
-//
-// Unix Socket Interface (datagrams)
-//
-
-func (srv Server) pipegramHandle(pconn net.PacketConn) {
- buffer := make([]byte, PipegramMessageSizeLimit)
-
+func (srv *Server) pipeRun() {
+ wgClients := &sync.WaitGroup{}
+ quit := make(chan bool)
for {
- n, _, err := pconn.ReadFrom(buffer)
+ conn, err := srv.interfaces.pipe.Accept()
if err != nil {
- s5l.Printf("srv|pgram: read() failed: %v", err)
- continue
- }
- data := buffer[0:n]
-
- update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
- if err != nil {
- s5l.Printf("srv|pgram: decoding data message failed: %v\n", err)
+ if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
+ break
+ }
+ s5l.Printf("srv|pipe: accept() failed: %v", err)
+ // TODO: ignore ... or is this permanent?
continue
}
+ wgClients.Add(1)
+ go func() {
+ defer wgClients.Done()
+ srv.pipeHandle(conn, quit)
+ }()
+ }
+ s5l.Println("srv|pipe: interface stopped listening")
+ close(quit)
+ wgClients.Wait()
+}
- if err = srv.Ingest(update); err != nil {
- s5l.Printf("srv|pgram: storing data failed: %v\n", err)
- }
+func (srv *Server) pipeStop(ctx context.Context) (err error) {
+ if srv.interfaces.pipe == nil {
+ return nil
}
+ s5l.Printf("srv|pipe: shutting down")
+ return srv.interfaces.pipe.Close()
}
-func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) {
- pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
- if err != nil {
- s5l.Printf("srv|pgram: listen() failed: %v", err)
+func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) {
+ if srv.interfaces.pipe, err = net.Listen("unix", cfg.ListenPath); err != nil {
+ s5l.Printf("srv|pipe: listen() failed: %v", err)
return
}
- defer pconn.Close()
- s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("server|pgram: interface stopped")
+ s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
- srv.pipegramHandle(pconn)
+ srv.pipeRun()
+ }()
+ return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
new file mode 100644
index 0000000..65ff0e0
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go
@@ -0,0 +1,95 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package sfive
+
+import (
+ "bytes"
+ "context"
+ "net"
+ "strings"
+)
+
+const (
+ PipegramMessageSizeLimit = 1024 * 1024
+)
+
+func (srv *Server) pipegramRun() {
+ buffer := make([]byte, PipegramMessageSizeLimit)
+ for {
+ n, _, err := srv.interfaces.pipegram.ReadFrom(buffer)
+ if err != nil {
+ if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
+ return
+ }
+ s5l.Printf("srv|pgram: read() failed: %v", err)
+ continue
+ }
+ data := buffer[0:n]
+
+ update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
+ if err != nil {
+ s5l.Printf("srv|pgram: decoding data message failed: %v\n", err)
+ continue
+ }
+
+ if err = srv.Ingest(update); err != nil {
+ s5l.Printf("srv|pgram: storing data failed: %v\n", err)
+ }
+ }
+}
+
+func (srv *Server) pipegramStop(ctx context.Context) (err error) {
+ if srv.interfaces.pipegram == nil {
+ return nil
+ }
+ s5l.Printf("srv|pgram: shutting down")
+ return srv.interfaces.pipegram.Close()
+}
+
+func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
+ if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil {
+ s5l.Printf("srv|pgram: listen() failed: %v", err)
+ return
+ }
+
+ s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
+
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ defer s5l.Println("srv|pgram: interface stopped listening")
+
+ srv.pipegramRun()
+ }()
+ return
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 9e0b474..65e677a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -33,6 +33,7 @@
package sfive
import (
+ "context"
"encoding/json"
"fmt"
"io"
@@ -392,11 +393,31 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
return
}
tc.SetKeepAlive(true)
- tc.SetKeepAlivePeriod(180 * time.Second)
+ tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}
-func webRun(listener *net.TCPListener, srv *Server) (err error) {
+func (srv *Server) webRun(listener *net.TCPListener) {
+ srv.interfaces.web.Serve(tcpKeepAliveListener{listener})
+}
+
+func (srv *Server) webStop(ctx context.Context) (err error) {
+ if srv.interfaces.web == nil {
+ return nil
+ }
+ s5l.Printf("srv|web: shutting down")
+ return srv.interfaces.web.Shutdown(ctx)
+}
+
+func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error {
+ ln, err := net.Listen("tcp", cfg.ListenAddr)
+ if err != nil {
+ s5l.Printf("srv|web: listen() failed: %v", err)
+ return err
+ }
+
+ s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr)
+
mux := http.NewServeMux()
mux.Handle("/healthz", webHandler{srv, webHealthz})
mux.Handle("/hubs", webHandler{srv, webHubs})
@@ -406,23 +427,17 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
mux.Handle("/updates", webHandler{srv, webUpdates})
mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID})
mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID})
-
// mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. ))))
mux.Handle("/", webHandler{srv, webNotFound})
- server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second}
- return server.Serve(tcpKeepAliveListener{listener})
-}
-
-func (srv Server) ServeWeb(cfg WebInterfaceConfig) {
- ln, err := net.Listen("tcp", cfg.ListenAddr)
- if err != nil {
- s5l.Printf("srv|web: listen() failed: %v", err)
- return
- }
+ srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
- s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr)
- defer s5l.Println("srv|web: interface stopped")
+ srv.wgInterfaces.Add(1)
+ go func() {
+ defer srv.wgInterfaces.Done()
+ defer s5l.Println("srv|web: interface stopped listening")
- webRun(ln.(*net.TCPListener), &srv)
+ srv.webRun(ln.(*net.TCPListener))
+ }()
+ return nil
}
diff --git a/src/hub/test-srv-single b/src/hub/test-srv-single
new file mode 100755
index 0000000..902c78b
--- /dev/null
+++ b/src/hub/test-srv-single
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> <interface>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+mkdir -p "$TEST_D"
+rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
+case "$2" in
+ pipe)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server=false -start-web-server=false
+ ;;
+ pipegram)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server=false
+ ;;
+ web)
+ exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server -web ":8000"
+ ;;
+ *)
+ "unknown interface $2"
+ return 1
+ ;;
+esac