From 9b20c0ec159bf3a74621a4208d3bbe79c6372c1a Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 16 May 2017 02:19:14 +0200 Subject: first test with multiple append workers --- src/hub/src/spreadspace.org/sfive/s5srv.go | 44 ++++++++++++++++-------------- 1 file changed, 24 insertions(+), 20 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 7690e11..1df4f07 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -34,6 +34,7 @@ package sfive import ( "errors" + "sync" ) type appendToken struct { @@ -50,7 +51,7 @@ type Server struct { store Store anonymization AnonymizationAlgo quit chan bool - done chan bool + done sync.WaitGroup appendChan chan appendToken appendManyChan chan appendManyToken } @@ -78,8 +79,7 @@ func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull { return anonymized } -func (srv Server) appendActor() { - defer func() { srv.done <- true }() +func (srv Server) appendWorker() { for { select { case <-srv.quit: @@ -113,38 +113,42 @@ func (srv Server) AppendMany(updates []UpdateFull) error { } func (srv Server) Close() { - s5l.Printf("server: shutting down\n") - srv.quit <- true - <-srv.done + s5l.Printf("server: shutting down") close(srv.quit) - close(srv.done) + srv.done.Wait() close(srv.appendChan) close(srv.appendManyChan) srv.store.Close() - s5l.Printf("server: finished\n") + s5l.Printf("server: finished") } -func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (server *Server, err error) { +func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (srv *Server, err error) { // TODO read configuration and create instance with correct settings - server = &Server{} - server.store, err = NewStore(dbPath, readOnly) - if err != nil { + srv = &Server{} + if srv.store, err = NewStore(dbPath, readOnly); err != nil { return } if anonymize { - if server.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil { + if srv.anonymization, err = NewCryptopanAnonymization(anonKeyfile); err != nil { err = errors.New("failed to initialize IP address anonymization: " + err.Error()) return } - s5l.Printf("using IP address anonymization: %s", server.anonymization) + s5l.Printf("using IP address anonymization: %s", srv.anonymization) } - server.quit = make(chan bool) - server.done = make(chan bool) - server.appendChan = make(chan appendToken, 32) - server.appendManyChan = make(chan appendManyToken, 32) - go server.appendActor() - s5l.Printf("server: started\n") + srv.quit = make(chan bool) + srv.appendChan = make(chan appendToken, 32) + srv.appendManyChan = make(chan appendManyToken, 32) + for i := 0; i < 8; i = i + 1 { + srv.done.Add(1) + go func(idx int) { + defer srv.done.Done() + s5l.Printf("server: worker %d started", idx) + srv.appendWorker() + s5l.Printf("server: worker %d stopped", idx) + }(i) + } + s5l.Printf("server: started") return } -- cgit v1.2.3