diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-07-07 01:59:11 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-07-07 01:59:11 +0200 |
commit | a6d2038221262439f0aba03f137a8d7f46062ca5 (patch) | |
tree | fd063a5d80659fe880b58687670ee9e22041aabf | |
parent | make worker configurable (diff) |
server shutdown is a little better now (still needs some thoughts though)
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 79 |
1 files changed, 39 insertions, 40 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index adfd2da..220a7ae 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -36,27 +36,26 @@ import ( "errors" "runtime" "sync" + "time" ) -type ingestToken struct { - update *UpdateFull - response chan error -} +var ( + ErrShutdownInProgress = errors.New("shutdown in progress") +) -type ingestManyToken struct { +type ingestToken struct { updates []*UpdateFull response chan error } type Server struct { - store *Store - numWorker int - anonymization AnonymizationAlgo - geoip GeoIPLookup - quit chan bool - done *sync.WaitGroup - ingestChan chan ingestToken - ingestManyChan chan ingestManyToken + store *Store + numWorker int + anonymization AnonymizationAlgo + geoip GeoIPLookup + quit chan bool + wgWorker *sync.WaitGroup + ingestChan chan ingestToken } func (srv Server) transform(update *UpdateFull) *UpdateFull { @@ -110,12 +109,10 @@ func (srv Server) transformMany(updates []*UpdateFull) { func (srv Server) ingestWorker(idx int) { for { select { - case <-srv.quit: - return - case token := <-srv.ingestChan: - srv.transform(token.update) - token.response <- srv.store.Append(token.update) - case token := <-srv.ingestManyChan: + case token, ok := <-srv.ingestChan: + if !ok { + return + } srv.transformMany(token.updates) token.response <- srv.store.AppendMany(token.updates) } @@ -123,31 +120,34 @@ func (srv Server) ingestWorker(idx int) { } func (srv Server) Ingest(update *UpdateFull) error { - token := ingestToken{update: update, response: make(chan error, 1)} - defer close(token.response) - srv.ingestChan <- token - return <-token.response + return srv.IngestMany([]*UpdateFull{update}) } func (srv Server) IngestMany(updates []*UpdateFull) error { - token := ingestManyToken{updates: updates, response: make(chan error, 1)} + if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it + return ErrShutdownInProgress + } + + token := ingestToken{updates: updates, response: make(chan error, 1)} defer close(token.response) - srv.ingestManyChan <- token + srv.ingestChan <- token return <-token.response } func (srv Server) Close() { s5l.Printf("srv: shutting down") - close(srv.quit) - srv.done.Wait() - // TODO: shutdown procedure is flawed: - // - there might still be data in the ingest and ingestMany channels - // when issuing quit -> data loss!! - // - if interfaces ingets new data when ingest channels are alreday closed - // they produce a panic!! - - close(srv.ingestChan) - close(srv.ingestManyChan) + + // TODO: enable this as soon as interfaces can be told to stop accepting data + // and yield wgInterfaces once the last client connection is done + // close(srv.quit) // tell interfaces to don't accept new data + // srv.wgInterfaces.Wait() // wait for interfaces to finish up + + srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it + time.Sleep(time.Second) // this is quite ugly but must be good enough for now + + close(srv.ingestChan) // close ingest channel to tell worker to stop + srv.wgWorker.Wait() // wait for worker to finish up + srv.store.Close() s5l.Printf("srv: finished") } @@ -178,14 +178,13 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) { if cfg.Workers > 0 { srv.numWorker = cfg.Workers } - srv.quit = make(chan bool) - srv.done = &sync.WaitGroup{} + srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works) + srv.wgWorker = &sync.WaitGroup{} srv.ingestChan = make(chan ingestToken, srv.numWorker) - srv.ingestManyChan = make(chan ingestManyToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { - srv.done.Add(1) + srv.wgWorker.Add(1) go func(idx int) { - defer srv.done.Done() + defer srv.wgWorker.Done() srv.ingestWorker(idx) }(i) } |