summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-13 12:07:32 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-13 12:07:32 +0200
commit92d690733dee358f9c74e4ef1b6afdd3176a4dad (patch)
tree07ee84f80b0aba6186f3e859ad9fe022054fda09
parentfirst steps towards clean shutdown (diff)
cleaner shutdown for pipgram interface
-rw-r--r--src/daq/s5proxy/src/s5proxy/proxy.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go71
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go79
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go51
4 files changed, 112 insertions, 91 deletions
diff --git a/src/daq/s5proxy/src/s5proxy/proxy.go b/src/daq/s5proxy/src/s5proxy/proxy.go
index 3bb0afb..f994982 100644
--- a/src/daq/s5proxy/src/s5proxy/proxy.go
+++ b/src/daq/s5proxy/src/s5proxy/proxy.go
@@ -212,7 +212,7 @@ func (p *Proxy) Run() error {
httpsL := m.Match(cmux.Any())
go p.RunHTTPS(httpsL)
- if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") {
+ if err := m.Serve(); !strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
return err
}
return nil
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index ba34697..9b28270 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -37,7 +37,7 @@ import (
"log"
"os"
"os/signal"
- "sync"
+ "syscall"
"spreadspace.org/sfive"
)
@@ -100,64 +100,10 @@ func main() {
if err != nil {
s5hl.Fatalf(err.Error())
}
- defer srv.Close()
- var wg sync.WaitGroup
-
- if cfg.Interfaces.Pipe.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipe(cfg.Interfaces.Pipe)
- }()
- }
-
- if cfg.Interfaces.Pipegram.ListenPath != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServePipegram(cfg.Interfaces.Pipegram)
- }()
- }
-
- if cfg.Interfaces.Web.ListenAddr != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.ServeWeb(cfg.Interfaces.Web)
- }()
- }
-
- if cfg.Forwards.SFive.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwarding(cfg.Forwards.SFive)
- }()
- }
-
- if cfg.Forwards.Elasticsearch.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingEs(cfg.Forwards.Elasticsearch)
- }()
- }
-
- if cfg.Forwards.Graphite.Host != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingGraphite(cfg.Forwards.Graphite)
- }()
- }
-
- if cfg.Forwards.Piwik.URL != "" {
- wg.Add(1)
- go func() {
- defer wg.Done()
- srv.RunForwardingPiwik(cfg.Forwards.Piwik)
- }()
+ wg, err := srv.Start()
+ if err != nil {
+ s5hl.Fatalf(err.Error())
}
alldone := make(chan bool)
@@ -167,13 +113,12 @@ func main() {
}()
c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt)
+ signal.Notify(c, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
select {
- case <-c:
- s5hl.Println("received interrupt, shutdown")
- return
+ case sig := <-c:
+ s5hl.Printf("signal(%v) received, shutting down", sig)
case <-alldone:
- return
}
+ srv.Shutdown()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index b91d5c5..e8778df 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -35,6 +35,7 @@ package sfive
import (
"context"
"errors"
+ "net"
"net/http"
"runtime"
"sync"
@@ -47,6 +48,7 @@ type ingestToken struct {
}
type Server struct {
+ cfg SrvConfig
store *Store
numWorker int
anonymization AnonymizationAlgo
@@ -54,7 +56,8 @@ type Server struct {
wgWorker *sync.WaitGroup
ingestChan chan ingestToken
interfaces struct {
- web *http.Server
+ web *http.Server
+ pipegram net.PacketConn
}
}
@@ -134,24 +137,83 @@ func (srv *Server) shutdownInterfaces() (errors int) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
c := make(chan error)
go func() { c <- srv.webStop(ctx) }()
- //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped
- //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped
+ //go func() { c <- srv.pipeStop(ctx) }() // TODO: add this as soon as pipe interface can be stopped
+ go func() { c <- srv.pipegramStop(ctx) }()
errors = 0
- for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled
+ for i := 0; i < 2; i++ { // TODO: set limit to 3 when the above has been enabled
if err := <-c; err != nil {
s5l.Printf("srv: interface shutdown failed failed: %v", err)
errors++
}
}
close(c) // closing channel here in the hopes that this leads to a panic
- // in case the number of channel reads (for loop above) is) doesn't match the
- // number of interfaces
+ // in case the number of channel reads (for loop above) doesn't match the
+ // number of interfaces to be stopped
cancel()
return
}
-func (srv *Server) Close() {
+func (srv *Server) Start() (wg sync.WaitGroup, err error) {
+ if srv.cfg.Interfaces.Pipe.ListenPath != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.ServePipe(srv.cfg.Interfaces.Pipe)
+ }()
+ }
+
+ if srv.cfg.Interfaces.Pipegram.ListenPath != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.ServePipegram(srv.cfg.Interfaces.Pipegram)
+ }()
+ }
+
+ if srv.cfg.Interfaces.Web.ListenAddr != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.ServeWeb(srv.cfg.Interfaces.Web)
+ }()
+ }
+
+ if srv.cfg.Forwards.SFive.URL != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.RunForwarding(srv.cfg.Forwards.SFive)
+ }()
+ }
+
+ if srv.cfg.Forwards.Elasticsearch.URL != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch)
+ }()
+ }
+
+ if srv.cfg.Forwards.Graphite.Host != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite)
+ }()
+ }
+
+ if srv.cfg.Forwards.Piwik.URL != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik)
+ }()
+ }
+ return wg, nil
+}
+
+func (srv *Server) Shutdown() {
s5l.Printf("srv: shutting down")
if errors := srv.shutdownInterfaces(); errors != 0 {
@@ -160,13 +222,14 @@ func (srv *Server) Close() {
close(srv.ingestChan) // close ingest channel to tell worker to stop
srv.wgWorker.Wait() // wait for worker to finish up
+ s5l.Printf("srv: all worker stopped")
srv.store.Close()
s5l.Printf("srv: finished")
}
func NewServer(cfg SrvConfig) (srv *Server, err error) {
- srv = &Server{}
+ srv = &Server{cfg: cfg}
if srv.store, err = NewStore(cfg.Store); err != nil {
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 7d0fbe4..6cf2bc4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -34,8 +34,10 @@ package sfive
import (
"bytes"
+ "context"
"io"
"net"
+ "strings"
)
const (
@@ -51,13 +53,13 @@ func (srv *Server) pipeHandle(conn net.Conn) {
dec, err := NewStatefulDecoder(conn)
if err != nil {
- s5l.Printf("server|pipe: read(init) failed: %v\n", err)
+ s5l.Printf("srv|pipe: read(init) failed: %v\n", err)
return
}
slug := dec.Slug()
- s5l.Printf("server|pipe: new connection: %s\n", slug)
- defer s5l.Printf("server|pipe(%s): connection closed\n", slug)
+ s5l.Printf("srv|pipe: new connection: %s\n", slug)
+ defer s5l.Printf("srv|pipe(%s): connection closed\n", slug)
for {
update, err := dec.Decode()
@@ -69,15 +71,15 @@ func (srv *Server) pipeHandle(conn net.Conn) {
opErr, isOpErr := err.(*net.OpError)
if isOpErr && opErr.Temporary() {
- s5l.Printf("server|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
+ s5l.Printf("srv|pipe(%s): read(data) failed: %v (temporary error)\n", slug, err)
} else {
- s5l.Printf("server|pipe(%s): read(data) failed: %v\n", slug, err)
+ s5l.Printf("srv|pipe(%s): read(data) failed: %v\n", slug, err)
break
}
}
if err = srv.Ingest(update); err != nil {
- s5l.Printf("server|pipe(%s): storing data failed: %v\n", slug, err)
+ s5l.Printf("srv|pipe(%s): storing data failed: %v\n", slug, err)
// TODO: send NACK?
break
}
@@ -88,18 +90,18 @@ func (srv *Server) pipeHandle(conn net.Conn) {
func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
ln, err := net.Listen("unix", cfg.ListenPath)
if err != nil {
- s5l.Printf("server|pipe: listen() failed: %v", err)
+ s5l.Printf("srv|pipe: listen() failed: %v", err)
return
}
defer ln.Close()
- s5l.Printf("server|pipe: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("server|pipe: interface stopped")
+ s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("srv|pipe: interface stopped")
for {
conn, err := ln.Accept()
if err != nil {
- s5l.Printf("server|pipe: accept() failed: %v", err)
+ s5l.Printf("srv|pipe: accept() failed: %v", err)
// ignore
continue
}
@@ -111,12 +113,15 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
// Unix Socket Interface (datagrams)
//
-func (srv *Server) pipegramHandle(pconn net.PacketConn) {
+func (srv *Server) pipegramRun() {
buffer := make([]byte, PipegramMessageSizeLimit)
for {
- n, _, err := pconn.ReadFrom(buffer)
+ n, _, err := srv.interfaces.pipegram.ReadFrom(buffer)
if err != nil {
+ if strings.Contains(err.Error(), "use of closed network connection") { // TODO: is this really the best way to do this?
+ return
+ }
s5l.Printf("srv|pgram: read() failed: %v", err)
continue
}
@@ -134,16 +139,24 @@ func (srv *Server) pipegramHandle(pconn net.PacketConn) {
}
}
-func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) {
- pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
- if err != nil {
+func (srv *Server) pipegramStop(ctx context.Context) (err error) {
+ // TODO: this is a race condition between a call to webRun and webStop...
+ if srv.interfaces.pipegram == nil {
+ return nil
+ }
+ s5l.Printf("srv|pgram: shutting down")
+ return srv.interfaces.pipegram.Close()
+}
+
+func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) {
+ if srv.interfaces.pipegram, err = net.ListenPacket("unixgram", cfg.ListenPath); err != nil {
s5l.Printf("srv|pgram: listen() failed: %v", err)
return
}
- defer pconn.Close()
- s5l.Printf("server|pgram: listening on '%s'", cfg.ListenPath)
- defer s5l.Println("server|pgram: interface stopped")
+ s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath)
+ defer s5l.Println("srv|pgram: interface stopped")
- srv.pipegramHandle(pconn)
+ srv.pipegramRun()
+ return
}