summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-25 15:59:02 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-25 15:59:02 +0200
commit30570e1bc9c040c54e79b4ff5ba97be1caadaca3 (patch)
treebeb35a7334f722497f860a549fe941dafe38d3ef /src
parenthub: Remove go get of unused library (diff)
hub: Use actor for Stats and GetUpdatesAfter queries.
- solves the "database is locked" problem when running forward
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go43
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go11
3 files changed, 59 insertions, 9 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 0bd220a..3e17e2c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -7,12 +7,34 @@ type appendManyToken struct {
response chan bool
}
+type queryStatsResult struct {
+ stats StatsResult
+ err error
+}
+
+type queryStatsToken struct {
+ filter *StatsFilter
+ response chan queryStatsResult
+}
+
+type getUpdatesAfterResult struct {
+ values []StatisticsData
+ err error
+}
+
+type getUpdatesAfterToken struct {
+ id int
+ response chan getUpdatesAfterResult
+}
+
type StatsSinkServer struct {
- store sqliteStore
- quit chan bool
- done chan bool
- appendData chan StatisticsData
- appendManyData chan appendManyToken // chan []StatisticsData
+ store sqliteStore
+ quit chan bool
+ done chan bool
+ appendData chan StatisticsData
+ appendManyData chan appendManyToken // chan []StatisticsData
+ getStatsChan chan queryStatsToken
+ getUpdatesAfterChan chan getUpdatesAfterToken
}
func (self StatsSinkServer) appendActor() {
@@ -42,6 +64,12 @@ func (self StatsSinkServer) appendActor() {
} else {
token.response <- true
}
+ case token := <-self.getStatsChan:
+ stats, err := self.store.GetStats(token.filter)
+ token.response <- queryStatsResult{stats, err}
+ case token := <-self.getUpdatesAfterChan:
+ values, err := self.store.GetUpdatesAfter(token.id)
+ token.response <- getUpdatesAfterResult{values, err}
}
}
}
@@ -53,6 +81,7 @@ func (self StatsSinkServer) Close() {
close(self.done)
close(self.appendData)
close(self.appendManyData)
+ close(self.getStatsChan)
self.store.Close()
}
@@ -66,8 +95,10 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) {
server.quit = make(chan bool)
server.done = make(chan bool)
- server.appendData = make(chan StatisticsData, 100)
+ server.appendData = make(chan StatisticsData, 5)
server.appendManyData = make(chan appendManyToken, 5)
+ server.getStatsChan = make(chan queryStatsToken, 5)
+ server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 5)
go server.appendActor()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 00542a4..6b3e654 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -60,6 +60,16 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
+func (self StatsSinkServer) getUpdatesAfterInvoke(id int) (values []StatisticsData, err error) {
+ token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesAfterResult, 1)}
+ defer close(token.response)
+ self.getUpdatesAfterChan <- token
+ res := <-token.response
+ values = res.values
+ err = res.err
+ return
+}
+
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
@@ -76,7 +86,7 @@ tryResync:
nextBatch:
for {
- updates, err := self.store.GetUpdatesAfter(lastId)
+ updates, err := self.getUpdatesAfterInvoke(lastId)
if err != nil {
s5l.Printf("fwd: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -115,7 +125,7 @@ tryResync:
lastId = findMaxId(updates)
s5l.Printf("fwd: new lastid: %d", lastId)
- time.Sleep(2 * time.Second)
+ time.Sleep(1 * time.Second)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 0013ea4..d657f8b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -197,7 +197,16 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite
func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "stats"
filter := getFilter(r)
- values, err := self.store.GetStats(&filter)
+
+ token := queryStatsToken{filter: &filter, response: make(chan queryStatsResult, 1)}
+ defer close(token.response)
+ self.getStatsChan <- token
+ res := <-token.response
+ values := res.stats
+ err := res.err
+
+ // values, err := self.store.GetStats(&filter)
+
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return