summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 02:51:19 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 02:51:19 +0200
commit89e09629904d77a4b770315749e855d23af55bf6 (patch)
tree2f6690ef865fdb04653f385c7367dd29233afbed
parenthubinfo table moved to bolt as well (diff)
completely remove sqlite
-rw-r--r--doc/protocol.md1
-rw-r--r--src/hub/Makefile4
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go255
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go15
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go64
-rwxr-xr-xsrc/hub/test-bolt3
-rwxr-xr-xsrc/hub/test-bolter3
-rwxr-xr-xsrc/hub/test-fwd3
-rwxr-xr-xsrc/hub/test-fwd-es3
-rwxr-xr-xsrc/hub/test-fwd-piwik3
-rwxr-xr-xsrc/hub/test-sqlite5
-rwxr-xr-xsrc/hub/test-srv3
17 files changed, 158 insertions, 232 deletions
diff --git a/doc/protocol.md b/doc/protocol.md
index 74bdbd2..fdd51ee 100644
--- a/doc/protocol.md
+++ b/doc/protocol.md
@@ -24,6 +24,7 @@ values from data updates override values from init. Stateless interfaces will no
init messages and therefore all values must be defined here.
If an interface (i.e. REST) has other means to detect protocol versions the version
field may be omitted entirely.
+The start-time will be processesd and stored with millisecond precision.
{
"version": 1,
diff --git a/src/hub/Makefile b/src/hub/Makefile
index 43b4d9d..1739757 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -38,9 +38,7 @@ endif
EXECUTEABLE := sfive-hub
-LIBS := "gopkg.in/gorp.v2" \
- "github.com/mattn/go-sqlite3" \
- "github.com/boltdb/bolt" \
+LIBS := "github.com/boltdb/bolt" \
"github.com/zenazn/goji" \
"github.com/pborman/uuid" \
"github.com/equinox0815/graphite-golang"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 7ff58b5..9589812 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -12,7 +12,7 @@ import (
var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
- db := flag.String("db", "/var/lib/sfive/", "directory to store the database files")
+ db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6f5ebbb..c5f6e21 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,7 +1,6 @@
package sfive
import (
- "path/filepath"
"time"
)
@@ -42,7 +41,7 @@ type getLastUpdateIdToken struct {
}
type StatsSinkServer struct {
- store sqliteStore
+ store Store
quit chan bool
done chan bool
appendData chan StatisticsData
@@ -90,11 +89,7 @@ func (self StatsSinkServer) appendActor() {
token.response <- getHubIdResult{self.store.GetStoreId()}
case token := <-self.getLastUpdateIdChan:
lastUpdateId, err := self.store.GetLastUpdateId()
- if lastUpdateId != nil {
- token.response <- getLastUpdateIdResult{*lastUpdateId, err}
- } else {
- token.response <- getLastUpdateIdResult{0, err}
- }
+ token.response <- getLastUpdateIdResult{lastUpdateId, err}
}
}
}
@@ -147,11 +142,8 @@ func (self StatsSinkServer) Close() {
func NewServer(dbPath string) (server *StatsSinkServer, err error) {
// TODO read configuration and create instance with correct settings
- sqlitePath := filepath.Join(dbPath, "db.sqlite")
- boltPath := filepath.Join(dbPath, "db.bolt")
-
server = new(StatsSinkServer)
- server.store, err = NewStore(sqlitePath, boltPath)
+ server.store, err = NewStore(dbPath)
if err != nil {
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 418d195..bdb0cbf 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -13,8 +13,8 @@ func findMaxId(values []StatisticsData) int {
maxId := -1
for i := range values {
id := values[i].SourceHubDataUpdateId
- if id != nil && *id > maxId {
- maxId = *id
+ if id > maxId {
+ maxId = id
}
}
return maxId
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 6292ba8..a474f47 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -137,9 +137,7 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- if value != nil {
- fmt.Fprintf(w, "%d", *value)
- }
+ fmt.Fprintf(w, "%d", value)
}
func (self StatsSinkServer) ServeWeb(vizAppLocation string) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 3ccc202..1339a4b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -1,66 +1,28 @@
package sfive
import (
- "database/sql"
"encoding/json"
"time"
- // needed for gorp tracing
- // "log"
- // "os"
-
"github.com/boltdb/bolt"
- _ "github.com/mattn/go-sqlite3"
"github.com/pborman/uuid"
- "gopkg.in/gorp.v2"
)
-type sqliteStore struct {
- db *gorp.DbMap
- dbBolt *bolt.DB
- hubId string
-}
-
-func sourceFromStatisticsData(value StatisticsData) sourceDb {
- return sourceDb{
- Hostname: value.SourceId.Hostname,
- StreamId: streamIdDb{
- ContentId: value.SourceId.StreamId.ContentId,
- Format: value.SourceId.StreamId.Format,
- Quality: value.SourceId.StreamId.Quality,
- },
- Tags: value.SourceId.Tags,
- }
-}
-
-func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
- return dataUpdateDb{
- -1,
- -1,
- value.SourceHubUuid,
- value.SourceHubDataUpdateId,
- value.StartTime.Unix(),
- value.Duration,
- value.Data.ClientCount,
- value.Data.BytesReceived,
- value.Data.BytesSent}
-}
-
-func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) {
- du := dataUpdateFromStatisticsData(value)
- cd := value.Data.Clients
- src := sourceFromStatisticsData(value)
-
- return du, cd, src
+type Store struct {
+ hubId string
+ db *bolt.DB
}
-func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
+func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
boltDb, err = bolt.Open(boltPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return
}
err = boltDb.Update(func(tx *bolt.Tx) error {
+ if _, err := tx.CreateBucketIfNotExists([]byte(dataUpdatesBn)); err != nil {
+ return err
+ }
if _, err := tx.CreateBucketIfNotExists([]byte(sourcesFwdBn)); err != nil {
return err
}
@@ -96,37 +58,14 @@ func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) {
return
}
-func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, err error) {
-
- var db *sql.DB
- db, err = sql.Open("sqlite3", sqlitePath)
- if err != nil {
- return
- }
-
- dbmap = &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
- // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds))
-
- dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id")
-
- // TODO use some real migration, yadda yadda
- if err = dbmap.CreateTablesIfNotExists(); err != nil {
- return
- }
-
- return
-}
-
-func (s sqliteStore) insertDataUpdateEntry(srcId int, du *dataUpdateDb) (err error) {
- du.SourceId = srcId
- err = s.db.Insert(du)
- if err != nil {
- return
- }
- return
+func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) {
+ du := NewDataUpdateDb(value)
+ cd := value.Data.Clients
+ src := NewSourceDb(value)
+ return du, cd, src
}
-func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
+func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
bf := tx.Bucket([]byte(sourcesFwdBn))
bf.FillPercent = 1.0 // we only do appends
br := tx.Bucket([]byte(sourcesRevBn))
@@ -155,7 +94,12 @@ func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err
return srcId, err
}
-func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) {
+func (s Store) insertDataUpdateEntry(tx *bolt.Tx, srcId int, du *dataUpdateDb) (duId int, err error) {
+ // TODO: add me
+ return
+}
+
+func (s Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) {
bf := tx.Bucket([]byte(userAgentsFwdBn))
bf.FillPercent = 1.0 // we only do appends
br := tx.Bucket([]byte(userAgentsRevBn))
@@ -178,7 +122,7 @@ func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err e
return uaId, err
}
-func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error {
+func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error {
if len(cd) == 0 {
return nil
}
@@ -202,35 +146,26 @@ func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []C
return b.Put(itob(duId), jsonData)
}
-func (s sqliteStore) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) {
+func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) {
var srcId int
if srcId, err = s.insertNewSource(tx, src); err != nil {
return
}
- if err = s.insertDataUpdateEntry(srcId, &du); err != nil {
+ var duId int
+ if duId, err = s.insertDataUpdateEntry(tx, srcId, &du); err != nil {
return
}
- if err = s.insertDataUpdateClientEntries(tx, du.Id, cd); err != nil {
+ if err = s.insertDataUpdateClientEntries(tx, duId, cd); err != nil {
return
}
return
}
-func (s sqliteStore) Append(update StatisticsData) (err error) {
- return s.AppendMany([]StatisticsData{update})
-}
-
-func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) {
- var tx *gorp.Transaction
- tx, err = s.db.Begin()
- if err != nil {
- return
- }
-
- err = s.dbBolt.Update(func(tx *bolt.Tx) error {
+func (s Store) AppendMany(updates []StatisticsData) (err error) {
+ return s.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
du, cd, src := updateFromStatisticsData(update)
if err := s.appendItem(tx, du, cd, src); err != nil {
@@ -239,16 +174,13 @@ func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) {
}
return nil
})
+}
- if err != nil {
- tx.Rollback()
- return
- }
-
- return tx.Commit()
+func (s Store) Append(update StatisticsData) (err error) {
+ return s.AppendMany([]StatisticsData{update})
}
-func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
+func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
b := tx.Bucket([]byte(sourcesRevBn))
jsonData := b.Get(itob(id))
@@ -262,9 +194,9 @@ func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
return
}
-func (s sqliteStore) GetSources() (res []SourceId, err error) {
+func (s Store) GetSources() (res []SourceId, err error) {
res = []SourceId{}
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
+ err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(sourcesRevBn))
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
@@ -281,8 +213,8 @@ func (s sqliteStore) GetSources() (res []SourceId, err error) {
return
}
-func (s sqliteStore) GetSource(id int) (res SourceId, err error) {
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
+func (s Store) GetSource(id int) (res SourceId, err error) {
+ err = s.db.View(func(tx *bolt.Tx) error {
src, err := s.getSource(tx, id)
if err != nil {
return err
@@ -293,12 +225,12 @@ func (s sqliteStore) GetSource(id int) (res SourceId, err error) {
return
}
-func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
- err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id)
+func (s Store) GetUpdate(id int) (res dataUpdateDb, err error) {
+ // TODO: implement me
return
}
-func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
+func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -321,9 +253,9 @@ func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData
return
}
-func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (res StatisticsData, err error) {
+func (s Store) CreateStatisticsDataFrom(tx *bolt.Tx, duId int, dat dataUpdateDb) (res StatisticsData, err error) {
var clients []ClientData
- if clients, err = s.getClientsByUpdateId(tx, dat.Id); err != nil {
+ if clients, err = s.getClientsByUpdateId(tx, duId); err != nil {
return
}
var src sourceDb
@@ -331,7 +263,7 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re
return
}
- res.CopyFromDataUpdateDb(dat, s.hubId)
+ res.CopyFromDataUpdateDb(dat, duId, s.hubId)
res.Hostname = src.Hostname
res.StreamId.ContentId = src.StreamId.ContentId
res.StreamId.Format = src.StreamId.Format
@@ -341,35 +273,24 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re
return
}
-func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) {
- err = s.dbBolt.View(func(tx *bolt.Tx) error {
- for i := range dat {
- sd, err := s.CreateStatisticsDataFrom(tx, *dat[i].(*dataUpdateDb))
- if err != nil {
- return err
- }
- res = append(res, sd)
- }
- return nil
- })
- return
-}
-
-func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
- parameters := make(map[string]interface{})
- sql := "SELECT * FROM " + dataUpdatesTn + " WHERE Id > :afterUpdateId limit :limit"
- parameters["afterUpdateId"] = id
- parameters["limit"] = 5000 // TODO: hardcoded value
- var updates []interface{}
- updates, err = s.db.Select(dataUpdateDb{}, sql, parameters)
- s5tl.Printf("sql: %s", sql)
- if err == nil {
- res, _ = s.CreateStatisticsDatasFrom(updates)
- }
+func (s Store) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
+ // err = s.db.View(func(tx *bolt.Tx) error {
+ // // TODO: iterate over ids
+ // duId := 1
+
+ // for i := range dat {
+ // sd, err := s.CreateStatisticsDataFrom(tx, duId, du)
+ // if err != nil {
+ // return err
+ // }
+ // res = append(res, sd)
+ // }
+ // return nil
+ // })
return
}
-func (s sqliteStore) GetUpdates() (res []StatisticsData, err error) {
+func (s Store) GetUpdates() (res []StatisticsData, err error) {
return s.GetUpdatesAfter(-1)
}
@@ -377,50 +298,50 @@ type lastUpdateQueryResult struct {
MaxDataUpdateId *int
}
-func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(
- &result,
- "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
- uuid)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
- }
+func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
+ // TODO: implement me!
+ updateId = -1
+
+ // result := lastUpdateQueryResult{}
+ // err = s.db.SelectOne(
+ // &result,
+ // "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
+ // uuid)
+ // if err == nil {
+ // updateId = result.MaxDataUpdateId
+ // } else {
+ // s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
+ // }
+ // return
return
}
-func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max DataUpdateId: %v", err)
- }
+func (s Store) GetLastUpdateId() (updateId int, err error) {
+ // TODO: implement me!
+ updateId = -1
+
+ // result := lastUpdateQueryResult{}
+ // err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
+ // if err == nil {
+ // updateId = result.MaxDataUpdateId
+ // } else {
+ // s5l.Printf("db: failed to find max DataUpdateId: %v", err)
+ // }
return
}
-func (s sqliteStore) GetStoreId() string {
+func (s Store) GetStoreId() string {
return s.hubId
}
-func NewStore(sqlitePath, boltPath string) (sqliteStore, error) {
- boltDb, hubid, err := initDbBolt(boltPath)
- if err != nil {
- return sqliteStore{}, err
- }
-
- db, err := initDbSqlite(sqlitePath)
+func NewStore(dbPath string) (Store, error) {
+ db, hubid, err := initDb(dbPath)
if err != nil {
- boltDb.Close()
- return sqliteStore{}, err
+ return Store{}, err
}
- return sqliteStore{db, boltDb, hubid}, nil
+ return Store{hubid, db}, nil
}
-func (s sqliteStore) Close() {
- s.db.Db.Close()
- s.dbBolt.Close()
+func (s Store) Close() {
+ s.db.Close()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index fe43bf5..38e7bf7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -9,8 +9,7 @@ import (
)
var (
- __sqlitePath = "file:memdb1?mode=memory&cache=shared"
- __boltPath = "/run/s5hub_testing_db.bolt"
+ __boltPath = "/run/s5hub_testing_db.bolt"
)
func TestMain(m *testing.M) {
@@ -24,7 +23,7 @@ func TestMain(m *testing.M) {
func TestAppend(t *testing.T) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -35,7 +34,7 @@ func TestAppend(t *testing.T) {
update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ dat := StatisticsData{"", -1, source, update}
err = store.Append(dat)
if err != nil {
@@ -46,7 +45,7 @@ func TestAppend(t *testing.T) {
func TestGetUpdatesAfter(t *testing.T) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -57,7 +56,7 @@ func TestGetUpdatesAfter(t *testing.T) {
update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ dat := StatisticsData{"", -1, source, update}
err = store.Append(dat)
if err != nil {
@@ -122,7 +121,7 @@ func generateStatisticsData(n int) (data []StatisticsData) {
func BenchmarkAppendMany(b *testing.B) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
@@ -138,7 +137,7 @@ func BenchmarkAppendMany(b *testing.B) {
func BenchmarkGetUpdatesAfter(b *testing.B) {
os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
+ store, err := NewStore(__boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 525b6d3..ad6deaf 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -41,8 +41,8 @@ type DataUpdate struct {
}
type StatisticsData struct {
- SourceHubUuid *string
- SourceHubDataUpdateId *int
+ SourceHubUuid string `json:"SourceHubUuid,omitempty"`
+ SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
SourceId
DataUpdate
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index fd21337..d6de68e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -12,15 +12,10 @@ var (
ErrNotFound = errors.New("not found")
)
-// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections
-// this is very much not normalized at all, because I'm too lazy to type
-
const (
- // sqlite table names
- dataUpdatesTn = "DataUpdates"
-
- // bolt bucket names
+ // bucket names
hubInfoBn = "HubInfo"
+ dataUpdatesBn = "DataUpdates"
sourcesFwdBn = "SourcesFwd"
sourcesRevBn = "SourcesRev"
clientDataBn = "ClientData"
@@ -44,6 +39,18 @@ type sourceDb struct {
Tags []string `json:"t"`
}
+func NewSourceDb(value StatisticsData) sourceDb {
+ return sourceDb{
+ Hostname: value.SourceId.Hostname,
+ StreamId: streamIdDb{
+ ContentId: value.SourceId.StreamId.ContentId,
+ Format: value.SourceId.StreamId.Format,
+ Quality: value.SourceId.StreamId.Quality,
+ },
+ Tags: value.SourceId.Tags,
+ }
+}
+
func (s sourceDb) String() string {
return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ","))
}
@@ -66,30 +73,39 @@ type clientDataDb struct {
// stored in dataUpdatesTn
// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
type dataUpdateDb struct {
- Id int
- SourceId int // foreign key to sourcesTn
- SourceHubUuid *string
- SourceHubDataUpdateId *int
- StartTime int64 // time.Time
- Duration int64 // duration in milliseconds
- ClientCount uint
- BytesReceived uint
- BytesSent uint
+ SourceHubUuid string `json:"h,omitempty"`
+ SourceHubDataUpdateId int `json:"hi,omitempty"`
+ SourceId int `json:"si"`
+ StartTime int64 `json:"st"` // unix timestamp in milliseconds
+ Duration int64 `json:"du"` // duration in milliseconds
+ ClientCount uint `json:"cc"`
+ BytesReceived uint `json:"br"`
+ BytesSent uint `json:"bs"`
}
-func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, hubId string) {
- if v.SourceHubUuid == nil {
- s.SourceHubUuid = &hubId
- } else {
- s.SourceHubUuid = v.SourceHubUuid
+func NewDataUpdateDb(v StatisticsData) dataUpdateDb {
+ return dataUpdateDb{
+ v.SourceHubUuid,
+ v.SourceHubDataUpdateId,
+ -1,
+ int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000),
+ v.Duration,
+ v.Data.ClientCount,
+ v.Data.BytesReceived,
+ v.Data.BytesSent,
}
- if v.SourceHubDataUpdateId == nil {
- s.SourceHubDataUpdateId = &v.Id
+}
+
+func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) {
+ if v.SourceHubUuid == "" {
+ s.SourceHubUuid = hubId
+ s.SourceHubDataUpdateId = vId
} else {
+ s.SourceHubUuid = v.SourceHubUuid
s.SourceHubDataUpdateId = v.SourceHubDataUpdateId
}
- s.StartTime = time.Unix(v.StartTime, 0)
+ s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000)
s.Duration = v.Duration
s.Data.ClientCount = v.ClientCount
s.Data.BytesReceived = v.BytesReceived
diff --git a/src/hub/test-bolt b/src/hub/test-bolt
index 7ac3dd2..9a26b04 100755
--- a/src/hub/test-bolt
+++ b/src/hub/test-bolt
@@ -1,6 +1,7 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
BIN="$(go env GOPATH)/bin/bolt"
if [ ! -x "$BIN" ]; then
@@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_D/db.bolt"
+exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_DB"
diff --git a/src/hub/test-bolter b/src/hub/test-bolter
index dc2e98d..9093d51 100755
--- a/src/hub/test-bolter
+++ b/src/hub/test-bolter
@@ -1,6 +1,7 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
BIN="$(go env GOPATH)/bin/bolter"
if [ ! -x "$BIN" ]; then
@@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec $BIN --file "$TEST_D/db.bolt" $@
+exec $BIN --file "$TEST_DB" $@
diff --git a/src/hub/test-fwd b/src/hub/test-fwd
index eff7605..ce6e0e3 100755
--- a/src/hub/test-fwd
+++ b/src/hub/test-fwd
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es
index dc7979f..489fcdd 100755
--- a/src/hub/test-fwd-es
+++ b/src/hub/test-fwd-es
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/e14"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing"
diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik
index b6ac640..6143c70 100755
--- a/src/hub/test-fwd-piwik
+++ b/src/hub/test-fwd-piwik
@@ -1,5 +1,6 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
diff --git a/src/hub/test-sqlite b/src/hub/test-sqlite
deleted file mode 100755
index 1c2dcb1..0000000
--- a/src/hub/test-sqlite
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-TEST_D="./test"
-
-exec sqlite3 "$TEST_D/db.sqlite"
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 37d5897..803ac37 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -1,7 +1,8 @@
#!/bin/sh
TEST_D="./test"
+TEST_DB="$TEST_D/db.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
-exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"