From 22448bda4081d9a1d1a4aa97837a483e611dcd31 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 25 Nov 2017 17:48:54 +0100 Subject: clean shutdown for forwarder (not finished) --- src/hub/src/spreadspace.org/sfive/s5srv.go | 70 +++++--- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 159 ------------------ .../src/spreadspace.org/sfive/s5srvForwardSfive.go | 187 +++++++++++++++++++++ src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 4 +- src/hub/src/spreadspace.org/sfive/s5srvPipegram.go | 4 +- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 4 +- 6 files changed, 241 insertions(+), 187 deletions(-) delete mode 100644 src/hub/src/spreadspace.org/sfive/s5srvForward.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e45db69..e63b214 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -48,20 +48,25 @@ type ingestToken struct { } type Server struct { - cfg SrvConfig - store *Store - wgInterfaces sync.WaitGroup - interfaces struct { + cfg SrvConfig + store *Store + wgShutdown sync.WaitGroup + interfaces struct { pipe net.Listener pipegram net.PacketConn web *http.Server } + forwarder struct { + sfive chan chan bool + es chan chan bool + graphite chan chan bool + piwik chan chan bool + } numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup wgWorker sync.WaitGroup ingestChan chan ingestToken - wgForwarder sync.WaitGroup } func (srv *Server) transform(update *UpdateFull) *UpdateFull { @@ -147,54 +152,73 @@ func (srv *Server) Start() (wg *sync.WaitGroup, err error) { srv.ServeWeb(srv.cfg.Interfaces.Web) } - // TODO: forwarder need a clean shutdown as well!!! if srv.cfg.Forwards.SFive.URL != "" { - go srv.RunForwarding(srv.cfg.Forwards.SFive) + srv.ForwardSFive(srv.cfg.Forwards.SFive) } if srv.cfg.Forwards.Elasticsearch.URL != "" { - go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) + go srv.RunForwardingEs(srv.cfg.Forwards.Elasticsearch) // TODO: clean shutdown } if srv.cfg.Forwards.Graphite.Host != "" { - go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) + go srv.RunForwardingGraphite(srv.cfg.Forwards.Graphite) // TODO: clean shutdown } if srv.cfg.Forwards.Piwik.URL != "" { - go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) + go srv.RunForwardingPiwik(srv.cfg.Forwards.Piwik) // TODO: clean shutdown } - return &srv.wgInterfaces, nil + return &srv.wgShutdown, nil } func (srv *Server) shutdownInterfaces() { - ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value defer cancel() - srv.wgInterfaces.Add(1) + var wg sync.WaitGroup + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.webStop(ctx); err != nil { - s5l.Printf("srv|web: interface shutdown failed failed: %v", err) + s5l.Printf("srv|web: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipeStop(ctx); err != nil { - s5l.Printf("srv|pipe: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pipe: interface shutdown failed: %v", err) } }() - srv.wgInterfaces.Add(1) + wg.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer wg.Done() if err := srv.pipegramStop(ctx); err != nil { - s5l.Printf("srv|pgram: interface shutdown failed failed: %v", err) + s5l.Printf("srv|pgram: interface shutdown failed: %v", err) } }() s5l.Printf("srv: waiting for all clients to disconnect") defer s5l.Printf("srv: all clients are now disconnected") - srv.wgInterfaces.Wait() + wg.Wait() + return +} + +func (srv *Server) shutdownForwarder() { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) // TODO: hardcoded value + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := srv.forwardStop(ctx); err != nil { + s5l.Printf("srv|fwd: forwarder shutdown failed: %v", err) + } + }() + + s5l.Printf("srv: waiting for all forwarder to stop") + defer s5l.Printf("srv: all forwarder are stopped") + wg.Wait() return } @@ -209,6 +233,8 @@ func (srv *Server) Shutdown() { srv.wgWorker.Wait() s5l.Printf("srv: all worker stopped") + srv.shutdownForwarder() + srv.store.Close() s5l.Printf("srv: finished") } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go deleted file mode 100644 index 9e395be..0000000 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ /dev/null @@ -1,159 +0,0 @@ -// -// sfive -// -// sfive - spreadspace streaming statistics suite is a generic -// statistic collection tool for streaming server infrastuctures. -// The system collects and stores meta data like number of views -// and throughput from a number of streaming servers and stores -// it in a global data store. -// The data acquisition is designed to be generic and extensible in -// order to support different streaming software. -// sfive also contains tools and applications to filter and visualize -// live and recorded data. -// -// -// Copyright (C) 2014-2017 Christian Pointner -// Markus Grüneis -// -// This file is part of sfive. -// -// sfive is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License version 3 -// as published by the Free Software Foundation. -// -// sfive is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with sfive. If not, see . -// - -package sfive - -import ( - "encoding/json" - "errors" - "io" - "net/http" - "time" -) - -func findMaxID(updates []*UpdateFull) int { - maxID := -1 - for _, value := range updates { - if id := value.SourceHubUpdateID; id > maxID { - maxID = id - } - } - return maxID -} - -func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (lastID int, err error) { - lastID = -1 - - var resp *http.Response - resp, err = client.Get(baseUrl + "/api/v1/lastupdate/" + hubUUID) - if err != nil { - s5l.Printf("srv|fwd: querying for lastupdate failed: %v", err) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", resp.StatusCode) - return - } - - result := WebLastUpdateIDResponse{} - if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { - s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", err) - return - } - - lastID = result.LastUpdateID - return -} - -func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) { - defer pw.Close() - - enc, err := NewStatefulEncoder(pw) - if err != nil { - s5l.Printf("srv|fwd: encoding/sending init message failed: %v", err) - return - } - for _, upd := range updates { - if err := enc.Encode(upd); err != nil { - s5l.Printf("srv|fwd: encoding/sending updates failed: %v", err) - return - } - } -} - -func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, error) { - resp, err := client.Post(url, "application/json", pr) - if err != nil { - return 0, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return 0, errors.New("") - } - result := WebUpdatesPostResponse{} - if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { - return 0, err - } - return result.NumUpdates, nil -} - -func (srv *Server) forwardRun(baseUrl string, client *http.Client) { - url := baseUrl + "/api/v1/updates/_bulk" - hubUUID := srv.store.GetHubUUID() -tryResync: - for { - lastID, err := fwdGetLastUpdateID(baseUrl, client, hubUUID) - if err != nil { - s5l.Printf("srv|fwd: fetching lastupdate failed: %v", err) - time.Sleep(5 * time.Second) - continue tryResync - } - s5l.Printf("srv|fwd: new lastupdate: %d", lastID) - - nextBatch: - for { - updates, err := srv.store.GetUpdatesAfter(lastID, 5000) - if err != nil { - s5l.Printf("srv|fwd: reading updates failed: %v", err) - time.Sleep(500 * time.Millisecond) - continue nextBatch - } - if len(updates) == 0 { - time.Sleep(1 * time.Second) - continue nextBatch - } - - pr, pw := io.Pipe() - go fwdWriteUpdates(updates, pw) - if num, err := fwdPostUpdates(client, url, pr); err != nil { - s5l.Printf("srv|fwd: sending updates failed: %v", err) - continue tryResync - } else if num != len(updates) { - s5l.Printf("srv|fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num) - continue tryResync - } - - lastID = findMaxID(updates) - s5l.Printf("srv|fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) - } - } -} - -func (srv *Server) RunForwarding(cfg SFiveForwardConfig) { - s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) - defer s5l.Println("srv|fwd: forwarder stopped") - - srv.forwardRun(cfg.URL, http.DefaultClient) -} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go new file mode 100644 index 0000000..d09c500 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardSfive.go @@ -0,0 +1,187 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner +// Markus Grüneis +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see . +// + +package sfive + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "time" +) + +func findMaxID(updates []*UpdateFull) int { + maxID := -1 + for _, value := range updates { + if id := value.SourceHubUpdateID; id > maxID { + maxID = id + } + } + return maxID +} + +func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (lastID int, err error) { + lastID = -1 + + var resp *http.Response + resp, err = client.Get(baseUrl + "/api/v1/lastupdate/" + hubUUID) + if err != nil { + s5l.Printf("srv|fwd: querying for lastupdate failed: %v", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", resp.StatusCode) + return + } + + result := WebLastUpdateIDResponse{} + if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { + s5l.Printf("srv|fwd: remote hub failed to fulfill query for lastupdate: %v", err) + return + } + + lastID = result.LastUpdateID + return +} + +func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) { + defer pw.Close() + + enc, err := NewStatefulEncoder(pw) + if err != nil { + s5l.Printf("srv|fwd: encoding/sending init message failed: %v", err) + return + } + for _, upd := range updates { + if err := enc.Encode(upd); err != nil { + s5l.Printf("srv|fwd: encoding/sending updates failed: %v", err) + return + } + } +} + +func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, error) { + resp, err := client.Post(url, "application/json", pr) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return 0, errors.New("") + } + result := WebUpdatesPostResponse{} + if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { + return 0, err + } + return result.NumUpdates, nil +} + +func (srv *Server) forwardRun(baseUrl string, client *http.Client) { + url := baseUrl + "/api/v1/updates/_bulk" + hubUUID := srv.store.GetHubUUID() +tryResync: + for { + lastID, err := fwdGetLastUpdateID(baseUrl, client, hubUUID) + if err != nil { + s5l.Printf("srv|fwd: fetching lastupdate failed: %v", err) + time.Sleep(5 * time.Second) + continue tryResync + } + s5l.Printf("srv|fwd: new lastupdate: %d", lastID) + + nextBatch: + for { + select { + case ch := <-srv.forwarder.sfive: + close(ch) + return + default: + } + + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) + if err != nil { + s5l.Printf("srv|fwd: reading updates failed: %v", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + pr, pw := io.Pipe() + go fwdWriteUpdates(updates, pw) + if num, err := fwdPostUpdates(client, url, pr); err != nil { + s5l.Printf("srv|fwd: sending updates failed: %v", err) + continue tryResync + } else if num != len(updates) { + s5l.Printf("srv|fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num) + continue tryResync + } + + lastID = findMaxID(updates) + s5l.Printf("srv|fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + } + } +} + +func (srv *Server) forwardStop(ctx context.Context) (err error) { + if srv.forwarder.sfive == nil { + return nil + } + s5l.Printf("srv|fwd: shutting down") + stopCH := make(chan bool) + srv.forwarder.sfive <- stopCH + select { + case <-stopCH: + case <-ctx.Done(): + } + return nil +} + +func (srv *Server) ForwardSFive(cfg SFiveForwardConfig) { + s5l.Printf("srv|fwd: forwarding to '%s'", cfg.URL) + + srv.wgShutdown.Add(1) + go func() { + defer srv.wgShutdown.Done() + defer s5l.Println("srv|fwd: forwarder stopped") + + srv.forwarder.sfive = make(chan chan bool) + srv.forwardRun(cfg.URL, http.DefaultClient) + }() +} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 2a6a200..5668735 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -140,9 +140,9 @@ func (srv *Server) ServePipe(cfg PipeInterfaceConfig) (err error) { s5l.Printf("srv|pipe: listening on '%s'", cfg.ListenPath) - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() srv.pipeRun() }() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go index 65ff0e0..b72185d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipegram.go @@ -84,9 +84,9 @@ func (srv *Server) ServePipegram(cfg PipegramInterfaceConfig) (err error) { s5l.Printf("srv|pgram: listening on '%s'", cfg.ListenPath) - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() defer s5l.Println("srv|pgram: interface stopped listening") srv.pipegramRun() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 54f1d18..7094dc4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -434,9 +434,9 @@ func (srv *Server) ServeWeb(cfg WebInterfaceConfig) error { srv.interfaces.web = &http.Server{Handler: mux, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second} - srv.wgInterfaces.Add(1) + srv.wgShutdown.Add(1) go func() { - defer srv.wgInterfaces.Done() + defer srv.wgShutdown.Done() defer s5l.Println("srv|web: interface stopped listening") srv.webRun(ln.(*net.TCPListener)) -- cgit v1.2.3