diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-04-27 05:38:15 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-04-27 05:38:15 +0200 |
commit | 7e2fc6b27d3cc278e20e19e87cb1869b9edc5c77 (patch) | |
tree | 6ba0a6bf5c7c2b0943f1950d06b8968db6f095fb | |
parent | no more self (diff) |
get updates limit
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5log.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 27 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 9 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 33 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 4 |
9 files changed, 35 insertions, 50 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go index 9b80f6f..429a537 100644 --- a/src/hub/src/spreadspace.org/sfive/s5log.go +++ b/src/hub/src/spreadspace.org/sfive/s5log.go @@ -9,6 +9,6 @@ import ( var ( s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags) // use ioutil.Discard to switch that thing off - // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags) - s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags) + // s5dl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags) + s5dl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags) ) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index ac3e086..26eb0ac 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -16,10 +16,7 @@ type getUpdatesResult struct { type getUpdatesAfterToken struct { id int - response chan getUpdatesResult -} - -type getUpdatesToken struct { + limit int response chan getUpdatesResult } @@ -45,9 +42,8 @@ type Server struct { quit chan bool done chan bool appendData chan DataUpdateFull - appendManyData chan appendManyToken // chan []DataUpdateFull + appendManyData chan appendManyToken getUpdatesAfterChan chan getUpdatesAfterToken - getUpdatesChan chan getUpdatesToken getHubIdChan chan getHubIdToken getLastUpdateIdChan chan getLastUpdateIdToken } @@ -80,10 +76,7 @@ func (srv Server) appendActor() { token.response <- true } case token := <-srv.getUpdatesAfterChan: - values, err := srv.store.GetUpdatesAfter(token.id) - token.response <- getUpdatesResult{values, err} - case token := <-srv.getUpdatesChan: - values, err := srv.store.GetUpdates() + values, err := srv.store.GetUpdatesAfter(token.id, token.limit) token.response <- getUpdatesResult{values, err} case token := <-srv.getHubIdChan: token.response <- getHubIdResult{srv.store.GetStoreId()} @@ -94,22 +87,14 @@ func (srv Server) appendActor() { } } -func (srv Server) getUpdatesAfterInvoke(id int) ([]DataUpdateFull, error) { - token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} +func (srv Server) getUpdatesAfterInvoke(id, limit int) ([]DataUpdateFull, error) { + token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)} defer close(token.response) srv.getUpdatesAfterChan <- token res := <-token.response return res.values, res.err } -func (srv Server) getUpdatesInvoke() ([]DataUpdateFull, error) { - token := getUpdatesToken{response: make(chan getUpdatesResult, 1)} - defer close(token.response) - srv.getUpdatesChan <- token - res := <-token.response - return res.values, res.err -} - func (srv Server) getHubIdInvoke() string { token := getHubIdToken{response: make(chan getHubIdResult, 1)} defer close(token.response) @@ -134,7 +119,6 @@ func (srv Server) Close() { close(srv.appendData) close(srv.appendManyData) close(srv.getUpdatesAfterChan) - close(srv.getUpdatesChan) close(srv.getHubIdChan) close(srv.getLastUpdateIdChan) srv.store.Close() @@ -153,7 +137,6 @@ func NewServer(dbPath string) (server *Server, err error) { server.appendData = make(chan DataUpdateFull, 5) server.appendManyData = make(chan appendManyToken, 5) server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) - server.getUpdatesChan = make(chan getUpdatesToken, 3) server.getHubIdChan = make(chan getHubIdToken, 1) server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1) go server.appendActor() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 6c9823f..d5fee0c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -74,7 +74,7 @@ tryResync: nextBatch: for { - updates, err := srv.getUpdatesAfterInvoke(lastId) + updates, err := srv.getUpdatesAfterInvoke(lastId, 5000) if err != nil { s5l.Printf("fwd: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 622b307..56af7ca 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -21,7 +21,7 @@ func (srv Server) getLastUpdateEs(baseurl string, client *http.Client) (latestId storeId = srv.getHubIdInvoke() queryJson := fmt.Sprintf(lastUpdateJson, storeId) - s5tl.Printf("fwd-es: query: %s", queryJson) + s5dl.Printf("fwd-es: query: %s", queryJson) var resp *http.Response resp, err = client.Post(url, "application/json", strings.NewReader(queryJson)) @@ -43,7 +43,7 @@ func (srv Server) getLastUpdateEs(baseurl string, client *http.Client) (latestId return } - s5tl.Printf("fwd-es: lastupdate response: %s\n", body) + s5dl.Printf("fwd-es: lastupdate response: %s\n", body) if len(body) == 0 { latestId = -1 @@ -82,7 +82,7 @@ tryResync: nextBatch: for { - updates, err := srv.getUpdatesAfterInvoke(lastId) + updates, err := srv.getUpdatesAfterInvoke(lastId, 5000) if err != nil { s5l.Printf("fwd-es: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -109,8 +109,6 @@ tryResync: postData.WriteRune('\n') } - //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes())) - s5l.Printf("fwd-es: marshal OK") resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) @@ -131,7 +129,6 @@ tryResync: s5l.Printf("fwd-es: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) - //time.Sleep(1 * time.Second) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index 8ab0ac6..d865dac 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -38,7 +38,7 @@ tryResync: nextBatch: for { - updates, err := srv.getUpdatesAfterInvoke(lastId) + updates, err := srv.getUpdatesAfterInvoke(lastId, 5000) if err != nil { s5l.Printf("fwd-graphite: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index a7566a8..6cae87e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -42,7 +42,7 @@ tryResync: nextBatch: for { - updates, err := srv.getUpdatesAfterInvoke(lastId) + updates, err := srv.getUpdatesAfterInvoke(lastId, 5000) if err != nil { s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 6e2cf00..12e6ccb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -58,7 +58,7 @@ func (srv Server) getSource(c web.C, w http.ResponseWriter, r *http.Request) { func (srv Server) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "updates" - values, err := srv.getUpdatesInvoke() + values, err := srv.getUpdatesAfterInvoke(-1, 3) // TODO: get start and limit from param if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 64cb879..123fcd7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -8,6 +8,10 @@ import ( "github.com/pborman/uuid" ) +const ( + StoreGetUpdatesLimit = 42000 +) + type Store struct { hubId string db *bolt.DB @@ -256,7 +260,7 @@ func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err return } -func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { +func (s Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { var clients []ClientData if clients, err = s.getClientsByUpdateId(tx, duId); err != nil { return @@ -276,10 +280,15 @@ func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb return } -func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) { - if id < 0 { +func (s Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) { + res = []DataUpdateFull{} + if id < 0 { // TODO: interpret negative values as last x values id = 0 } + if limit < 0 || limit > StoreGetUpdatesLimit { + s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit) + limit = StoreGetUpdatesLimit + } err = s.db.View(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(dataUpdatesBn)).Cursor() if k, _ := c.Seek(itob(id)); k == nil { @@ -292,22 +301,20 @@ func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) { } var duf DataUpdateFull - duf, err := s.CreateDataUpdateFullFromDb(tx, btoi(k), d) + duf, err := s.createDataUpdateFullFromDb(tx, btoi(k), d) if err != nil { return err } res = append(res, duf) + if len(res) >= limit { + return nil + } } return nil }) return } -func (s Store) GetUpdates() (res []DataUpdateFull, err error) { - // TODO: implement from:to with limit - return s.GetUpdatesAfter(-1) -} - func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) { err = s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataUpdatesBn)) @@ -323,7 +330,7 @@ func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) { } var err error - if res, err = s.CreateDataUpdateFullFromDb(tx, id, d); err != nil { + if res, err = s.createDataUpdateFullFromDb(tx, id, d); err != nil { return err } return nil @@ -331,10 +338,6 @@ func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) { return } -type lastUpdateQueryResult struct { - MaxDataUpdateId *int -} - func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) { // TODO: implement me! updateId = 0 @@ -358,9 +361,11 @@ func NewStore(dbPath string) (Store, error) { if err != nil { return Store{}, err } + s5l.Printf("store: initialized (UUID: %s)", hubid) return Store{hubid, db}, nil } func (s Store) Close() { + s5l.Printf("store: closing") s.db.Close() } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 7b45e59..e5f0a12 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -64,7 +64,7 @@ func TestGetUpdatesAfter(t *testing.T) { return } - res, err := store.GetUpdatesAfter(2) + res, err := store.GetUpdatesAfter(2, -1) t.Logf("got updates (err %v):\n%#v", err, res) } @@ -151,7 +151,7 @@ func BenchmarkGetUpdatesAfter(b *testing.B) { latestId := -1 for { - updates, err := store.GetUpdatesAfter(latestId) + updates, err := store.GetUpdatesAfter(latestId, -1) if err != nil { b.Errorf("Failed to retrieve: %v", err) } |