summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-16 02:19:14 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-16 02:19:14 +0200
commit9b20c0ec159bf3a74621a4208d3bbe79c6372c1a (patch)
tree16e2619b5ab505b852a2ae9a4ea1d289a3774f57 /src/hub
parentremove read only commands from server (diff)
first test with multiple append workers
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go44
1 files changed, 24 insertions, 20 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 7690e11..1df4f07 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -34,6 +34,7 @@ package sfive
import (
"errors"
+ "sync"
)
type appendToken struct {
@@ -50,7 +51,7 @@ type Server struct {
store Store
anonymization AnonymizationAlgo
quit chan bool
- done chan bool
+ done sync.WaitGroup
appendChan chan appendToken
appendManyChan chan appendManyToken
}
@@ -78,8 +79,7 @@ func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull {
return anonymized
}
-func (srv Server) appendActor() {
- defer func() { srv.done <- true }()
+func (srv Server) appendWorker() {
for {
select {
case <-srv.quit:
@@ -113,38 +113,42 @@ func (srv Server) AppendMany(updates []UpdateFull) error {
}
func (srv Server) Close() {
- s5l.Printf("server: shutting down\n")
- srv.quit <- true
- <-srv.done
+ s5l.Printf("server: shutting down")
close(srv.quit)
- close(srv.done)
+ srv.done.Wait()
close(srv.appendChan)
close(srv.appendManyChan)
srv.store.Close()
- s5l.Printf("server: finished\n")
+ s5l.Printf("server: finished")
}
-func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (server *Server, err error) {
+func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (srv *Server, err error) {
// TODO read configuration and create instance with correct settings
- server = &Server{}
- server.store, err = NewStore(dbPath, readOnly)
- if err != nil {
+ srv = &Server{}
+ if srv.store, err = NewStore(dbPath, readOnly); err != nil {
return
}
if anonymize {
- if server.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil {
+ if srv.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil {
err = errors.New("failed to initialize IP address anonymization: " + err.Error())
return
}
- s5l.Printf("using IP address anonymization: %s", server.anonymization)
+ s5l.Printf("using IP address anonymization: %s", srv.anonymization)
}
- server.quit = make(chan bool)
- server.done = make(chan bool)
- server.appendChan = make(chan appendToken, 32)
- server.appendManyChan = make(chan appendManyToken, 32)
- go server.appendActor()
- s5l.Printf("server: started\n")
+ srv.quit = make(chan bool)
+ srv.appendChan = make(chan appendToken, 32)
+ srv.appendManyChan = make(chan appendManyToken, 32)
+ for i := 0; i < 8; i = i + 1 {
+ srv.done.Add(1)
+ go func(idx int) {
+ defer srv.done.Done()
+ s5l.Printf("server: worker %d started", idx)
+ srv.appendWorker()
+ s5l.Printf("server: worker %d stopped", idx)
+ }(i)
+ }
+ s5l.Printf("server: started")
return
}