diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srv.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 59 |
1 files changed, 36 insertions, 23 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 220a7ae..b91d5c5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,16 +33,14 @@ package sfive import ( + "context" "errors" + "net/http" "runtime" "sync" "time" ) -var ( - ErrShutdownInProgress = errors.New("shutdown in progress") -) - type ingestToken struct { updates []*UpdateFull response chan error @@ -53,12 +51,14 @@ type Server struct { numWorker int anonymization AnonymizationAlgo geoip GeoIPLookup - quit chan bool wgWorker *sync.WaitGroup ingestChan chan ingestToken + interfaces struct { + web *http.Server + } } -func (srv Server) transform(update *UpdateFull) *UpdateFull { +func (srv *Server) transform(update *UpdateFull) *UpdateFull { bytesSentTotal := uint(0) clients := []Client{} for _, client := range update.Data.Clients { @@ -100,13 +100,13 @@ func (srv Server) transform(update *UpdateFull) *UpdateFull { return update } -func (srv Server) transformMany(updates []*UpdateFull) { +func (srv *Server) transformMany(updates []*UpdateFull) { for _, update := range updates { srv.transform(update) } } -func (srv Server) ingestWorker(idx int) { +func (srv *Server) ingestWorker(idx int) { for { select { case token, ok := <-srv.ingestChan: @@ -119,31 +119,44 @@ func (srv Server) ingestWorker(idx int) { } } -func (srv Server) Ingest(update *UpdateFull) error { +func (srv *Server) Ingest(update *UpdateFull) error { return srv.IngestMany([]*UpdateFull{update}) } -func (srv Server) IngestMany(updates []*UpdateFull) error { - if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it - return ErrShutdownInProgress - } - +func (srv *Server) IngestMany(updates []*UpdateFull) error { token := ingestToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.ingestChan <- token return <-token.response } -func (srv Server) Close() { - s5l.Printf("srv: shutting down") +func (srv *Server) shutdownInterfaces() (errors int) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + c := make(chan error) + go func() { c <- srv.webStop(ctx) }() + //go func() { c <- srv.pipeStope(ctx) }() // TODO: add this as soon as pipe interface can be stopped + //go func() { c <- srv.pipegramStop(ctx) }() // TODO: add this as soon as pipegram interface can be stopped + + errors = 0 + for i := 0; i < 1; i++ { // TODO: set limit to 3 when the above has been enabled + if err := <-c; err != nil { + s5l.Printf("srv: interface shutdown failed failed: %v", err) + errors++ + } + } + close(c) // closing channel here in the hopes that this leads to a panic + // in case the number of channel reads (for loop above) is) doesn't match the + // number of interfaces + cancel() + return +} - // TODO: enable this as soon as interfaces can be told to stop accepting data - // and yield wgInterfaces once the last client connection is done - // close(srv.quit) // tell interfaces to don't accept new data - // srv.wgInterfaces.Wait() // wait for interfaces to finish up +func (srv *Server) Close() { + s5l.Printf("srv: shutting down") - srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it - time.Sleep(time.Second) // this is quite ugly but must be good enough for now + if errors := srv.shutdownInterfaces(); errors != 0 { + s5l.Printf("srv: shutdown of at least one interface failed, this is an unclean shutdown!!!") + } close(srv.ingestChan) // close ingest channel to tell worker to stop srv.wgWorker.Wait() // wait for worker to finish up @@ -178,7 +191,7 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { if cfg.Workers > 0 { srv.numWorker = cfg.Workers } - srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works) + srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { |