diff options
Diffstat (limited to 'src/hub')
6 files changed, 17 insertions, 86 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 87b2c0d..7690e11 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -46,44 +46,13 @@ type appendManyToken struct { response chan error } -type getUpdatesResult struct { - updates []UpdateFull - err error -} - -type getUpdatesAfterToken struct { - id int - limit int - response chan getUpdatesResult -} - -type getHubUuidResult struct { - id string -} - -type getHubUuidToken struct { - response chan getHubUuidResult -} - -type getLastUpdateIdResult struct { - id int - err error -} - -type getLastUpdateIdToken struct { - response chan getLastUpdateIdResult -} - type Server struct { - store Store - anonymization AnonymizationAlgo - quit chan bool - done chan bool - appendChan chan appendToken - appendManyChan chan appendManyToken - getUpdatesAfterChan chan getUpdatesAfterToken - getHubUuidChan chan getHubUuidToken - getLastUpdateIdChan chan getLastUpdateIdToken + store Store + anonymization AnonymizationAlgo + quit chan bool + done chan bool + appendChan chan appendToken + appendManyChan chan appendManyToken } func (srv Server) Anonymize(update UpdateFull) UpdateFull { @@ -125,14 +94,6 @@ func (srv Server) appendActor() { token.updates = srv.AnonymizeMany(token.updates) } token.response <- srv.store.AppendMany(token.updates) - case token := <-srv.getUpdatesAfterChan: - values, err := srv.store.GetUpdatesAfter(token.id, token.limit) - token.response <- getUpdatesResult{values, err} - case token := <-srv.getHubUuidChan: - token.response <- getHubUuidResult{srv.store.GetHubUuid()} - case token := <-srv.getLastUpdateIdChan: - lastUpdateId, err := srv.store.GetLastUpdateId() - token.response <- getLastUpdateIdResult{lastUpdateId, err} } } } @@ -151,30 +112,6 @@ func (srv Server) AppendMany(updates []UpdateFull) error { return <-token.response } -func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) { - token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)} - defer close(token.response) - srv.getUpdatesAfterChan <- token - res := <-token.response - return res.updates, res.err -} - -func (srv Server) GetHubUuid() string { - token := getHubUuidToken{response: make(chan getHubUuidResult, 1)} - defer close(token.response) - srv.getHubUuidChan <- token - res := <-token.response - return res.id -} - -func (srv Server) GetLastUpdateId() (int, error) { - token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)} - defer close(token.response) - srv.getLastUpdateIdChan <- token - res := <-token.response - return res.id, res.err -} - func (srv Server) Close() { s5l.Printf("server: shutting down\n") srv.quit <- true @@ -183,9 +120,6 @@ func (srv Server) Close() { close(srv.done) close(srv.appendChan) close(srv.appendManyChan) - close(srv.getUpdatesAfterChan) - close(srv.getHubUuidChan) - close(srv.getLastUpdateIdChan) srv.store.Close() s5l.Printf("server: finished\n") } @@ -210,9 +144,6 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (ser server.done = make(chan bool) server.appendChan = make(chan appendToken, 32) server.appendManyChan = make(chan appendManyToken, 32) - server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32) - server.getHubUuidChan = make(chan getHubUuidToken, 32) - server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32) go server.appendActor() s5l.Printf("server: started\n") return diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 2d70e32..8ee3999 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -111,7 +111,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er func (srv Server) forwardRun(baseUrl string, client *http.Client) { url := baseUrl + "/updates/_bulk" - hubUuid := srv.GetHubUuid() + hubUuid := srv.store.GetHubUuid() tryResync: for { lastId, err := fwdGetLastUpdateId(baseUrl, client, hubUuid) @@ -124,7 +124,7 @@ tryResync: nextBatch: for { - updates, err := srv.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd: failed reading updates: %v", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index a7931f2..2d3cf15 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -100,7 +100,7 @@ func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) ( func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { url := baseUrl + "/_bulk" - hubUuid := srv.GetHubUuid() + hubUuid := srv.store.GetHubUuid() tryResync: for { lastId, err := fwdEsGetLastUpdateId(baseUrl, client, hubUuid) @@ -113,7 +113,7 @@ tryResync: nextBatch: for { - updates, err := srv.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-es: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index 1b8e818..c972cbc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -49,7 +49,7 @@ tryResync: continue tryResync } - lastId, err := srv.GetLastUpdateId() + lastId, err := srv.store.GetLastUpdateId() if err != nil { s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) client.Disconnect() @@ -60,7 +60,7 @@ tryResync: nextBatch: for { - updates, err := srv.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-graphite: failed reading updates: %v", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 017e2ea..9fa1dcc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -56,11 +56,11 @@ func fwdPiwikGetLastUpdateId(piwikURL, siteURL string, siteID uint, token string } func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { - // hubUuid := srv.GetHubUuid() + // hubUuid := srv.store.GetHubUuid() tryResync: for { // lastId, err := srv.forwardPiwikGetLastUpdateId(piwikURL, siteURL, siteID, token, client, hubUuid) - lastId, err := srv.GetLastUpdateId() + lastId, err := srv.store.GetLastUpdateId() if err != nil { s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -70,7 +70,7 @@ tryResync: nextBatch: for { - updates, err := srv.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index b9752ba..21f4083 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -60,7 +60,7 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { } resp := WebHealthzResponse{} - resp.HubUuid = srv.GetHubUuid() + resp.HubUuid = srv.store.GetHubUuid() resp.Status = "OK" // TODO: do a more sophisticated check sendWebResponse(w, http.StatusOK, resp) } @@ -306,7 +306,7 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) { var err error resp := WebLastUpdateIdResponse{} - resp.HubUuid = srv.GetHubUuid() + resp.HubUuid = srv.store.GetHubUuid() if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return |