summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 05:38:15 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 05:38:15 +0200
commit7e2fc6b27d3cc278e20e19e87cb1869b9edc5c77 (patch)
tree6ba0a6bf5c7c2b0943f1950d06b8968db6f095fb /src/hub
parentno more self (diff)
get updates limit
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5log.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go27
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go9
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go33
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go4
9 files changed, 35 insertions, 50 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go
index 9b80f6f..429a537 100644
--- a/src/hub/src/spreadspace.org/sfive/s5log.go
+++ b/src/hub/src/spreadspace.org/sfive/s5log.go
@@ -9,6 +9,6 @@ import (
var (
s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags)
// use ioutil.Discard to switch that thing off
- // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags)
- s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags)
+ // s5dl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags)
+ s5dl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags)
)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index ac3e086..26eb0ac 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -16,10 +16,7 @@ type getUpdatesResult struct {
type getUpdatesAfterToken struct {
id int
- response chan getUpdatesResult
-}
-
-type getUpdatesToken struct {
+ limit int
response chan getUpdatesResult
}
@@ -45,9 +42,8 @@ type Server struct {
quit chan bool
done chan bool
appendData chan DataUpdateFull
- appendManyData chan appendManyToken // chan []DataUpdateFull
+ appendManyData chan appendManyToken
getUpdatesAfterChan chan getUpdatesAfterToken
- getUpdatesChan chan getUpdatesToken
getHubIdChan chan getHubIdToken
getLastUpdateIdChan chan getLastUpdateIdToken
}
@@ -80,10 +76,7 @@ func (srv Server) appendActor() {
token.response <- true
}
case token := <-srv.getUpdatesAfterChan:
- values, err := srv.store.GetUpdatesAfter(token.id)
- token.response <- getUpdatesResult{values, err}
- case token := <-srv.getUpdatesChan:
- values, err := srv.store.GetUpdates()
+ values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
token.response <- getUpdatesResult{values, err}
case token := <-srv.getHubIdChan:
token.response <- getHubIdResult{srv.store.GetStoreId()}
@@ -94,22 +87,14 @@ func (srv Server) appendActor() {
}
}
-func (srv Server) getUpdatesAfterInvoke(id int) ([]DataUpdateFull, error) {
- token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)}
+func (srv Server) getUpdatesAfterInvoke(id, limit int) ([]DataUpdateFull, error) {
+ token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)}
defer close(token.response)
srv.getUpdatesAfterChan <- token
res := <-token.response
return res.values, res.err
}
-func (srv Server) getUpdatesInvoke() ([]DataUpdateFull, error) {
- token := getUpdatesToken{response: make(chan getUpdatesResult, 1)}
- defer close(token.response)
- srv.getUpdatesChan <- token
- res := <-token.response
- return res.values, res.err
-}
-
func (srv Server) getHubIdInvoke() string {
token := getHubIdToken{response: make(chan getHubIdResult, 1)}
defer close(token.response)
@@ -134,7 +119,6 @@ func (srv Server) Close() {
close(srv.appendData)
close(srv.appendManyData)
close(srv.getUpdatesAfterChan)
- close(srv.getUpdatesChan)
close(srv.getHubIdChan)
close(srv.getLastUpdateIdChan)
srv.store.Close()
@@ -153,7 +137,6 @@ func NewServer(dbPath string) (server *Server, err error) {
server.appendData = make(chan DataUpdateFull, 5)
server.appendManyData = make(chan appendManyToken, 5)
server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1)
- server.getUpdatesChan = make(chan getUpdatesToken, 3)
server.getHubIdChan = make(chan getHubIdToken, 1)
server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1)
go server.appendActor()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 6c9823f..d5fee0c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -74,7 +74,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.getUpdatesAfterInvoke(lastId, 5000)
if err != nil {
s5l.Printf("fwd: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 622b307..56af7ca 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -21,7 +21,7 @@ func (srv Server) getLastUpdateEs(baseurl string, client *http.Client) (latestId
storeId = srv.getHubIdInvoke()
queryJson := fmt.Sprintf(lastUpdateJson, storeId)
- s5tl.Printf("fwd-es: query: %s", queryJson)
+ s5dl.Printf("fwd-es: query: %s", queryJson)
var resp *http.Response
resp, err = client.Post(url, "application/json", strings.NewReader(queryJson))
@@ -43,7 +43,7 @@ func (srv Server) getLastUpdateEs(baseurl string, client *http.Client) (latestId
return
}
- s5tl.Printf("fwd-es: lastupdate response: %s\n", body)
+ s5dl.Printf("fwd-es: lastupdate response: %s\n", body)
if len(body) == 0 {
latestId = -1
@@ -82,7 +82,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.getUpdatesAfterInvoke(lastId, 5000)
if err != nil {
s5l.Printf("fwd-es: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -109,8 +109,6 @@ tryResync:
postData.WriteRune('\n')
}
- //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes()))
-
s5l.Printf("fwd-es: marshal OK")
resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
@@ -131,7 +129,6 @@ tryResync:
s5l.Printf("fwd-es: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-es: new lastid: %d", lastId)
- //time.Sleep(1 * time.Second)
}
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index 8ab0ac6..d865dac 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -38,7 +38,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.getUpdatesAfterInvoke(lastId, 5000)
if err != nil {
s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index a7566a8..6cae87e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -42,7 +42,7 @@ tryResync:
nextBatch:
for {
- updates, err := srv.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.getUpdatesAfterInvoke(lastId, 5000)
if err != nil {
s5l.Printf("fwd-piwik: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 6e2cf00..12e6ccb 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -58,7 +58,7 @@ func (srv Server) getSource(c web.C, w http.ResponseWriter, r *http.Request) {
func (srv Server) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "updates"
- values, err := srv.getUpdatesInvoke()
+ values, err := srv.getUpdatesAfterInvoke(-1, 3) // TODO: get start and limit from param
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 64cb879..123fcd7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -8,6 +8,10 @@ import (
"github.com/pborman/uuid"
)
+const (
+ StoreGetUpdatesLimit = 42000
+)
+
type Store struct {
hubId string
db *bolt.DB
@@ -256,7 +260,7 @@ func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err
return
}
-func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
+func (s Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
var clients []ClientData
if clients, err = s.getClientsByUpdateId(tx, duId); err != nil {
return
@@ -276,10 +280,15 @@ func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb
return
}
-func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) {
- if id < 0 {
+func (s Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) {
+ res = []DataUpdateFull{}
+ if id < 0 { // TODO: interpret negative values as last x values
id = 0
}
+ if limit < 0 || limit > StoreGetUpdatesLimit {
+ s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit)
+ limit = StoreGetUpdatesLimit
+ }
err = s.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket([]byte(dataUpdatesBn)).Cursor()
if k, _ := c.Seek(itob(id)); k == nil {
@@ -292,22 +301,20 @@ func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) {
}
var duf DataUpdateFull
- duf, err := s.CreateDataUpdateFullFromDb(tx, btoi(k), d)
+ duf, err := s.createDataUpdateFullFromDb(tx, btoi(k), d)
if err != nil {
return err
}
res = append(res, duf)
+ if len(res) >= limit {
+ return nil
+ }
}
return nil
})
return
}
-func (s Store) GetUpdates() (res []DataUpdateFull, err error) {
- // TODO: implement from:to with limit
- return s.GetUpdatesAfter(-1)
-}
-
func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) {
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(dataUpdatesBn))
@@ -323,7 +330,7 @@ func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) {
}
var err error
- if res, err = s.CreateDataUpdateFullFromDb(tx, id, d); err != nil {
+ if res, err = s.createDataUpdateFullFromDb(tx, id, d); err != nil {
return err
}
return nil
@@ -331,10 +338,6 @@ func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) {
return
}
-type lastUpdateQueryResult struct {
- MaxDataUpdateId *int
-}
-
func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
// TODO: implement me!
updateId = 0
@@ -358,9 +361,11 @@ func NewStore(dbPath string) (Store, error) {
if err != nil {
return Store{}, err
}
+ s5l.Printf("store: initialized (UUID: %s)", hubid)
return Store{hubid, db}, nil
}
func (s Store) Close() {
+ s5l.Printf("store: closing")
s.db.Close()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 7b45e59..e5f0a12 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -64,7 +64,7 @@ func TestGetUpdatesAfter(t *testing.T) {
return
}
- res, err := store.GetUpdatesAfter(2)
+ res, err := store.GetUpdatesAfter(2, -1)
t.Logf("got updates (err %v):\n%#v", err, res)
}
@@ -151,7 +151,7 @@ func BenchmarkGetUpdatesAfter(b *testing.B) {
latestId := -1
for {
- updates, err := store.GetUpdatesAfter(latestId)
+ updates, err := store.GetUpdatesAfter(latestId, -1)
if err != nil {
b.Errorf("Failed to retrieve: %v", err)
}