diff options
author | Markus Grüneis <gimpf@gimpf.org> | 2014-10-25 16:25:20 +0200 |
---|---|---|
committer | Markus Grüneis <gimpf@gimpf.org> | 2014-10-25 16:25:20 +0200 |
commit | 8ce6d9a44b4fb7cbd0fe3666cc047d1b0a2faaab (patch) | |
tree | 6ab331ffc000cc01eac6be80ce9d23049a1e6864 /src | |
parent | hub: Get 5000 instead of 200 updates for forwarder at once. (diff) |
hub: Add support for getUpdates via actor.
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 20 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 2 |
2 files changed, 17 insertions, 5 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 3e17e2c..15806d9 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -17,14 +17,19 @@ type queryStatsToken struct { response chan queryStatsResult } -type getUpdatesAfterResult struct { +type getUpdatesResult struct { values []StatisticsData err error } type getUpdatesAfterToken struct { id int - response chan getUpdatesAfterResult + response chan getUpdatesResult +} + +type getUpdatesToken struct { + filter *StatsFilter + response chan getUpdatesResult } type StatsSinkServer struct { @@ -35,6 +40,7 @@ type StatsSinkServer struct { appendManyData chan appendManyToken // chan []StatisticsData getStatsChan chan queryStatsToken getUpdatesAfterChan chan getUpdatesAfterToken + getUpdatesChan chan getUpdatesToken } func (self StatsSinkServer) appendActor() { @@ -69,7 +75,10 @@ func (self StatsSinkServer) appendActor() { token.response <- queryStatsResult{stats, err} case token := <-self.getUpdatesAfterChan: values, err := self.store.GetUpdatesAfter(token.id) - token.response <- getUpdatesAfterResult{values, err} + token.response <- getUpdatesResult{values, err} + case token := <-self.getUpdatesChan: + values, err := self.store.GetUpdates(token.filter) + token.response <- getUpdatesResult{values, err} } } } @@ -82,6 +91,8 @@ func (self StatsSinkServer) Close() { close(self.appendData) close(self.appendManyData) close(self.getStatsChan) + close(self.getUpdatesAfterChan) + close(self.getUpdatesChan) self.store.Close() } @@ -98,7 +109,8 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) { server.appendData = make(chan StatisticsData, 5) server.appendManyData = make(chan appendManyToken, 5) server.getStatsChan = make(chan queryStatsToken, 5) - server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 5) + server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) + server.getUpdatesChan = make(chan getUpdatesToken, 3) go server.appendActor() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 6b3e654..39dda24 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -61,7 +61,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( } func (self StatsSinkServer) getUpdatesAfterInvoke(id int) (values []StatisticsData, err error) { - token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesAfterResult, 1)} + token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesAfterChan <- token res := <-token.response |