summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-07-07 01:59:11 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-07-07 01:59:11 +0200
commita6d2038221262439f0aba03f137a8d7f46062ca5 (patch)
treefd063a5d80659fe880b58687670ee9e22041aabf
parentmake worker configurable (diff)
server shutdown is a little better now (still needs some thoughts though)
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go79
1 files changed, 39 insertions, 40 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index adfd2da..220a7ae 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -36,27 +36,26 @@ import (
"errors"
"runtime"
"sync"
+ "time"
)
-type ingestToken struct {
- update *UpdateFull
- response chan error
-}
+var (
+ ErrShutdownInProgress = errors.New("shutdown in progress")
+)
-type ingestManyToken struct {
+type ingestToken struct {
updates []*UpdateFull
response chan error
}
type Server struct {
- store *Store
- numWorker int
- anonymization AnonymizationAlgo
- geoip GeoIPLookup
- quit chan bool
- done *sync.WaitGroup
- ingestChan chan ingestToken
- ingestManyChan chan ingestManyToken
+ store *Store
+ numWorker int
+ anonymization AnonymizationAlgo
+ geoip GeoIPLookup
+ quit chan bool
+ wgWorker *sync.WaitGroup
+ ingestChan chan ingestToken
}
func (srv Server) transform(update *UpdateFull) *UpdateFull {
@@ -110,12 +109,10 @@ func (srv Server) transformMany(updates []*UpdateFull) {
func (srv Server) ingestWorker(idx int) {
for {
select {
- case <-srv.quit:
- return
- case token := <-srv.ingestChan:
- srv.transform(token.update)
- token.response <- srv.store.Append(token.update)
- case token := <-srv.ingestManyChan:
+ case token, ok := <-srv.ingestChan:
+ if !ok {
+ return
+ }
srv.transformMany(token.updates)
token.response <- srv.store.AppendMany(token.updates)
}
@@ -123,31 +120,34 @@ func (srv Server) ingestWorker(idx int) {
}
func (srv Server) Ingest(update *UpdateFull) error {
- token := ingestToken{update: update, response: make(chan error, 1)}
- defer close(token.response)
- srv.ingestChan <- token
- return <-token.response
+ return srv.IngestMany([]*UpdateFull{update})
}
func (srv Server) IngestMany(updates []*UpdateFull) error {
- token := ingestManyToken{updates: updates, response: make(chan error, 1)}
+ if len(srv.quit) > 0 { // check if there is at least one element on the channel without consuming it
+ return ErrShutdownInProgress
+ }
+
+ token := ingestToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
- srv.ingestManyChan <- token
+ srv.ingestChan <- token
return <-token.response
}
func (srv Server) Close() {
s5l.Printf("srv: shutting down")
- close(srv.quit)
- srv.done.Wait()
- // TODO: shutdown procedure is flawed:
- // - there might still be data in the ingest and ingestMany channels
- // when issuing quit -> data loss!!
- // - if interfaces ingets new data when ingest channels are alreday closed
- // they produce a panic!!
-
- close(srv.ingestChan)
- close(srv.ingestManyChan)
+
+ // TODO: enable this as soon as interfaces can be told to stop accepting data
+ // and yield wgInterfaces once the last client connection is done
+ // close(srv.quit) // tell interfaces to don't accept new data
+ // srv.wgInterfaces.Wait() // wait for interfaces to finish up
+
+ srv.quit <- true // this will checked by Ingest() and IngestMany without consuming it
+ time.Sleep(time.Second) // this is quite ugly but must be good enough for now
+
+ close(srv.ingestChan) // close ingest channel to tell worker to stop
+ srv.wgWorker.Wait() // wait for worker to finish up
+
srv.store.Close()
s5l.Printf("srv: finished")
}
@@ -178,14 +178,13 @@ func NewServer(cfg SrvConfig) (srv *Server, err error) {
if cfg.Workers > 0 {
srv.numWorker = cfg.Workers
}
- srv.quit = make(chan bool)
- srv.done = &sync.WaitGroup{}
+ srv.quit = make(chan bool, 1) // this will never be consumed (until interface cleanup actually works)
+ srv.wgWorker = &sync.WaitGroup{}
srv.ingestChan = make(chan ingestToken, srv.numWorker)
- srv.ingestManyChan = make(chan ingestManyToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
- srv.done.Add(1)
+ srv.wgWorker.Add(1)
go func(idx int) {
- defer srv.done.Done()
+ defer srv.wgWorker.Done()
srv.ingestWorker(idx)
}(i)
}