summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go81
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
6 files changed, 17 insertions, 86 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 87b2c0d..7690e11 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -46,44 +46,13 @@ type appendManyToken struct {
response chan error
}
-type getUpdatesResult struct {
- updates []UpdateFull
- err error
-}
-
-type getUpdatesAfterToken struct {
- id int
- limit int
- response chan getUpdatesResult
-}
-
-type getHubUuidResult struct {
- id string
-}
-
-type getHubUuidToken struct {
- response chan getHubUuidResult
-}
-
-type getLastUpdateIdResult struct {
- id int
- err error
-}
-
-type getLastUpdateIdToken struct {
- response chan getLastUpdateIdResult
-}
-
type Server struct {
- store Store
- anonymization AnonymizationAlgo
- quit chan bool
- done chan bool
- appendChan chan appendToken
- appendManyChan chan appendManyToken
- getUpdatesAfterChan chan getUpdatesAfterToken
- getHubUuidChan chan getHubUuidToken
- getLastUpdateIdChan chan getLastUpdateIdToken
+ store Store
+ anonymization AnonymizationAlgo
+ quit chan bool
+ done chan bool
+ appendChan chan appendToken
+ appendManyChan chan appendManyToken
}
func (srv Server) Anonymize(update UpdateFull) UpdateFull {
@@ -125,14 +94,6 @@ func (srv Server) appendActor() {
token.updates = srv.AnonymizeMany(token.updates)
}
token.response <- srv.store.AppendMany(token.updates)
- case token := <-srv.getUpdatesAfterChan:
- values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
- token.response <- getUpdatesResult{values, err}
- case token := <-srv.getHubUuidChan:
- token.response <- getHubUuidResult{srv.store.GetHubUuid()}
- case token := <-srv.getLastUpdateIdChan:
- lastUpdateId, err := srv.store.GetLastUpdateId()
- token.response <- getLastUpdateIdResult{lastUpdateId, err}
}
}
}
@@ -151,30 +112,6 @@ func (srv Server) AppendMany(updates []UpdateFull) error {
return <-token.response
}
-func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) {
- token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)}
- defer close(token.response)
- srv.getUpdatesAfterChan <- token
- res := <-token.response
- return res.updates, res.err
-}
-
-func (srv Server) GetHubUuid() string {
- token := getHubUuidToken{response: make(chan getHubUuidResult, 1)}
- defer close(token.response)
- srv.getHubUuidChan <- token
- res := <-token.response
- return res.id
-}
-
-func (srv Server) GetLastUpdateId() (int, error) {
- token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)}
- defer close(token.response)
- srv.getLastUpdateIdChan <- token
- res := <-token.response
- return res.id, res.err
-}
-
func (srv Server) Close() {
s5l.Printf("server: shutting down\n")
srv.quit <- true
@@ -183,9 +120,6 @@ func (srv Server) Close() {
close(srv.done)
close(srv.appendChan)
close(srv.appendManyChan)
- close(srv.getUpdatesAfterChan)
- close(srv.getHubUuidChan)
- close(srv.getLastUpdateIdChan)
srv.store.Close()
s5l.Printf("server: finished\n")
}
@@ -210,9 +144,6 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile string) (ser
server.done = make(chan bool)
server.appendChan = make(chan appendToken, 32)
server.appendManyChan = make(chan appendManyToken, 32)
- server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32)
- server.getHubUuidChan = make(chan getHubUuidToken, 32)
- server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32)
go server.appendActor()
s5l.Printf("server: started\n")
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 2d70e32..8ee3999 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -111,7 +111,7 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er
func (srv Server) forwardRun(baseUrl string, client *http.Client) {
url := baseUrl + "/updates/_bulk"
- hubUuid := srv.GetHubUuid()
+ hubUuid := srv.store.GetHubUuid()
tryResync:
for {
lastId, err := fwdGetLastUpdateId(baseUrl, client, hubUuid)
@@ -124,7 +124,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.GetUpdatesAfter(lastId, 5000)
+ updates, err := srv.store.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd: failed reading updates: %v", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index a7931f2..2d3cf15 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -100,7 +100,7 @@ func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (
func (srv Server) forwardEsRun(baseUrl string, client *http.Client) {
url := baseUrl + "/_bulk"
- hubUuid := srv.GetHubUuid()
+ hubUuid := srv.store.GetHubUuid()
tryResync:
for {
lastId, err := fwdEsGetLastUpdateId(baseUrl, client, hubUuid)
@@ -113,7 +113,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.GetUpdatesAfter(lastId, 5000)
+ updates, err := srv.store.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-es: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index 1b8e818..c972cbc 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -49,7 +49,7 @@ tryResync:
continue tryResync
}
- lastId, err := srv.GetLastUpdateId()
+ lastId, err := srv.store.GetLastUpdateId()
if err != nil {
s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
client.Disconnect()
@@ -60,7 +60,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.GetUpdatesAfter(lastId, 5000)
+ updates, err := srv.store.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-graphite: failed reading updates: %v", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 017e2ea..9fa1dcc 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -56,11 +56,11 @@ func fwdPiwikGetLastUpdateId(piwikURL, siteURL string, siteID uint, token string
}
func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
- // hubUuid := srv.GetHubUuid()
+ // hubUuid := srv.store.GetHubUuid()
tryResync:
for {
// lastId, err := srv.forwardPiwikGetLastUpdateId(piwikURL, siteURL, siteID, token, client, hubUuid)
- lastId, err := srv.GetLastUpdateId()
+ lastId, err := srv.store.GetLastUpdateId()
if err != nil {
s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -70,7 +70,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.GetUpdatesAfter(lastId, 5000)
+ updates, err := srv.store.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-piwik: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index b9752ba..21f4083 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -60,7 +60,7 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
}
resp := WebHealthzResponse{}
- resp.HubUuid = srv.GetHubUuid()
+ resp.HubUuid = srv.store.GetHubUuid()
resp.Status = "OK" // TODO: do a more sophisticated check
sendWebResponse(w, http.StatusOK, resp)
}
@@ -306,7 +306,7 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) {
var err error
resp := WebLastUpdateIdResponse{}
- resp.HubUuid = srv.GetHubUuid()
+ resp.HubUuid = srv.store.GetHubUuid()
if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return