From 30570e1bc9c040c54e79b4ff5ba97be1caadaca3 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 25 Oct 2014 15:59:02 +0200 Subject: hub: Use actor for Stats and GetUpdatesAfter queries. - solves the "database is locked" problem when running forward --- src/hub/src/spreadspace.org/sfive/s5srv.go | 43 +++++++++++++++++++---- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 14 ++++++-- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 11 +++++- 3 files changed, 59 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 0bd220a..3e17e2c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -7,12 +7,34 @@ type appendManyToken struct { response chan bool } +type queryStatsResult struct { + stats StatsResult + err error +} + +type queryStatsToken struct { + filter *StatsFilter + response chan queryStatsResult +} + +type getUpdatesAfterResult struct { + values []StatisticsData + err error +} + +type getUpdatesAfterToken struct { + id int + response chan getUpdatesAfterResult +} + type StatsSinkServer struct { - store sqliteStore - quit chan bool - done chan bool - appendData chan StatisticsData - appendManyData chan appendManyToken // chan []StatisticsData + store sqliteStore + quit chan bool + done chan bool + appendData chan StatisticsData + appendManyData chan appendManyToken // chan []StatisticsData + getStatsChan chan queryStatsToken + getUpdatesAfterChan chan getUpdatesAfterToken } func (self StatsSinkServer) appendActor() { @@ -42,6 +64,12 @@ func (self StatsSinkServer) appendActor() { } else { token.response <- true } + case token := <-self.getStatsChan: + stats, err := self.store.GetStats(token.filter) + token.response <- queryStatsResult{stats, err} + case token := <-self.getUpdatesAfterChan: + values, err := self.store.GetUpdatesAfter(token.id) + token.response <- getUpdatesAfterResult{values, err} } } } @@ -53,6 +81,7 @@ func (self StatsSinkServer) Close() { close(self.done) close(self.appendData) close(self.appendManyData) + close(self.getStatsChan) self.store.Close() } @@ -66,8 +95,10 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) { server.quit = make(chan bool) server.done = make(chan bool) - server.appendData = make(chan StatisticsData, 100) + server.appendData = make(chan StatisticsData, 5) server.appendManyData = make(chan appendManyToken, 5) + server.getStatsChan = make(chan queryStatsToken, 5) + server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 5) go server.appendActor() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 00542a4..6b3e654 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -60,6 +60,16 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } +func (self StatsSinkServer) getUpdatesAfterInvoke(id int) (values []StatisticsData, err error) { + token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesAfterResult, 1)} + defer close(token.response) + self.getUpdatesAfterChan <- token + res := <-token.response + values = res.values + err = res.err + return +} + func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: @@ -76,7 +86,7 @@ tryResync: nextBatch: for { - updates, err := self.store.GetUpdatesAfter(lastId) + updates, err := self.getUpdatesAfterInvoke(lastId) if err != nil { s5l.Printf("fwd: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -115,7 +125,7 @@ tryResync: lastId = findMaxId(updates) s5l.Printf("fwd: new lastid: %d", lastId) - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 0013ea4..d657f8b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -197,7 +197,16 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "stats" filter := getFilter(r) - values, err := self.store.GetStats(&filter) + + token := queryStatsToken{filter: &filter, response: make(chan queryStatsResult, 1)} + defer close(token.response) + self.getStatsChan <- token + res := <-token.response + values := res.stats + err := res.err + + // values, err := self.store.GetStats(&filter) + if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return -- cgit v1.2.3