diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-04-28 00:12:07 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-04-28 00:12:07 +0200 |
commit | 123c16701fe60c3c4d47abfb7d9b3e5f5b931d70 (patch) | |
tree | bda867d45307430c3671ed90e1e54e684faa5c2c /src | |
parent | web uses server to access store now (diff) |
forwarding works now
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 121 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 33 | ||||
-rwxr-xr-x | src/hub/test-bolt | 10 | ||||
-rwxr-xr-x | src/hub/test-bolter | 10 | ||||
-rwxr-xr-x | src/hub/test-fwd | 7 | ||||
-rwxr-xr-x | src/hub/test-fwd-es | 7 | ||||
-rwxr-xr-x | src/hub/test-fwd-piwik | 7 | ||||
-rwxr-xr-x | src/hub/test-srv | 7 |
8 files changed, 140 insertions, 62 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 275a488..a54ccdd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -45,34 +45,23 @@ const ( ) type Store struct { - hubId string - db *bolt.DB + hubUuid string + db *bolt.DB } -func initDb(dbPath string) (db *bolt.DB, hubId string, err error) { +func initDb(dbPath string) (db *bolt.DB, hubUuid string, err error) { db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { return } err = db.Update(func(tx *bolt.Tx) error { - if _, err := tx.CreateBucketIfNotExists([]byte(dataUpdatesBn)); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(sourcesFwdBn)); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(sourcesRevBn)); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(clientDataBn)); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(userAgentsFwdBn)); err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(userAgentsRevBn)); err != nil { - return err + buckets := []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn, + sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} + for _, bn := range buckets { + if _, err := tx.CreateBucketIfNotExists([]byte(bn)); err != nil { + return err + } } b, err := tx.CreateBucketIfNotExists([]byte(hubInfoBn)) @@ -81,12 +70,12 @@ func initDb(dbPath string) (db *bolt.DB, hubId string, err error) { } bHubId := b.Get([]byte(hubUUIDKey)) if bHubId == nil { - hubId = uuid.New() - if err := b.Put([]byte(hubUUIDKey), []byte(hubId)); err != nil { + hubUuid = uuid.New() + if err := b.Put([]byte(hubUUIDKey), []byte(hubUuid)); err != nil { return err } } else { - hubId = string(bHubId) + hubUuid = string(bHubId) } return nil }) @@ -94,11 +83,31 @@ func initDb(dbPath string) (db *bolt.DB, hubId string, err error) { return } -func updateFromDataUpdateFull(value DataUpdateFull) (sourceDb, dataUpdateDb, []ClientData) { - src := NewSourceDb(value) - du := NewDataUpdateDb(value) - cd := value.Data.Clients - return src, du, cd +func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) { + if hubUuid == "" { + return + } + + bf := tx.Bucket([]byte(hubUuidsFwdBn)) + bf.FillPercent = 1.0 // we only do appends + br := tx.Bucket([]byte(hubUuidsRevBn)) + br.FillPercent = 1.0 // we only do appends + + bHubId := bf.Get([]byte(hubUuid)) + if bHubId != nil { + return btoi(bHubId), nil + } + + next, _ := bf.NextSequence() + hubId = int(next) + if err = bf.Put([]byte(hubUuid), itob(hubId)); err != nil { + return + } + if err = br.Put(itob(hubId), []byte(hubUuid)); err != nil { + return + } + + return hubId, err } func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { @@ -107,7 +116,7 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error br := tx.Bucket([]byte(sourcesRevBn)) br.FillPercent = 1.0 // we only do appends - slug := src.String() + slug := src.Slug() bSrcId := bf.Get([]byte(slug)) if bSrcId != nil { return btoi(bSrcId), nil @@ -192,7 +201,21 @@ func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error { return b.Put(itob(duId), jsonData) } -func (st Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) { +func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error { + b := tx.Bucket([]byte(latestUpdatesBn)) + b.FillPercent = 1.0 // we only do appends + + last := b.Get([]byte(uuid)) + if last != nil && btoi(last) > duId { + return nil + } + return b.Put([]byte(uuid), itob(duId)) +} + +func (st Store) appendItem(tx *bolt.Tx, hubUuid string, src sourceDb, du dataUpdateDb, cd []ClientData) (err error) { + if du.SourceHubId, err = st.insertNewHub(tx, hubUuid); err != nil { + return + } if du.SourceId, err = st.insertNewSource(tx, src); err != nil { return } @@ -206,14 +229,20 @@ func (st Store) appendItem(tx *bolt.Tx, src sourceDb, du dataUpdateDb, cd []Clie return } + if hubUuid != "" { + err = st.setLastUpdateForUuid(tx, hubUuid, du.SourceHubDataUpdateId) + } return } func (st Store) AppendMany(updates []DataUpdateFull) (err error) { return st.db.Update(func(tx *bolt.Tx) error { for _, update := range updates { - src, du, cd := updateFromDataUpdateFull(update) - if err := st.appendItem(tx, src, du, cd); err != nil { + hubUuid := update.SourceHubUuid + src := NewSourceDb(update) + du := NewDataUpdateDb(update) + cd := update.Data.Clients + if err := st.appendItem(tx, hubUuid, src, du, cd); err != nil { return err } } @@ -257,6 +286,15 @@ func (st Store) GetSources() (res []SourceId, err error) { return } +func (st Store) getHub(tx *bolt.Tx, id int) string { + b := tx.Bucket([]byte(hubUuidsRevBn)) + uuid := b.Get(itob(id)) + if uuid == nil { + return "" + } + return string(uuid) +} + func (st Store) GetSource(id int) (res SourceId, err error) { err = st.db.View(func(tx *bolt.Tx) error { src, err := st.getSource(tx, id) @@ -269,7 +307,7 @@ func (st Store) GetSource(id int) (res SourceId, err error) { return } -func (st Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) { +func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { bc := tx.Bucket([]byte(clientDataBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) @@ -294,15 +332,14 @@ func (st Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err func (st Store) createDataUpdateFullFromDb(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { var clients []ClientData - if clients, err = st.getClientsByUpdateId(tx, duId); err != nil { + if clients, err = st.getClients(tx, duId); err != nil { return } var src sourceDb if src, err = st.getSource(tx, du.SourceId); err != nil { return } - - res.CopyFromDataUpdateDb(du, duId, st.hubId) + res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) res.Hostname = src.Hostname res.StreamId.ContentId = src.StreamId.ContentId res.StreamId.Format = src.StreamId.Format @@ -371,8 +408,14 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) { } func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) { - // TODO: implement me! - updateId = 0 + err = st.db.View(func(tx *bolt.Tx) error { + bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid)) + if bUpdateId == nil { + return nil + } + updateId = btoi(bUpdateId) + return nil + }) return } @@ -385,7 +428,7 @@ func (st Store) GetLastUpdateId() (updateId int, err error) { } func (st Store) GetStoreId() string { - return st.hubId + return st.hubUuid } func NewStore(dbPath string) (Store, error) { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index b868313..e2ea6f0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -47,6 +47,9 @@ var ( const ( // bucket names hubInfoBn = "HubInfo" + latestUpdatesBn = "LatestUpdates" + hubUuidsFwdBn = "HubUUIDsFwd" + hubUuidsRevBn = "HubUUIDsRev" dataUpdatesBn = "DataUpdates" sourcesFwdBn = "SourcesFwd" sourcesRevBn = "SourcesRev" @@ -83,7 +86,7 @@ func NewSourceDb(value DataUpdateFull) sourceDb { } } -func (s sourceDb) String() string { +func (s sourceDb) Slug() string { return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ",")) } @@ -104,19 +107,19 @@ type clientDataDb struct { // stored in dataUpdatesBn type dataUpdateDb struct { - SourceHubUuid string `json:"h,omitempty"` - SourceHubDataUpdateId int `json:"hi,omitempty"` - SourceId int `json:"si"` - StartTime int64 `json:"st"` // unix timestamp in milliseconds - Duration int64 `json:"du"` // duration in milliseconds - ClientCount uint `json:"cc"` - BytesReceived uint `json:"br"` - BytesSent uint `json:"bs"` + SourceHubId int `json:"h,omitempty"` + SourceHubDataUpdateId int `json:"hi,omitempty"` + SourceId int `json:"si"` + StartTime int64 `json:"st"` // unix timestamp in milliseconds + Duration int64 `json:"du"` // duration in milliseconds + ClientCount uint `json:"cc"` + BytesReceived uint `json:"br"` + BytesSent uint `json:"bs"` } func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { return dataUpdateDb{ - v.SourceHubUuid, + -1, v.SourceHubDataUpdateId, -1, int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000), @@ -127,12 +130,12 @@ func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { } } -func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) { - if v.SourceHubUuid == "" { - s.SourceHubUuid = hubId - s.SourceHubDataUpdateId = vId +func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) { + if srcHubUuid == "" { + s.SourceHubUuid = hubUuid + s.SourceHubDataUpdateId = id } else { - s.SourceHubUuid = v.SourceHubUuid + s.SourceHubUuid = srcHubUuid s.SourceHubDataUpdateId = v.SourceHubDataUpdateId } diff --git a/src/hub/test-bolt b/src/hub/test-bolt index 9a26b04..1787181 100755 --- a/src/hub/test-bolt +++ b/src/hub/test-bolt @@ -1,7 +1,13 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> [ ... args-to-bolt ... ]" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" +shift BIN="$(go env GOPATH)/bin/bolt" if [ ! -x "$BIN" ]; then @@ -12,4 +18,4 @@ if [ ! -x "$BIN" ]; then exit 1 fi -exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_DB" +exec "$BIN" $@ "$TEST_DB" diff --git a/src/hub/test-bolter b/src/hub/test-bolter index 9093d51..24c8901 100755 --- a/src/hub/test-bolter +++ b/src/hub/test-bolter @@ -1,7 +1,13 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> [ ... args-to-bolter ... ]" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" +shift BIN="$(go env GOPATH)/bin/bolter" if [ ! -x "$BIN" ]; then @@ -12,4 +18,4 @@ if [ ! -x "$BIN" ]; then exit 1 fi -exec $BIN --file "$TEST_DB" $@ +exec "$BIN" --file "$TEST_DB" $@ diff --git a/src/hub/test-fwd b/src/hub/test-fwd index ce6e0e3..35dd324 100755 --- a/src/hub/test-fwd +++ b/src/hub/test-fwd @@ -1,6 +1,11 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000" diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es index 489fcdd..2d74db6 100755 --- a/src/hub/test-fwd-es +++ b/src/hub/test-fwd-es @@ -1,6 +1,11 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing" diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik index 6143c70..c7df4ae 100755 --- a/src/hub/test-fwd-piwik +++ b/src/hub/test-fwd-piwik @@ -1,6 +1,11 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" diff --git a/src/hub/test-srv b/src/hub/test-srv index 19a6539..064fa3a 100755 --- a/src/hub/test-srv +++ b/src/hub/test-srv @@ -1,7 +1,12 @@ #!/bin/sh +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + TEST_D="./test" -TEST_DB="$TEST_D/db.bolt" +TEST_DB="$TEST_D/$1.bolt" mkdir -p "$TEST_D" rm -f "$TEST_D/pipe" "$TEST_D/pipegram" |