From 8ce6d9a44b4fb7cbd0fe3666cc047d1b0a2faaab Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 25 Oct 2014 16:25:20 +0200 Subject: hub: Add support for getUpdates via actor. --- src/hub/src/spreadspace.org/sfive/s5srv.go | 20 ++++++++++++++++---- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 3e17e2c..15806d9 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -17,14 +17,19 @@ type queryStatsToken struct { response chan queryStatsResult } -type getUpdatesAfterResult struct { +type getUpdatesResult struct { values []StatisticsData err error } type getUpdatesAfterToken struct { id int - response chan getUpdatesAfterResult + response chan getUpdatesResult +} + +type getUpdatesToken struct { + filter *StatsFilter + response chan getUpdatesResult } type StatsSinkServer struct { @@ -35,6 +40,7 @@ type StatsSinkServer struct { appendManyData chan appendManyToken // chan []StatisticsData getStatsChan chan queryStatsToken getUpdatesAfterChan chan getUpdatesAfterToken + getUpdatesChan chan getUpdatesToken } func (self StatsSinkServer) appendActor() { @@ -69,7 +75,10 @@ func (self StatsSinkServer) appendActor() { token.response <- queryStatsResult{stats, err} case token := <-self.getUpdatesAfterChan: values, err := self.store.GetUpdatesAfter(token.id) - token.response <- getUpdatesAfterResult{values, err} + token.response <- getUpdatesResult{values, err} + case token := <-self.getUpdatesChan: + values, err := self.store.GetUpdates(token.filter) + token.response <- getUpdatesResult{values, err} } } } @@ -82,6 +91,8 @@ func (self StatsSinkServer) Close() { close(self.appendData) close(self.appendManyData) close(self.getStatsChan) + close(self.getUpdatesAfterChan) + close(self.getUpdatesChan) self.store.Close() } @@ -98,7 +109,8 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) { server.appendData = make(chan StatisticsData, 5) server.appendManyData = make(chan appendManyToken, 5) server.getStatsChan = make(chan queryStatsToken, 5) - server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 5) + server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) + server.getUpdatesChan = make(chan getUpdatesToken, 3) go server.appendActor() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 6b3e654..39dda24 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -61,7 +61,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( } func (self StatsSinkServer) getUpdatesAfterInvoke(id int) (values []StatisticsData, err error) { - token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesAfterResult, 1)} + token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesAfterChan <- token res := <-token.response -- cgit v1.2.3