summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-28 00:12:07 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-28 00:12:07 +0200
commit123c16701fe60c3c4d47abfb7d9b3e5f5b931d70 (patch)
treebda867d45307430c3671ed90e1e54e684faa5c2c /src/hub
parentweb uses server to access store now (diff)
forwarding works now
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go121
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go33
-rwxr-xr-xsrc/hub/test-bolt10
-rwxr-xr-xsrc/hub/test-bolter10
-rwxr-xr-xsrc/hub/test-fwd7
-rwxr-xr-xsrc/hub/test-fwd-es7
-rwxr-xr-xsrc/hub/test-fwd-piwik7
-rwxr-xr-xsrc/hub/test-srv7
8 files changed, 140 insertions, 62 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 275a488..a54ccdd 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -45,34 +45,23 @@ const (
)
type Store struct {
- hubId string
- db *bolt.DB
+ hubUuid string
+ db *bolt.DB
}
-func initDb(dbPath string) (db *bolt.DB, hubId string, err error) {
+func initDb(dbPath string) (db *bolt.DB, hubUuid string, err error) {
db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return
}
err = db.Update(func(tx *bolt.Tx) error {
- if _, err := tx.CreateBucketIfNotExists([]byte(dataUpdatesBn)); err != nil {
- return err
- }
- if _, err := tx.CreateBucketIfNotExists([]byte(sourcesFwdBn)); err != nil {
- return err
- }
- if _, err := tx.CreateBucketIfNotExists([]byte(sourcesRevBn)); err != nil {
- return err
- }
- if _, err := tx.CreateBucketIfNotExists([]byte(clientDataBn)); err != nil {
- return err
- }
- if _, err := tx.CreateBucketIfNotExists([]byte(userAgentsFwdBn)); err != nil {
- return err
- }
- if _, err := tx.CreateBucketIfNotExists([]byte(userAgentsRevBn)); err != nil {
- return err
+ buckets := []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn,
+ sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn}
+ for _, bn := range buckets {
+ if _, err := tx.CreateBucketIfNotExists([]byte(bn)); err != nil {
+ return err
+ }
}
b, err := tx.CreateBucketIfNotExists([]byte(hubInfoBn))
@@ -81,12 +70,12 @@ func initDb(dbPath string) (db *bolt.DB, hubId string, err error) {
}
bHubId := b.Get([]byte(hubUUIDKey))
if bHubId == nil {
- hubId = uuid.New()
- if err := b.Put([]byte(hubUUIDKey), []byte(hubId)); err != nil {
+ hubUuid = uuid.New()
+ if err := b.Put([]byte(hubUUIDKey), []byte(hubUuid)); err != nil {
return err
}
} else {
- hubId = string(bHubId)
+ hubUuid = string(bHubId)
}
return nil
})
@@ -94,11 +83,31 @@ func initDb(dbPath string) (db *bolt.DB, hubId string, err error) {
return
}
-func updateFromDataUpdateFull(value DataUpdateFull) (sourceDb, dataUpdateDb, []ClientData) {
- src := NewSourceDb(value)
- du := NewDataUpdateDb(value)
- cd := value.Data.Clients
- return src, du, cd
+func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) {
+ if hubUuid == "" {
+ return
+ }
+
+ bf := tx.Bucket([]byte(hubUuidsFwdBn))
+ bf.FillPercent = 1.0 // we only do appends
+ br := tx.Bucket([]byte(hubUuidsRevBn))
+ br.FillPercent = 1.0 // we only do appends
+
+ bHubId := bf.Get([]byte(hubUuid))
+ if bHubId != nil {
+ return btoi(bHubId), nil
+ }
+
+ next, _ := bf.NextSequence()
+ hubId = int(next)
+ if err = bf.Put([]byte(hubUuid), itob(hubId)); err != nil {
+ return
+ }
+ if err = br.Put(itob(hubId), []byte(hubUuid)); err != nil {
+ return
+ }
+
+ return hubId, err
}
func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
@@ -107,7 +116,7 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error
br := tx.Bucket([]byte(sourcesRevBn))
br.FillPercent = 1.0 // we only do appends
- slug := src.String()
+ slug := src.Slug()
bSrcId := bf.Get([]byte(slug))
if bSrcId != nil {
return btoi(bSrcId), nil
@@ -192,7 +201,21 @@ func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error {
return b.Put(itob(duId), jsonData)
}
-func (st Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) {
+func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error {
+ b := tx.Bucket([]byte(latestUpdatesBn))
+ b.FillPercent = 1.0 // we only do appends
+
+ last := b.Get([]byte(uuid))
+ if last != nil && btoi(last) > duId {
+ return nil
+ }
+ return b.Put([]byte(uuid), itob(duId))
+}
+
+func (st Store) appendItem(tx *bolt.Tx, hubUuid string, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) {
+ if du.SourceHubId, err = st.insertNewHub(tx, hubUuid); err != nil {
+ return
+ }
if du.SourceId, err = st.insertNewSource(tx, src); err != nil {
return
}
@@ -206,14 +229,20 @@ func (st Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []Clie
return
}
+ if hubUuid != "" {
+ err = st.setLastUpdateForUuid(tx, hubUuid, du.SourceHubDataUpdateId)
+ }
return
}
func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
return st.db.Update(func(tx *bolt.Tx) error {
for _, update := range updates {
- src, du, cd := updateFromDataUpdateFull(update)
- if err := st.appendItem(tx, src, du, cd); err != nil {
+ hubUuid := update.SourceHubUuid
+ src := NewSourceDb(update)
+ du := NewDataUpdateDb(update)
+ cd := update.Data.Clients
+ if err := st.appendItem(tx, hubUuid, src, du, cd); err != nil {
return err
}
}
@@ -257,6 +286,15 @@ func (st Store) GetSources() (res []SourceId, err error) {
return
}
+func (st Store) getHub(tx *bolt.Tx, id int) string {
+ b := tx.Bucket([]byte(hubUuidsRevBn))
+ uuid := b.Get(itob(id))
+ if uuid == nil {
+ return ""
+ }
+ return string(uuid)
+}
+
func (st Store) GetSource(id int) (res SourceId, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
src, err := st.getSource(tx, id)
@@ -269,7 +307,7 @@ func (st Store) GetSource(id int) (res SourceId, err error) {
return
}
-func (st Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) {
+func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -294,15 +332,14 @@ func (st Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err
func (st Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
var clients []ClientData
- if clients, err = st.getClientsByUpdateId(tx, duId); err != nil {
+ if clients, err = st.getClients(tx, duId); err != nil {
return
}
var src sourceDb
if src, err = st.getSource(tx, du.SourceId); err != nil {
return
}
-
- res.CopyFromDataUpdateDb(du, duId, st.hubId)
+ res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
res.Hostname = src.Hostname
res.StreamId.ContentId = src.StreamId.ContentId
res.StreamId.Format = src.StreamId.Format
@@ -371,8 +408,14 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
}
func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
- // TODO: implement me!
- updateId = 0
+ err = st.db.View(func(tx *bolt.Tx) error {
+ bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid))
+ if bUpdateId == nil {
+ return nil
+ }
+ updateId = btoi(bUpdateId)
+ return nil
+ })
return
}
@@ -385,7 +428,7 @@ func (st Store) GetLastUpdateId() (updateId int, err error) {
}
func (st Store) GetStoreId() string {
- return st.hubId
+ return st.hubUuid
}
func NewStore(dbPath string) (Store, error) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index b868313..e2ea6f0 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -47,6 +47,9 @@ var (
const (
// bucket names
hubInfoBn = "HubInfo"
+ latestUpdatesBn = "LatestUpdates"
+ hubUuidsFwdBn = "HubUUIDsFwd"
+ hubUuidsRevBn = "HubUUIDsRev"
dataUpdatesBn = "DataUpdates"
sourcesFwdBn = "SourcesFwd"
sourcesRevBn = "SourcesRev"
@@ -83,7 +86,7 @@ func NewSourceDb(value DataUpdateFull) sourceDb {
}
}
-func (s sourceDb) String() string {
+func (s sourceDb) Slug() string {
return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ","))
}
@@ -104,19 +107,19 @@ type clientDataDb struct {
// stored in dataUpdatesBn
type dataUpdateDb struct {
- SourceHubUuid string `json:"h,omitempty"`
- SourceHubDataUpdateId int `json:"hi,omitempty"`
- SourceId int `json:"si"`
- StartTime int64 `json:"st"` // unix timestamp in milliseconds
- Duration int64 `json:"du"` // duration in milliseconds
- ClientCount uint `json:"cc"`
- BytesReceived uint `json:"br"`
- BytesSent uint `json:"bs"`
+ SourceHubId int `json:"h,omitempty"`
+ SourceHubDataUpdateId int `json:"hi,omitempty"`
+ SourceId int `json:"si"`
+ StartTime int64 `json:"st"` // unix timestamp in milliseconds
+ Duration int64 `json:"du"` // duration in milliseconds
+ ClientCount uint `json:"cc"`
+ BytesReceived uint `json:"br"`
+ BytesSent uint `json:"bs"`
}
func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb {
return dataUpdateDb{
- v.SourceHubUuid,
+ -1,
v.SourceHubDataUpdateId,
-1,
int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000),
@@ -127,12 +130,12 @@ func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb {
}
}
-func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) {
- if v.SourceHubUuid == "" {
- s.SourceHubUuid = hubId
- s.SourceHubDataUpdateId = vId
+func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) {
+ if srcHubUuid == "" {
+ s.SourceHubUuid = hubUuid
+ s.SourceHubDataUpdateId = id
} else {
- s.SourceHubUuid = v.SourceHubUuid
+ s.SourceHubUuid = srcHubUuid
s.SourceHubDataUpdateId = v.SourceHubDataUpdateId
}
diff --git a/src/hub/test-bolt b/src/hub/test-bolt
index 9a26b04..1787181 100755
--- a/src/hub/test-bolt
+++ b/src/hub/test-bolt
@@ -1,7 +1,13 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> [ ... args-to-bolt ... ]"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
+shift
BIN="$(go env GOPATH)/bin/bolt"
if [ ! -x "$BIN" ]; then
@@ -12,4 +18,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_DB"
+exec "$BIN" $@ "$TEST_DB"
diff --git a/src/hub/test-bolter b/src/hub/test-bolter
index 9093d51..24c8901 100755
--- a/src/hub/test-bolter
+++ b/src/hub/test-bolter
@@ -1,7 +1,13 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> [ ... args-to-bolter ... ]"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
+shift
BIN="$(go env GOPATH)/bin/bolter"
if [ ! -x "$BIN" ]; then
@@ -12,4 +18,4 @@ if [ ! -x "$BIN" ]; then
exit 1
fi
-exec $BIN --file "$TEST_DB" $@
+exec "$BIN" --file "$TEST_DB" $@
diff --git a/src/hub/test-fwd b/src/hub/test-fwd
index ce6e0e3..35dd324 100755
--- a/src/hub/test-fwd
+++ b/src/hub/test-fwd
@@ -1,6 +1,11 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es
index 489fcdd..2d74db6 100755
--- a/src/hub/test-fwd-es
+++ b/src/hub/test-fwd-es
@@ -1,6 +1,11 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing"
diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik
index 6143c70..c7df4ae 100755
--- a/src/hub/test-fwd-piwik
+++ b/src/hub/test-fwd-piwik
@@ -1,6 +1,11 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 19a6539..064fa3a 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -1,7 +1,12 @@
#!/bin/sh
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
TEST_D="./test"
-TEST_DB="$TEST_D/db.bolt"
+TEST_DB="$TEST_D/$1.bolt"
mkdir -p "$TEST_D"
rm -f "$TEST_D/pipe" "$TEST_D/pipegram"