summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 02:51:19 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 02:51:19 +0200
commit89e09629904d77a4b770315749e855d23af55bf6 (patch)
tree2f6690ef865fdb04653f385c7367dd29233afbed /src/hub
parenthubinfo table moved to bolt as well (diff)
completely remove sqlite
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/Makefile4
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go255
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go15
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go64
-rwxr-xr-xsrc/hub/test-bolt3
-rwxr-xr-xsrc/hub/test-bolter3
-rwxr-xr-xsrc/hub/test-fwd3
-rwxr-xr-xsrc/hub/test-fwd-es3
-rwxr-xr-xsrc/hub/test-fwd-piwik3
-rwxr-xr-xsrc/hub/test-sqlite5
-rwxr-xr-xsrc/hub/test-srv3
16 files changed, 157 insertions, 232 deletions
diff --git a/src/hub/Makefile b/src/hub/Makefile
index 43b4d9d..1739757 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -38,9 +38,7 @@ endif
EXECUTEABLE := sfive-hub
-LIBS := "gopkg.in/gorp.v2" \
- "github.com/mattn/go-sqlite3" \
- "github.com/boltdb/bolt" \
+LIBS := "github.com/boltdb/bolt" \
"github.com/zenazn/goji" \
"github.com/pborman/uuid" \
"github.com/equinox0815/graphite-golang"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 7ff58b5..9589812 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -12,7 +12,7 @@ import (
var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
- db := flag.String("db", "/var/lib/sfive/", "directory to store the database files")
+ db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6f5ebbb..c5f6e21 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,7 +1,6 @@
package sfive
import (
- "path/filepath"
"time"
)
@@ -42,7 +41,7 @@ type getLastUpdateIdToken struct {
}
type StatsSinkServer struct {
- store sqliteStore
+ store Store
quit chan bool
done chan bool
appendData chan StatisticsData
@@ -90,11 +89,7 @@ func (self StatsSinkServer) appendActor() {
token.response <- getHubIdResult{self.store.GetStoreId()}
case token := <-self.getLastUpdateIdChan:
lastUpdateId, err := self.store.GetLastUpdateId()
- if lastUpdateId != nil {
- token.response <- getLastUpdateIdResult{*lastUpdateId, err}
- } else {
- token.response <- getLastUpdateIdResult{0, err}
- }
+ token.response <- getLastUpdateIdResult{lastUpdateId, err}
}
}
}
@@ -147,11 +142,8 @@ func (self StatsSinkServer) Close() {
func NewServer(dbPath string) (server *StatsSinkServer, err error) {
// TODO read configuration and create instance with correct settings
- sqlitePath := filepath.Join(dbPath, "db.sqlite")
- boltPath := filepath.Join(dbPath, "db.bolt")
-
server = new(StatsSinkServer)
- server.store, err = NewStore(sqlitePath, boltPath)
+ server.store, err = NewStore(dbPath)
if err != nil {
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 418d195..bdb0cbf 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -13,8 +13,8 @@ func findMaxId(values []StatisticsData) int {
maxId := -1
for i := range values {
id := values[i].SourceHubDataUpdateId
- if id != nil && *id > maxId {
- maxId = *id
+ if id > maxId {
+ maxId = id
}
}
return maxId
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 6292ba8..a474f47 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -137,9 +137,7 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- if value != nil {
- fmt.Fprintf(w, "%d", *value)
- }
+ fmt.Fprintf(w, "%d", value)
}
func (self StatsSinkServer) ServeWeb(vizAppLocation string) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 3ccc202..1339a4b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -1,66 +1,28 @@
package sfive
import (
- "database/sql"
"encoding/json"
"time"
- // needed for gorp tracing
- // "log"
- // "os"
-
"github.com/boltdb/bolt"
- _ "github.com/mattn/go-sqlite3"
"github.com/pborman/uuid"
- "gopkg.in/gorp.v2"
)
-type sqliteStore struct {
- db *gorp.DbMap
- dbBolt *bolt.DB
- hubId string
-}
-
-func sourceFromStatisticsData(value StatisticsData) sourceDb {
- return sourceDb{
- Hostname: value.SourceId.Hostname,
- StreamId: streamIdDb{
- ContentId: value.SourceId.StreamId.ContentId,
- Format: value.SourceId.StreamId.Format,
- Quality: value.SourceId.StreamId.Quality,
- },
- Tags: value.SourceId.Tags,
- }
-}
-
-func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
- return dataUpdateDb{
- -1,
- -1,
- value.SourceHubUuid,
- value.SourceHubDataUpdateId,
- value.StartTime.Unix(),
- value.Duration,
- value.Data.ClientCount,
- value.Data.BytesReceived,
- value.Data.BytesSent}
-}
-
-func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) {
- du := dataUpdateFromStatisticsData(value)
- cd := value.Data.Clients
- src := sourceFromStatisticsData(value)
-
- return du, cd, src
+type Store struct {
+ hubId string
+ db *bolt.DB
}
-func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
+func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
boltDb, err = bolt.Open(boltPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return
}
err = boltDb.Update(func(tx *bolt.Tx) error {
+ if _, err := tx.CreateBucketIfNotExists([]byte(dataUpdatesBn)); err != nil {
+ return err
+ }
if _, err := tx.CreateBucketIfNotExists([]byte(sourcesFwdBn)); err != nil {
return err
}
@@ -96,37 +58,14 @@ func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
return
}
-func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, err error) {
-
- var db *sql.DB
- db, err = sql.Open("sqlite3", sqlitePath)
- if err != nil {
- return
- }
-
- dbmap = &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
- // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds))
-
- dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id")
-
- // TODO use some real migration, yadda yadda
- if err = dbmap.CreateTablesIfNotExists(); err != nil {
- return
- }
-
- return
-}
-
-func (s sqliteStore) insertDataUpdateEntry(srcId int, du *dataUpdateDb) (err error) {
- du.SourceId = srcId
- err = s.db.Insert(du)
- if err != nil {
- return
- }
- return
+func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) {
+ du := NewDataUpdateDb(value)
+ cd := value.Data.Clients
+ src := NewSourceDb(value)
+ return du, cd, src
}
-func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
+func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
bf := tx.Bucket([]byte(sourcesFwdBn))
bf.FillPercent = 1.0 // we only do appends
br := tx.Bucket([]byte(sourcesRevBn))
@@ -155,7 +94,12 @@ func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err
return srcId, err
}
-func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) {
+func (s Store) insertDataUpdateEntry(tx *bolt.Tx, srcId int, du *dataUpdateDb) (duId int, err error) {
+ // TODO: add me
+ return
+}
+
+func (s Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) {
bf := tx.Bucket([]byte(userAgentsFwdBn))
bf.FillPercent = 1.0 // we only do appends
br := tx.Bucket([]byte(userAgentsRevBn))
@@ -178,7 +122,7 @@ func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err e
return uaId, err
}
-func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error {
+func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error {
if len(cd) == 0 {
return nil
}
@@ -202,35 +146,26 @@ func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []C
return b.Put(itob(duId), jsonData)
}
-func (s sqliteStore) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) {
+func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) {
var srcId int
if srcId, err = s.insertNewSource(tx, src); err != nil {
return
}
- if err = s.insertDataUpdateEntry(srcId, &du); err != nil {
+ var duId int
+ if duId, err = s.insertDataUpdateEntry(tx, srcId, &du); err != nil {
return
}
- if err = s.insertDataUpdateClientEntries(tx, du.Id, cd); err != nil {
+ if err = s.insertDataUpdateClientEntries(tx, duId, cd); err != nil {
return
}
return
}
-func (s sqliteStore) Append(update StatisticsData) (err error) {
- return s.AppendMany([]StatisticsData{update})
-}
-
-func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) {
- var tx *gorp.Transaction
- tx, err = s.db.Begin()
- if err != nil {
- return
- }
-
- err = s.dbBolt.Update(func(tx *bolt.Tx) error {
+func (s Store) AppendMany(updates []StatisticsData) (err error) {
+ return s.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
du, cd, src := updateFromStatisticsData(update)
if err := s.appendItem(tx, du, cd, src); err != nil {
@@ -239,16 +174,13 @@ func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) {
}
return nil
})
+}
- if err != nil {
- tx.Rollback()
- return
- }
-
- return tx.Commit()
+func (s Store) Append(update StatisticsData) (err error) {
+ return s.AppendMany([]StatisticsData{update})
}
-func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
+func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
b := tx.Bucket([]byte(sourcesRevBn))
jsonData := b.Get(itob(id))
@@ -262,9 +194,9 @@ func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
return
}
-func (s sqliteStore) GetSources() (res []SourceId, err error) {
+func (s Store) GetSources() (res []SourceId, err error) {
res = []SourceId{}
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
+ err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(sourcesRevBn))
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
@@ -281,8 +213,8 @@ func (s sqliteStore) GetSources() (res []SourceId, err error) {
return
}
-func (s sqliteStore) GetSource(id int) (res SourceId, err error) {
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
+func (s Store) GetSource(id int) (res SourceId, err error) {
+ err = s.db.View(func(tx *bolt.Tx) error {
src, err := s.getSource(tx, id)
if err != nil {
return err
@@ -293,12 +225,12 @@ func (s sqliteStore) GetSource(id int) (res SourceId, err error) {
return
}
-func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
- err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id)
+func (s Store) GetUpdate(id int) (res dataUpdateDb, err error) {
+ // TODO: implement me
return
}
-func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
+func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -321,9 +253,9 @@ func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData
return
}
-func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (res StatisticsData, err error) {
+func (s Store) CreateStatisticsDataFrom(tx *bolt.Tx, duId int, dat dataUpdateDb) (res StatisticsData, err error) {
var clients []ClientData
- if clients, err = s.getClientsByUpdateId(tx, dat.Id); err != nil {
+ if clients, err = s.getClientsByUpdateId(tx, duId); err != nil {
return
}
var src sourceDb
@@ -331,7 +263,7 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re
return
}
- res.CopyFromDataUpdateDb(dat, s.hubId)
+ res.CopyFromDataUpdateDb(dat, duId, s.hubId)
res.Hostname = src.Hostname
res.StreamId.ContentId = src.StreamId.ContentId
res.StreamId.Format = src.StreamId.Format
@@ -341,35 +273,24 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re
return
}
-func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) {
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
- for i := range dat {
- sd, err := s.CreateStatisticsDataFrom(tx, *dat[i].(*dataUpdateDb))
- if err != nil {
- return err
- }
- res = append(res, sd)
- }
- return nil
- })
- return
-}
-
-func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
- parameters := make(map[string]interface{})
- sql := "SELECT * FROM " + dataUpdatesTn + " WHERE Id > :afterUpdateId limit :limit"
- parameters["afterUpdateId"] = id
- parameters["limit"] = 5000 // TODO: hardcoded value
- var updates []interface{}
- updates, err = s.db.Select(dataUpdateDb{}, sql, parameters)
- s5tl.Printf("sql: %s", sql)
- if err == nil {
- res, _ = s.CreateStatisticsDatasFrom(updates)
- }
+func (s Store) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
+ // err = s.db.View(func(tx *bolt.Tx) error {
+ // // TODO: iterate over ids
+ // duId := 1
+
+ // for i := range dat {
+ // sd, err := s.CreateStatisticsDataFrom(tx, duId, du)
+ // if err != nil {
+ // return err
+ // }
+ // res = append(res, sd)
+ // }
+ // return nil
+ // })
return
}
-func (s sqliteStore) GetUpdates() (res []StatisticsData, err error) {
+func (s Store) GetUpdates() (res []StatisticsData, err error) {
return s.GetUpdatesAfter(-1)
}
@@ -377,50 +298,50 @@ type lastUpdateQueryResult struct {
MaxDataUpdateId *int
}
-func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(
- &result,
- "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
- uuid)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
- }
+func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
+ // TODO: implement me!
+ updateId = -1
+
+ // result := lastUpdateQueryResult{}
+ // err = s.db.SelectOne(
+ // &result,
+ // "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
+ // uuid)
+ // if err == nil {
+ // updateId = result.MaxDataUpdateId
+ // } else {
+ // s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
+ // }
+ // return
return
}
-func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max DataUpdateId: %v", err)
- }
+func (s Store) GetLastUpdateId() (updateId int, err error) {
+ // TODO: implement me!
+ updateId = -1
+
+ // result := lastUpdateQueryResult{}
+ // err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
+ // if err == nil {
+ // updateId = result.MaxDataUpdateId
+ // } else {
+ // s5l.Printf("db: failed to find max DataUpdateId: %v", err)
+ // }
return
}
-func (s sqliteStore) GetStoreId() string {
+func (s Store) GetStoreId() string {
return s.hubId
}
-func NewStore(sqlitePath, boltPath string) (sqliteStore, error) {
- boltDb, hubid, err := initDbBolt(boltPath)
- if err != nil {
- return sqliteStore{}, err
- }
-
- db, err := initDbSqlite(sqlitePath)
+func NewStore(dbPath string) (Store, error) {
+ db, hubid, err := initDb(dbPath)
if err != nil {
- boltDb.Close()
- return sqliteStore{}, err
+ return Store{}, err
}
- return sqliteStore{db, boltDb, hubid}, nil
+ return Store{hubid, db}, nil
}
-func (s sqliteStore) Close() {
- s.db.Db.Close()
- s.dbBolt.Close()
+func (s Store) Close() {
+ s.db.Close()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index fe43bf5..38e7bf7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -9,8 +9,7 @@ import (
)
var (
- __sqlitePath = "file:memdb1?mode=memory&cache=shared"
- __boltPath = "/run/s5hub_testing_db.bolt"
+ __boltPath = "/run/s5hub_testing_db.bolt"
)
func TestMain(m *testing.M) {
@@ -24,7 +23,7 @@ func TestMain(m *testing.M) {
func TestAppend(t *testing.T) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -35,7 +34,7 @@ func TestAppend(t *testing.T) {
update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ dat := StatisticsData{"", -1, source, update}
err = store.Append(dat)
if err != nil {
@@ -46,7 +45,7 @@ func TestAppend(t *testing.T) {
func TestGetUpdatesAfter(t *testing.T) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -57,7 +56,7 @@ func TestGetUpdatesAfter(t *testing.T) {
update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ dat := StatisticsData{"", -1, source, update}
err = store.Append(dat)
if err != nil {
@@ -122,7 +121,7 @@ func generateStatisticsData(n int) (data []StatisticsData) {
func BenchmarkAppendMany(b *testing.B) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
@@ -138,7 +137,7 @@ func BenchmarkAppendMany(b *testing.B) {
func BenchmarkGetUpdatesAfter(b *testing.B) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 525b6d3..ad6deaf 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -41,8 +41,8 @@ type DataUpdate struct {
}
type StatisticsData struct {
- SourceHubUuid *string
- SourceHubDataUpdateId *int
+ SourceHubUuid string `json:"SourceHubUuid,omitempty"`
+ SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
SourceId
DataUpdate
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index fd21337..d6de68e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -12,15 +12,10 @@ var (
ErrNotFound = errors.New("not found")
)
-// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections
-// this is very much not normalized at all, because I'm too lazy to type
-
const (
- // sqlite table names
- dataUpdatesTn = "DataUpdates"
-
- // bolt bucket names
+ // bucket names
hubInfoBn = "HubInfo"
+ dataUpdatesBn = "DataUpdates"
sourcesFwdBn = "SourcesFwd"
sourcesRevBn = "SourcesRev"
clientDataBn = "ClientData"
@@ -44,6 +39,18 @@ type sourceDb struct {
Tags []string `json:"t"`
}
+func NewSourceDb(value StatisticsData) sourceDb {
+ return sourceDb{
+ Hostname: value.SourceId.Hostname,
+ StreamId: streamIdDb{
+ ContentId: value.SourceId.StreamId.ContentId,
+ Format: value.SourceId.StreamId.Format,
+ Quality: value.SourceId.StreamId.Quality,
+ },
+ Tags: value.SourceId.Tags,
+ }
+}
+
func (s sourceDb) String() string {
return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ","))
}
@@ -66,30 +73,39 @@ type clientDataDb struct {
// stored in dataUpdatesTn
// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
type dataUpdateDb struct {
- Id int
- SourceId int // foreign key to sourcesTn
- SourceHubUuid *string
- SourceHubDataUpdateId *int
- StartTime int64 // time.Time
- Duration int64 // duration in milliseconds
- ClientCount uint
- BytesReceived uint
- BytesSent uint
+ SourceHubUuid string `json:"h,omitempty"`
+ SourceHubDataUpdateId int `json:"hi,omitempty"`
+ SourceId int `json:"si"`
+ StartTime int64 `json:"st"` // unix timestamp in milliseconds
+ Duration int64 `json:"du"` // duration in milliseconds
+ ClientCount uint `json:"cc"`
+ BytesReceived uint `json:"br"`
+ BytesSent uint `json:"bs"`
}
-func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, hubId string) {
- if v.SourceHubUuid == nil {
- s.SourceHubUuid = &hubId
- } else {
- s.SourceHubUuid = v.SourceHubUuid
+func NewDataUpdateDb(v StatisticsData) dataUpdateDb {
+ return dataUpdateDb{
+ v.SourceHubUuid,
+ v.SourceHubDataUpdateId,
+ -1,
+ int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000),
+ v.Duration,
+ v.Data.ClientCount,
+ v.Data.BytesReceived,
+ v.Data.BytesSent,
}
- if v.SourceHubDataUpdateId == nil {
- s.SourceHubDataUpdateId = &v.Id
+}
+
+func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) {
+ if v.SourceHubUuid == "" {
+ s.SourceHubUuid = hubId
+ s.SourceHubDataUpdateId = vId
} else {
+ s.SourceHubUuid = v.SourceHubUuid
s.SourceHubDataUpdateId = v.SourceHubDataUpdateId
}
- s.StartTime = time.Unix(v.StartTime, 0)
+ s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000)
s.Duration = v.Duration
s.Data.ClientCount = v.ClientCount
s.Data.BytesReceived = v.BytesReceived
diff --git a/src/hub/test-bolt b/src/hub/test-bolt
index 7ac3dd2..9a26b04 100755
--- a/src/hub/test-bolt
+++ b/src/hub/test-bolt
@@ -1,6 +1,7 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
BIN="$(go env GOPATH)/bin/bolt"
if [ ! -x "$BIN" ]; then
@@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_D/db.bolt"
+exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_DB"
diff --git a/src/hub/test-bolter b/src/hub/test-bolter
index dc2e98d..9093d51 100755
--- a/src/hub/test-bolter
+++ b/src/hub/test-bolter
@@ -1,6 +1,7 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
BIN="$(go env GOPATH)/bin/bolter"
if [ ! -x "$BIN" ]; then
@@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec $BIN --file "$TEST_D/db.bolt" $@
+exec $BIN --file "$TEST_DB" $@
diff --git a/src/hub/test-fwd b/src/hub/test-fwd
index eff7605..ce6e0e3 100755
--- a/src/hub/test-fwd
+++ b/src/hub/test-fwd
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es
index dc7979f..489fcdd 100755
--- a/src/hub/test-fwd-es
+++ b/src/hub/test-fwd-es
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/e14"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing"
diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik
index b6ac640..6143c70 100755
--- a/src/hub/test-fwd-piwik
+++ b/src/hub/test-fwd-piwik
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
diff --git a/src/hub/test-sqlite b/src/hub/test-sqlite
deleted file mode 100755
index 1c2dcb1..0000000
--- a/src/hub/test-sqlite
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-TEST_D="./test"
-
-exec sqlite3 "$TEST_D/db.sqlite"
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 37d5897..803ac37 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -1,7 +1,8 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"