summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-18 13:21:52 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-18 13:21:52 +0200
commit46d8d989c95ad5f54873e4c5982881b9009f751e (patch)
tree0111510fc9150ca19e77f9497bd26a8cefaf3c1e /src
parentfirst test with multiple append workers (diff)
fixed multi worker append
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go18
1 files changed, 11 insertions, 7 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 1df4f07..b643047 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -34,6 +34,7 @@ package sfive
import (
"errors"
+ "runtime"
"sync"
)
@@ -51,7 +52,7 @@ type Server struct {
store Store
anonymization AnonymizationAlgo
quit chan bool
- done sync.WaitGroup
+ done *sync.WaitGroup
appendChan chan appendToken
appendManyChan chan appendManyToken
}
@@ -79,7 +80,10 @@ func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull {
return anonymized
}
-func (srv Server) appendWorker() {
+// TODO: do a sanity check for input data
+// TODO: add GeoIP lookup befor anonymization
+
+func (srv Server) appendWorker(idx int) {
for {
select {
case <-srv.quit:
@@ -116,6 +120,7 @@ func (srv Server) Close() {
s5l.Printf("server: shutting down")
close(srv.quit)
srv.done.Wait()
+
close(srv.appendChan)
close(srv.appendManyChan)
srv.store.Close()
@@ -123,7 +128,7 @@ func (srv Server) Close() {
}
func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (srv *Server, err error) {
- // TODO read configuration and create instance with correct settings
+ // TODO: read configuration and create instance with correct settings
srv = &Server{}
if srv.store, err = NewStore(dbPath, readOnly); err != nil {
return
@@ -138,15 +143,14 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (srv
}
srv.quit = make(chan bool)
+ srv.done = &sync.WaitGroup{}
srv.appendChan = make(chan appendToken, 32)
srv.appendManyChan = make(chan appendManyToken, 32)
- for i := 0; i < 8; i = i + 1 {
+ for i := 0; i < runtime.NumCPU(); i = i + 1 {
srv.done.Add(1)
go func(idx int) {
defer srv.done.Done()
- s5l.Printf("server: worker %d started", idx)
- srv.appendWorker()
- s5l.Printf("server: worker %d stopped", idx)
+ srv.appendWorker(idx)
}(i)
}
s5l.Printf("server: started")