diff options
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 7690e11..b643047 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -34,6 +34,8 @@ package sfive import ( "errors" + "runtime" + "sync" ) type appendToken struct { @@ -50,7 +52,7 @@ type Server struct { store Store anonymization AnonymizationAlgo quit chan bool - done chan bool + done *sync.WaitGroup appendChan chan appendToken appendManyChan chan appendManyToken } @@ -78,8 +80,10 @@ func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull { return anonymized } -func (srv Server) appendActor() { - defer func() { srv.done <- true }() +// TODO: do a sanity check for input data +// TODO: add GeoIP lookup befor anonymization + +func (srv Server) appendWorker(idx int) { for { select { case <-srv.quit: @@ -113,38 +117,42 @@ func (srv Server) AppendMany(updates []UpdateFull) error { } func (srv Server) Close() { - s5l.Printf("server: shutting down\n") - srv.quit <- true - <-srv.done + s5l.Printf("server: shutting down") close(srv.quit) - close(srv.done) + srv.done.Wait() + close(srv.appendChan) close(srv.appendManyChan) srv.store.Close() - s5l.Printf("server: finished\n") + s5l.Printf("server: finished") } -func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (server *Server, err error) { - // TODO read configuration and create instance with correct settings - server = &Server{} - server.store, err = NewStore(dbPath, readOnly) - if err != nil { +func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (srv *Server, err error) { + // TODO: read configuration and create instance with correct settings + srv = &Server{} + if srv.store, err = NewStore(dbPath, readOnly); err != nil { return } if anonymize { - if server.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil { + if srv.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil { err = errors.New("failed to initialize IP address anonymization: " + err.Error()) return } - s5l.Printf("using IP address anonymization: %s", server.anonymization) + s5l.Printf("using IP address anonymization: %s", srv.anonymization) } - server.quit = make(chan bool) - server.done = make(chan bool) - server.appendChan = make(chan appendToken, 32) - server.appendManyChan = make(chan appendManyToken, 32) - go server.appendActor() - s5l.Printf("server: started\n") + srv.quit = make(chan bool) + srv.done = &sync.WaitGroup{} + srv.appendChan = make(chan appendToken, 32) + srv.appendManyChan = make(chan appendManyToken, 32) + for i := 0; i < runtime.NumCPU(); i = i + 1 { + srv.done.Add(1) + go func(idx int) { + defer srv.done.Done() + srv.appendWorker(idx) + }(i) + } + s5l.Printf("server: started") return } |