diff options
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 17 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 137 |
2 files changed, 94 insertions, 60 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index efe4718..6d3af91 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -80,7 +80,11 @@ func (self Server) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) { } value, err := self.store.GetUpdate(int(id)) if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + if err == ErrNotFound { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound) + } else { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + } return } jsonString, err := json.Marshal(value) @@ -129,6 +133,16 @@ func (self Server) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) { // TODO send response channel, wait for OK } +func (self Server) getLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) { + const resourceName = "lastupdateid" + value, err := self.store.GetLastUpdateId() + if err != nil { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + return + } + fmt.Fprintf(w, "%d", value) +} + func (self Server) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "lastupdateid" id := c.URLParams["id"] @@ -155,6 +169,7 @@ func (self Server) ServeWeb(vizAppLocation string) { goji.Get("/updates", self.getUpdateList) goji.Get("/updates/:id", self.getUpdate) goji.Post("/updates", self.postUpdate) + goji.Get("/lastupdate", self.getLastUpdateId) goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid) goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation)))) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 4845660..64cb879 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -58,11 +58,11 @@ func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) { return } -func updateFromDataUpdateFull(value DataUpdateFull) (dataUpdateDb, []ClientData, sourceDb) { +func updateFromDataUpdateFull(value DataUpdateFull) (sourceDb, dataUpdateDb, []ClientData) { + src := NewSourceDb(value) du := NewDataUpdateDb(value) cd := value.Data.Clients - src := NewSourceDb(value) - return du, cd, src + return src, du, cd } func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { @@ -94,8 +94,18 @@ func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) return srcId, err } -func (s Store) insertDataUpdateEntry(tx *bolt.Tx, srcId int, du *dataUpdateDb) (duId int, err error) { - // TODO: add me +func (s Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) { + b := tx.Bucket([]byte(dataUpdatesBn)) + b.FillPercent = 1.0 // we only do appends + + next, _ := b.NextSequence() + duId = int(next) + + var jsonData []byte + if jsonData, err = json.Marshal(du); err != nil { + return + } + err = b.Put(itob(duId), jsonData) return } @@ -122,7 +132,7 @@ func (s Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) return uaId, err } -func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error { +func (s Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error { if len(cd) == 0 { return nil } @@ -146,18 +156,17 @@ func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientD return b.Put(itob(duId), jsonData) } -func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) { - var srcId int - if srcId, err = s.insertNewSource(tx, src); err != nil { +func (s Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) { + if du.SourceId, err = s.insertNewSource(tx, src); err != nil { return } var duId int - if duId, err = s.insertDataUpdateEntry(tx, srcId, &du); err != nil { + if duId, err = s.insertDataUpdate(tx, du); err != nil { return } - if err = s.insertDataUpdateClientEntries(tx, duId, cd); err != nil { + if err = s.insertClientData(tx, duId, cd); err != nil { return } @@ -167,8 +176,8 @@ func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sou func (s Store) AppendMany(updates []DataUpdateFull) (err error) { return s.db.Update(func(tx *bolt.Tx) error { for _, update := range updates { - du, cd, src := updateFromDataUpdateFull(update) - if err := s.appendItem(tx, du, cd, src); err != nil { + src, du, cd := updateFromDataUpdateFull(update) + if err := s.appendItem(tx, src, du, cd); err != nil { return err } } @@ -197,8 +206,7 @@ func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { func (s Store) GetSources() (res []SourceId, err error) { res = []SourceId{} err = s.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(sourcesRevBn)) - c := b.Cursor() + c := tx.Bucket([]byte(sourcesRevBn)).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { var s sourceDb if err := json.Unmarshal(v, &s); err != nil { @@ -225,11 +233,6 @@ func (s Store) GetSource(id int) (res SourceId, err error) { return } -func (s Store) GetUpdate(id int) (res dataUpdateDb, err error) { - // TODO: implement me - return -} - func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) { bc := tx.Bucket([]byte(clientDataBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) @@ -253,17 +256,17 @@ func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err return } -func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, dat dataUpdateDb) (res DataUpdateFull, err error) { +func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { var clients []ClientData if clients, err = s.getClientsByUpdateId(tx, duId); err != nil { return } var src sourceDb - if src, err = s.getSource(tx, dat.SourceId); err != nil { + if src, err = s.getSource(tx, du.SourceId); err != nil { return } - res.CopyFromDataUpdateDb(dat, duId, s.hubId) + res.CopyFromDataUpdateDb(du, duId, s.hubId) res.Hostname = src.Hostname res.StreamId.ContentId = src.StreamId.ContentId res.StreamId.Format = src.StreamId.Format @@ -274,59 +277,75 @@ func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, dat dataUpdateD } func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) { - // err = s.db.View(func(tx *bolt.Tx) error { - // // TODO: iterate over ids - // duId := 1 - - // for i := range dat { - // sd, err := s.CreateDataUpdateFullFromDb(tx, duId, du) - // if err != nil { - // return err - // } - // res = append(res, sd) - // } - // return nil - // }) + if id < 0 { + id = 0 + } + err = s.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket([]byte(dataUpdatesBn)).Cursor() + if k, _ := c.Seek(itob(id)); k == nil { + return nil + } + for k, v := c.Next(); k != nil; k, v = c.Next() { + var d dataUpdateDb + if err := json.Unmarshal(v, &d); err != nil { + return err + } + + var duf DataUpdateFull + duf, err := s.CreateDataUpdateFullFromDb(tx, btoi(k), d) + if err != nil { + return err + } + res = append(res, duf) + } + return nil + }) return } func (s Store) GetUpdates() (res []DataUpdateFull, err error) { + // TODO: implement from:to with limit return s.GetUpdatesAfter(-1) } +func (s Store) GetUpdate(id int) (res DataUpdateFull, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(dataUpdatesBn)) + + jsonData := b.Get(itob(id)) + if jsonData == nil { + return ErrNotFound + } + + var d dataUpdateDb + if err := json.Unmarshal(jsonData, &d); err != nil { + return err + } + + var err error + if res, err = s.CreateDataUpdateFullFromDb(tx, id, d); err != nil { + return err + } + return nil + }) + return +} + type lastUpdateQueryResult struct { MaxDataUpdateId *int } func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) { // TODO: implement me! - updateId = -1 - - // result := lastUpdateQueryResult{} - // err = s.db.SelectOne( - // &result, - // "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?", - // uuid) - // if err == nil { - // updateId = result.MaxDataUpdateId - // } else { - // s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err) - // } - // return + updateId = 0 return } func (s Store) GetLastUpdateId() (updateId int, err error) { - // TODO: implement me! - updateId = -1 - - // result := lastUpdateQueryResult{} - // err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn) - // if err == nil { - // updateId = result.MaxDataUpdateId - // } else { - // s5l.Printf("db: failed to find max DataUpdateId: %v", err) - // } + err = s.db.View(func(tx *bolt.Tx) error { + updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence()) + return nil + }) return } |