summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-30 02:07:59 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-30 02:07:59 +0200
commita746d358967a115543862bb296715a313743c957 (patch)
tree4f82d256d4cd5ade50b73d6e49f3be992a72f4f0 /src/hub
parentintroduced read-only mode (diff)
reorganized some functions
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go195
1 files changed, 110 insertions, 85 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index d2a0734..568b448 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -59,7 +59,11 @@ type Store struct {
db *bolt.DB
}
-func checkDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid string, err error) {
+//
+// Initialization and Destruction
+//
+
+func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid string, err error) {
if _, err = os.Stat(dbPath); err != nil {
if os.IsNotExist(err) {
err = nil
@@ -105,7 +109,7 @@ func checkDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid st
return
}
-func initDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error) {
+func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error) {
db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond})
if err != nil {
return
@@ -136,6 +140,40 @@ func initDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error)
return
}
+func NewStore(dbPath string, readOnly bool) (Store, error) {
+ db, version, hubid, err := openDb(dbPath, readOnly)
+ if err != nil {
+ return Store{}, err
+ }
+
+ if db != nil {
+ s5l.Printf("store: opened (UUID: %s)", hubid)
+ return Store{version, hubid, db}, nil
+ }
+
+ if readOnly {
+ return Store{}, errors.New("store: failed to open, requested read-only mode but file does not exist.")
+ }
+
+ db, version, hubid, err = createDb(dbPath)
+ if err != nil {
+ return Store{}, err
+ }
+ s5l.Printf("store: initialized (UUID: %s)", hubid)
+ return Store{version, hubid, db}, nil
+}
+
+func (st Store) Close() {
+ s5l.Printf("store: closing")
+ st.db.Close()
+}
+
+//
+// Append data
+//
+
+// append key-value pairs to buckets
+
func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) {
if hubUuid == "" {
return
@@ -265,20 +303,22 @@ func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error {
return b.Put([]byte(uuid), itob(duId))
}
-func (st Store) appendItem(tx *bolt.Tx, hubUuid string, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) {
+// Split up the multidimensional dataupdate and append all the key-value pairs
+
+func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) {
+ du := NewDataUpdateDb(update)
+
+ hubUuid := update.SourceHubUuid
if du.SourceHubId, err = st.insertNewHub(tx, hubUuid); err != nil {
return
}
- if du.SourceId, err = st.insertNewSource(tx, src); err != nil {
+ if du.SourceId, err = st.insertNewSource(tx, NewSourceDb(update)); err != nil {
return
}
-
- var duId int
if duId, err = st.insertDataUpdate(tx, du); err != nil {
return
}
-
- if err = st.insertClientData(tx, duId, cd); err != nil {
+ if err = st.insertClientData(tx, duId, update.Data.Clients); err != nil {
return
}
@@ -288,14 +328,12 @@ func (st Store) appendItem(tx *bolt.Tx, hubUuid string, src sourceDb, du dataUpd
return
}
+// Public Append Interface
+
func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
return st.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
- hubUuid := update.SourceHubUuid
- src := NewSourceDb(update)
- du := NewDataUpdateDb(update)
- cd := update.Data.Clients
- if err := st.appendItem(tx, hubUuid, src, du, cd); err != nil {
+ if _, err := st.appendItem(tx, update); err != nil {
return err
}
}
@@ -307,37 +345,11 @@ func (st Store) Append(update DataUpdateFull) (err error) {
return st.AppendMany([]DataUpdateFull{update})
}
-func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
- b := tx.Bucket([]byte(sourcesRevBn))
-
- jsonData := b.Get(itob(id))
- if jsonData == nil {
- err = ErrNotFound
- return
- }
- if err = json.Unmarshal(jsonData, &res); err != nil {
- return
- }
- return
-}
+//
+// Fetch data
+//
-func (st Store) GetSources() (res []SourceId, err error) {
- res = []SourceId{}
- err = st.db.View(func(tx *bolt.Tx) error {
- c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
- for k, v := c.First(); k != nil; k, v = c.Next() {
- var s sourceDb
- if err := json.Unmarshal(v, &s); err != nil {
- return err
- }
- var src SourceId
- src.CopyFromSourceDb(s)
- res = append(res, src)
- }
- return nil
- })
- return
-}
+// fetch key-value pairs from buckets
func (st Store) getHub(tx *bolt.Tx, id int) string {
b := tx.Bucket([]byte(hubUuidsRevBn))
@@ -348,15 +360,17 @@ func (st Store) getHub(tx *bolt.Tx, id int) string {
return string(uuid)
}
-func (st Store) GetSource(id int) (res SourceId, err error) {
- err = st.db.View(func(tx *bolt.Tx) error {
- src, err := st.getSource(tx, id)
- if err != nil {
- return err
- }
- res.CopyFromSourceDb(src)
- return nil
- })
+func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
+ b := tx.Bucket([]byte(sourcesRevBn))
+
+ jsonData := b.Get(itob(id))
+ if jsonData == nil {
+ err = ErrNotFound
+ return
+ }
+ if err = json.Unmarshal(jsonData, &res); err != nil {
+ return
+ }
return
}
@@ -383,7 +397,9 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
return
}
-func (st Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
+// fetch all the key-value pairs and merge them into the multidimensional dataupdate
+
+func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
var clients []ClientData
if clients, err = st.getClients(tx, duId); err != nil {
return
@@ -402,6 +418,8 @@ func (st Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateD
return
}
+// Public Fetch Interface
+
func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) {
res = []DataUpdateFull{}
if id < 0 { // TODO: interpret negative values as last x values
@@ -422,8 +440,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error)
return err
}
- var duf DataUpdateFull
- duf, err := st.createDataUpdateFullFromDb(tx, btoi(k), d)
+ duf, err := st.fetchItem(tx, btoi(k), d)
if err != nil {
return err
}
@@ -452,7 +469,7 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
}
var err error
- if res, err = st.createDataUpdateFullFromDb(tx, id, d); err != nil {
+ if res, err = st.fetchItem(tx, id, d); err != nil {
return err
}
return nil
@@ -460,6 +477,22 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
return
}
+//
+// Auxilliary Data
+//
+
+func (st Store) GetStoreId() string {
+ return st.hubUuid
+}
+
+func (st Store) GetLastUpdateId() (updateId int, err error) {
+ err = st.db.View(func(tx *bolt.Tx) error {
+ updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence())
+ return nil
+ })
+ return
+}
+
func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid))
@@ -472,40 +505,32 @@ func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
return
}
-func (st Store) GetLastUpdateId() (updateId int, err error) {
+func (st Store) GetSource(id int) (res SourceId, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
- updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence())
+ src, err := st.getSource(tx, id)
+ if err != nil {
+ return err
+ }
+ res.CopyFromSourceDb(src)
return nil
})
return
}
-func (st Store) GetStoreId() string {
- return st.hubUuid
-}
-
-func NewStore(dbPath string, readOnly bool) (Store, error) {
- db, version, hubid, err := checkDb(dbPath, readOnly)
- if err != nil {
- return Store{}, err
- }
-
- if db != nil {
- s5l.Printf("store: opened (UUID: %s)", hubid)
- } else {
- if readOnly {
- return Store{}, errors.New("store: failed to open, requested read-only mode but file does not exist.")
- }
- db, version, hubid, err = initDb(dbPath)
- if err != nil {
- return Store{}, err
+func (st Store) GetSources() (res []SourceId, err error) {
+ res = []SourceId{}
+ err = st.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ var s sourceDb
+ if err := json.Unmarshal(v, &s); err != nil {
+ return err
+ }
+ var src SourceId
+ src.CopyFromSourceDb(s)
+ res = append(res, src)
}
- s5l.Printf("store: initialized (UUID: %s)", hubid)
- }
- return Store{version, hubid, db}, nil
-}
-
-func (st Store) Close() {
- s5l.Printf("store: closing")
- st.db.Close()
+ return nil
+ })
+ return
}