summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-11 14:09:03 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-11 14:09:03 +0200
commitf5001c58096b83fa81c6582047b94aeb427a77c3 (patch)
treeff4df970519c31e12d80bb6c55dba115d06ddedd
parentserver shutdown is a little better now (still needs some thoughts though) (diff)
first steps towards clean shutdown
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go59
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go8
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go22
7 files changed, 64 insertions, 41 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 220a7ae..b91d5c5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -33,16 +33,14 @@
package sfive
import (
+ "context"
"errors"
+ "net/http"
"runtime"
"sync"
"time"
)
-var (
- ErrShutdownInProgress = errors.New("shutdown in progress")
-)
-
type ingestToken struct {
updates []*UpdateFull
response chan error
@@ -53,12 +51,14 @@ type Server struct {
numWorker int
anonymization AnonymizationAlgo
geoip GeoIPLookup
- quit chan bool
wgWorker *sync.WaitGroup
ingestChan chan ingestToken
+ interfaces struct {
+ web *http.Server
+ }
}
-func (srv Server) transform(update *UpdateFull) *UpdateFull {
+func (srv *Server) transform(update *UpdateFull) *UpdateFull {
bytesSentTotal := uint(0)
clients := []Client{}
for _, client := range update.Data.Clients {
@@ -100,13 +100,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull {
return update
}
-func (srv Server) transformMany(updates []*UpdateFull) {
+func (srv *Server) transformMany(updates []*UpdateFull) {
for _, update := range updates {
srv.transform(update)
}
}
-func (srv Server) ingestWorker(idx int) {
+func (srv *Server) ingestWorker(idx int) {
for {
select {
case token, ok := <-srv.ingestChan:
@@ -119,31 +119,44 @@ func (srv Server) ingestWorker(idx int) {
}
}
-func (srv Server) Ingest(update *UpdateFull) error {
+func (srv *Server) Ingest(update *UpdateFull) error {
return srv.IngestMany([]*UpdateFull{update})
}
-func (srv Server) IngestMany(updates []*UpdateFull) error {
- if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it
- return ErrShutdownInProgress
- }
-
+func (srv *Server) IngestMany(updates []*UpdateFull) error {
token := ingestToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
srv.ingestChan <- token
return <-token.response
}
-func (srv Server) Close() {
- s5l.Printf("srv: shutting down")
+func (srv *Server) shutdownInterfaces() (errors int) {
+ ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+ c := make(chan error)
+ go func() { c <- srv.webStop(ctx) }()
+ //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped
+ //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped
+
+ errors = 0
+ for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled
+ if err := <-c; err != nil {
+ s5l.Printf("srv: interface shutdown failed failed: %v", err)
+ errors++
+ }
+ }
+ close(c) // closing channel here in the hopes that this leads to a panic
+ // in case the number of channel reads (for loop above) is) doesn't match the
+ // number of interfaces
+ cancel()
+ return
+}
- // TODO: enable this as soon as interfaces can be told to stop accepting data
- // and yield wgInterfaces once the last client connection is done
- // close(srv.quit) // tell interfaces to don't accept new data
- // srv.wgInterfaces.Wait() // wait for interfaces to finish up
+func (srv *Server) Close() {
+ s5l.Printf("srv: shutting down")
- srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it
- time.Sleep(time.Second) // this is quite ugly but must be good enough for now
+ if errors := srv.shutdownInterfaces(); errors != 0 {
+ s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!")
+ }
close(srv.ingestChan) // close ingest channel to tell worker to stop
srv.wgWorker.Wait() // wait for worker to finish up
@@ -178,7 +191,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
if cfg.Workers > 0 {
srv.numWorker = cfg.Workers
}
- srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works)
+
srv.wgWorker = &sync.WaitGroup{}
srv.ingestChan = make(chan ingestToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 094a1a8..904dc18 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -109,7 +109,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er
return result.NumUpdates, nil
}
-func (srv Server) forwardRun(baseUrl string, client *http.Client) {
+func (srv *Server) forwardRun(baseUrl string, client *http.Client) {
url := baseUrl + "/updates/_bulk"
hubUUID := srv.store.GetHubUUID()
tryResync:
@@ -151,7 +151,7 @@ tryResync:
}
}
-func (srv Server) RunForwarding(cfg SFiveForwardConfig) {
+func (srv *Server) RunForwarding(cfg SFiveForwardConfig) {
s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 02a6470..fb56cbd 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -98,7 +98,7 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
return
}
-func (srv Server) forwardEsRun(baseUrl string, client *http.Client) {
+func (srv *Server) forwardEsRun(baseUrl string, client *http.Client) {
url := baseUrl + "/_bulk"
hubUUID := srv.store.GetHubUUID()
tryResync:
@@ -159,7 +159,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingEs(cfg ESForwardConfig) {
+func (srv *Server) RunForwardingEs(cfg ESForwardConfig) {
s5l.Printf("srv|fwd-es: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd-es: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index d0c07d5..104c005 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -39,7 +39,7 @@ import (
"github.com/equinox0815/graphite-golang"
)
-func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) {
+func (srv *Server) forwardGraphiteRun(forwardHost string, basePath string) {
tryResync:
for {
client, err := graphite.NewGraphiteFromAddress(forwardHost)
@@ -96,7 +96,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingGraphite(cfg GraphiteForwardConfig) {
+func (srv *Server) RunForwardingGraphite(cfg GraphiteForwardConfig) {
s5l.Printf("srv|fwd-graphite: forwarding to '%s'", cfg.Host)
defer s5l.Println("srv|fwd-graphite: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index f3f71e7..93b717b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -55,7 +55,7 @@ func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string
return
}
-func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
+func (srv *Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
// hubUuid := srv.store.GetHubUuid()
tryResync:
for {
@@ -138,7 +138,7 @@ tryResync:
}
}
-func (srv Server) RunForwardingPiwik(cfg PiwikForwardConfig) {
+func (srv *Server) RunForwardingPiwik(cfg PiwikForwardConfig) {
s5l.Printf("srv|fwd-piwik: forwarding to '%s'", cfg.URL)
defer s5l.Println("srv|fwd-piwik: forwarder stopped")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 4755c35..7d0fbe4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -46,7 +46,7 @@ const (
// Unix Socket Interface (streams)
//
-func (srv Server) pipeHandle(conn net.Conn) {
+func (srv *Server) pipeHandle(conn net.Conn) {
defer conn.Close()
dec, err := NewStatefulDecoder(conn)
@@ -85,7 +85,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
}
-func (srv Server) ServePipe(cfg PipeInterfaceConfig) {
+func (srv *Server) ServePipe(cfg PipeInterfaceConfig) {
ln, err := net.Listen("unix", cfg.ListenPath)
if err != nil {
s5l.Printf("server|pipe: listen() failed: %v", err)
@@ -111,7 +111,7 @@ func (srv Server) ServePipe(cfg PipeInterfaceConfig) {
// Unix Socket Interface (datagrams)
//
-func (srv Server) pipegramHandle(pconn net.PacketConn) {
+func (srv *Server) pipegramHandle(pconn net.PacketConn) {
buffer := make([]byte, PipegramMessageSizeLimit)
for {
@@ -134,7 +134,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
}
}
-func (srv Server) ServePipegram(cfg PipegramInterfaceConfig) {
+func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) {
pconn, err := net.ListenPacket("unixgram", cfg.ListenPath)
if err != nil {
s5l.Printf("srv|pgram: listen() failed: %v", err)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 9e0b474..a112585 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -33,6 +33,7 @@
package sfive
import (
+ "context"
"encoding/json"
"fmt"
"io"
@@ -392,11 +393,11 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
return
}
tc.SetKeepAlive(true)
- tc.SetKeepAlivePeriod(180 * time.Second)
+ tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}
-func webRun(listener *net.TCPListener, srv *Server) (err error) {
+func (srv *Server) webRun(listener *net.TCPListener) (err error) {
mux := http.NewServeMux()
mux.Handle("/healthz", webHandler{srv, webHealthz})
mux.Handle("/hubs", webHandler{srv, webHubs})
@@ -410,11 +411,20 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
// mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. ))))
mux.Handle("/", webHandler{srv, webNotFound})
- server := &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 120 * time.Second}
- return server.Serve(tcpKeepAliveListener{listener})
+ srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second}
+ return srv.interfaces.web.Serve(tcpKeepAliveListener{listener})
}
-func (srv Server) ServeWeb(cfg WebInterfaceConfig) {
+func (srv *Server) webStop(ctx context.Context) (err error) {
+ // TODO: this is a race condition between a call to webRun and webStop...
+ if srv.interfaces.web == nil {
+ return nil
+ }
+ s5l.Printf("srv|web: shutting down")
+ return srv.interfaces.web.Shutdown(ctx)
+}
+
+func (srv *Server) ServeWeb(cfg WebInterfaceConfig) {
ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
s5l.Printf("srv|web: listen() failed: %v", err)
@@ -424,5 +434,5 @@ func (srv Server) ServeWeb(cfg WebInterfaceConfig) {
s5l.Printf("srv|web: listening on '%s'", cfg.ListenAddr)
defer s5l.Println("srv|web: interface stopped")
- webRun(ln.(*net.TCPListener), &srv)
+ srv.webRun(ln.(*net.TCPListener))
}