summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 04:04:10 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 04:04:10 +0200
commit9e51b380909dea3149cb0d3fdc57090d5153e04c (patch)
tree979455be66410882c731c1fddb0196ac5617b32e /src
parentno more stats in names (diff)
storing data updates into bolt works now
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go17
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go137
2 files changed, 94 insertions, 60 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index efe4718..6d3af91 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -80,7 +80,11 @@ func (self Server) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
}
value, err := self.store.GetUpdate(int(id))
if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ if err == ErrNotFound {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound)
+ } else {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ }
return
}
jsonString, err := json.Marshal(value)
@@ -129,6 +133,16 @@ func (self Server) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
// TODO send response channel, wait for OK
}
+func (self Server) getLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
+ const resourceName = "lastupdateid"
+ value, err := self.store.GetLastUpdateId()
+ if err != nil {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ return
+ }
+ fmt.Fprintf(w, "%d", value)
+}
+
func (self Server) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "lastupdateid"
id := c.URLParams["id"]
@@ -155,6 +169,7 @@ func (self Server) ServeWeb(vizAppLocation string) {
goji.Get("/updates", self.getUpdateList)
goji.Get("/updates/:id", self.getUpdate)
goji.Post("/updates", self.postUpdate)
+ goji.Get("/lastupdate", self.getLastUpdateId)
goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid)
goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation))))
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 4845660..64cb879 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -58,11 +58,11 @@ func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
return
}
-func updateFromDataUpdateFull(value DataUpdateFull) (dataUpdateDb, []ClientData, sourceDb) {
+func updateFromDataUpdateFull(value DataUpdateFull) (sourceDb, dataUpdateDb, []ClientData) {
+ src := NewSourceDb(value)
du := NewDataUpdateDb(value)
cd := value.Data.Clients
- src := NewSourceDb(value)
- return du, cd, src
+ return src, du, cd
}
func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
@@ -94,8 +94,18 @@ func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error)
return srcId, err
}
-func (s Store) insertDataUpdateEntry(tx *bolt.Tx, srcId int, du *dataUpdateDb) (duId int, err error) {
- // TODO: add me
+func (s Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) {
+ b := tx.Bucket([]byte(dataUpdatesBn))
+ b.FillPercent = 1.0 // we only do appends
+
+ next, _ := b.NextSequence()
+ duId = int(next)
+
+ var jsonData []byte
+ if jsonData, err = json.Marshal(du); err != nil {
+ return
+ }
+ err = b.Put(itob(duId), jsonData)
return
}
@@ -122,7 +132,7 @@ func (s Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error)
return uaId, err
}
-func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error {
+func (s Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error {
if len(cd) == 0 {
return nil
}
@@ -146,18 +156,17 @@ func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientD
return b.Put(itob(duId), jsonData)
}
-func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) {
- var srcId int
- if srcId, err = s.insertNewSource(tx, src); err != nil {
+func (s Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) {
+ if du.SourceId, err = s.insertNewSource(tx, src); err != nil {
return
}
var duId int
- if duId, err = s.insertDataUpdateEntry(tx, srcId, &du); err != nil {
+ if duId, err = s.insertDataUpdate(tx, du); err != nil {
return
}
- if err = s.insertDataUpdateClientEntries(tx, duId, cd); err != nil {
+ if err = s.insertClientData(tx, duId, cd); err != nil {
return
}
@@ -167,8 +176,8 @@ func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sou
func (s Store) AppendMany(updates []DataUpdateFull) (err error) {
return s.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
- du, cd, src := updateFromDataUpdateFull(update)
- if err := s.appendItem(tx, du, cd, src); err != nil {
+ src, du, cd := updateFromDataUpdateFull(update)
+ if err := s.appendItem(tx, src, du, cd); err != nil {
return err
}
}
@@ -197,8 +206,7 @@ func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
func (s Store) GetSources() (res []SourceId, err error) {
res = []SourceId{}
err = s.db.View(func(tx *bolt.Tx) error {
- b := tx.Bucket([]byte(sourcesRevBn))
- c := b.Cursor()
+ c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var s sourceDb
if err := json.Unmarshal(v, &s); err != nil {
@@ -225,11 +233,6 @@ func (s Store) GetSource(id int) (res SourceId, err error) {
return
}
-func (s Store) GetUpdate(id int) (res dataUpdateDb, err error) {
- // TODO: implement me
- return
-}
-
func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -253,17 +256,17 @@ func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err
return
}
-func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, dat dataUpdateDb) (res DataUpdateFull, err error) {
+func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
var clients []ClientData
if clients, err = s.getClientsByUpdateId(tx, duId); err != nil {
return
}
var src sourceDb
- if src, err = s.getSource(tx, dat.SourceId); err != nil {
+ if src, err = s.getSource(tx, du.SourceId); err != nil {
return
}
- res.CopyFromDataUpdateDb(dat, duId, s.hubId)
+ res.CopyFromDataUpdateDb(du, duId, s.hubId)
res.Hostname = src.Hostname
res.StreamId.ContentId = src.StreamId.ContentId
res.StreamId.Format = src.StreamId.Format
@@ -274,59 +277,75 @@ func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, dat dataUpdateD
}
func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) {
- // err = s.db.View(func(tx *bolt.Tx) error {
- // // TODO: iterate over ids
- // duId := 1
-
- // for i := range dat {
- // sd, err := s.CreateDataUpdateFullFromDb(tx, duId, du)
- // if err != nil {
- // return err
- // }
- // res = append(res, sd)
- // }
- // return nil
- // })
+ if id < 0 {
+ id = 0
+ }
+ err = s.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(dataUpdatesBn)).Cursor()
+ if k, _ := c.Seek(itob(id)); k == nil {
+ return nil
+ }
+ for k, v := c.Next(); k != nil; k, v = c.Next() {
+ var d dataUpdateDb
+ if err := json.Unmarshal(v, &d); err != nil {
+ return err
+ }
+
+ var duf DataUpdateFull
+ duf, err := s.CreateDataUpdateFullFromDb(tx, btoi(k), d)
+ if err != nil {
+ return err
+ }
+ res = append(res, duf)
+ }
+ return nil
+ })
return
}
func (s Store) GetUpdates() (res []DataUpdateFull, err error) {
+ // TODO: implement from:to with limit
return s.GetUpdatesAfter(-1)
}
+func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) {
+ err = s.db.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(dataUpdatesBn))
+
+ jsonData := b.Get(itob(id))
+ if jsonData == nil {
+ return ErrNotFound
+ }
+
+ var d dataUpdateDb
+ if err := json.Unmarshal(jsonData, &d); err != nil {
+ return err
+ }
+
+ var err error
+ if res, err = s.CreateDataUpdateFullFromDb(tx, id, d); err != nil {
+ return err
+ }
+ return nil
+ })
+ return
+}
+
type lastUpdateQueryResult struct {
MaxDataUpdateId *int
}
func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
// TODO: implement me!
- updateId = -1
-
- // result := lastUpdateQueryResult{}
- // err = s.db.SelectOne(
- // &result,
- // "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
- // uuid)
- // if err == nil {
- // updateId = result.MaxDataUpdateId
- // } else {
- // s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
- // }
- // return
+ updateId = 0
return
}
func (s Store) GetLastUpdateId() (updateId int, err error) {
- // TODO: implement me!
- updateId = -1
-
- // result := lastUpdateQueryResult{}
- // err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
- // if err == nil {
- // updateId = result.MaxDataUpdateId
- // } else {
- // s5l.Printf("db: failed to find max DataUpdateId: %v", err)
- // }
+ err = s.db.View(func(tx *bolt.Tx) error {
+ updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence())
+ return nil
+ })
return
}