From 89e09629904d77a4b770315749e855d23af55bf6 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 27 Apr 2017 02:51:19 +0200 Subject: completely remove sqlite --- src/hub/Makefile | 4 +- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 2 +- src/hub/src/spreadspace.org/sfive/s5srv.go | 14 +- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 4 +- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 4 +- src/hub/src/spreadspace.org/sfive/s5store.go | 255 ++++++++-------------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 15 +- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 4 +- src/hub/src/spreadspace.org/sfive/s5typesStore.go | 64 ++++-- src/hub/test-bolt | 3 +- src/hub/test-bolter | 3 +- src/hub/test-fwd | 3 +- src/hub/test-fwd-es | 3 +- src/hub/test-fwd-piwik | 3 +- src/hub/test-sqlite | 5 - src/hub/test-srv | 3 +- 16 files changed, 157 insertions(+), 232 deletions(-) delete mode 100755 src/hub/test-sqlite (limited to 'src') diff --git a/src/hub/Makefile b/src/hub/Makefile index 43b4d9d..1739757 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -38,9 +38,7 @@ endif EXECUTEABLE := sfive-hub -LIBS := "gopkg.in/gorp.v2" \ - "github.com/mattn/go-sqlite3" \ - "github.com/boltdb/bolt" \ +LIBS := "github.com/boltdb/bolt" \ "github.com/zenazn/goji" \ "github.com/pborman/uuid" \ "github.com/equinox0815/graphite-golang" diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 7ff58b5..9589812 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -12,7 +12,7 @@ import ( var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags) func main() { - db := flag.String("db", "/var/lib/sfive/", "directory to store the database files") + db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 6f5ebbb..c5f6e21 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -1,7 +1,6 @@ package sfive import ( - "path/filepath" "time" ) @@ -42,7 +41,7 @@ type getLastUpdateIdToken struct { } type StatsSinkServer struct { - store sqliteStore + store Store quit chan bool done chan bool appendData chan StatisticsData @@ -90,11 +89,7 @@ func (self StatsSinkServer) appendActor() { token.response <- getHubIdResult{self.store.GetStoreId()} case token := <-self.getLastUpdateIdChan: lastUpdateId, err := self.store.GetLastUpdateId() - if lastUpdateId != nil { - token.response <- getLastUpdateIdResult{*lastUpdateId, err} - } else { - token.response <- getLastUpdateIdResult{0, err} - } + token.response <- getLastUpdateIdResult{lastUpdateId, err} } } } @@ -147,11 +142,8 @@ func (self StatsSinkServer) Close() { func NewServer(dbPath string) (server *StatsSinkServer, err error) { // TODO read configuration and create instance with correct settings - sqlitePath := filepath.Join(dbPath, "db.sqlite") - boltPath := filepath.Join(dbPath, "db.bolt") - server = new(StatsSinkServer) - server.store, err = NewStore(sqlitePath, boltPath) + server.store, err = NewStore(dbPath) if err != nil { return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 418d195..bdb0cbf 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -13,8 +13,8 @@ func findMaxId(values []StatisticsData) int { maxId := -1 for i := range values { id := values[i].SourceHubDataUpdateId - if id != nil && *id > maxId { - maxId = *id + if id > maxId { + maxId = id } } return maxId diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 6292ba8..a474f47 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -137,9 +137,7 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - if value != nil { - fmt.Fprintf(w, "%d", *value) - } + fmt.Fprintf(w, "%d", value) } func (self StatsSinkServer) ServeWeb(vizAppLocation string) { diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 3ccc202..1339a4b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -1,66 +1,28 @@ package sfive import ( - "database/sql" "encoding/json" "time" - // needed for gorp tracing - // "log" - // "os" - "github.com/boltdb/bolt" - _ "github.com/mattn/go-sqlite3" "github.com/pborman/uuid" - "gopkg.in/gorp.v2" ) -type sqliteStore struct { - db *gorp.DbMap - dbBolt *bolt.DB - hubId string -} - -func sourceFromStatisticsData(value StatisticsData) sourceDb { - return sourceDb{ - Hostname: value.SourceId.Hostname, - StreamId: streamIdDb{ - ContentId: value.SourceId.StreamId.ContentId, - Format: value.SourceId.StreamId.Format, - Quality: value.SourceId.StreamId.Quality, - }, - Tags: value.SourceId.Tags, - } -} - -func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { - return dataUpdateDb{ - -1, - -1, - value.SourceHubUuid, - value.SourceHubDataUpdateId, - value.StartTime.Unix(), - value.Duration, - value.Data.ClientCount, - value.Data.BytesReceived, - value.Data.BytesSent} -} - -func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) { - du := dataUpdateFromStatisticsData(value) - cd := value.Data.Clients - src := sourceFromStatisticsData(value) - - return du, cd, src +type Store struct { + hubId string + db *bolt.DB } -func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) { +func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) { boltDb, err = bolt.Open(boltPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { return } err = boltDb.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists([]byte(dataUpdatesBn)); err != nil { + return err + } if _, err := tx.CreateBucketIfNotExists([]byte(sourcesFwdBn)); err != nil { return err } @@ -96,37 +58,14 @@ func initDbBolt(boltPath string) (boltDb *bolt.DB, hubId string, err error) { return } -func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, err error) { - - var db *sql.DB - db, err = sql.Open("sqlite3", sqlitePath) - if err != nil { - return - } - - dbmap = &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} - // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds)) - - dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id") - - // TODO use some real migration, yadda yadda - if err = dbmap.CreateTablesIfNotExists(); err != nil { - return - } - - return -} - -func (s sqliteStore) insertDataUpdateEntry(srcId int, du *dataUpdateDb) (err error) { - du.SourceId = srcId - err = s.db.Insert(du) - if err != nil { - return - } - return +func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) { + du := NewDataUpdateDb(value) + cd := value.Data.Clients + src := NewSourceDb(value) + return du, cd, src } -func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { +func (s Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { bf := tx.Bucket([]byte(sourcesFwdBn)) bf.FillPercent = 1.0 // we only do appends br := tx.Bucket([]byte(sourcesRevBn)) @@ -155,7 +94,12 @@ func (s sqliteStore) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err return srcId, err } -func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) { +func (s Store) insertDataUpdateEntry(tx *bolt.Tx, srcId int, du *dataUpdateDb) (duId int, err error) { + // TODO: add me + return +} + +func (s Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) { bf := tx.Bucket([]byte(userAgentsFwdBn)) bf.FillPercent = 1.0 // we only do appends br := tx.Bucket([]byte(userAgentsRevBn)) @@ -178,7 +122,7 @@ func (s sqliteStore) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err e return uaId, err } -func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error { +func (s Store) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []ClientData) error { if len(cd) == 0 { return nil } @@ -202,35 +146,26 @@ func (s sqliteStore) insertDataUpdateClientEntries(tx *bolt.Tx, duId int, cd []C return b.Put(itob(duId), jsonData) } -func (s sqliteStore) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) { +func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sourceDb) (err error) { var srcId int if srcId, err = s.insertNewSource(tx, src); err != nil { return } - if err = s.insertDataUpdateEntry(srcId, &du); err != nil { + var duId int + if duId, err = s.insertDataUpdateEntry(tx, srcId, &du); err != nil { return } - if err = s.insertDataUpdateClientEntries(tx, du.Id, cd); err != nil { + if err = s.insertDataUpdateClientEntries(tx, duId, cd); err != nil { return } return } -func (s sqliteStore) Append(update StatisticsData) (err error) { - return s.AppendMany([]StatisticsData{update}) -} - -func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) { - var tx *gorp.Transaction - tx, err = s.db.Begin() - if err != nil { - return - } - - err = s.dbBolt.Update(func(tx *bolt.Tx) error { +func (s Store) AppendMany(updates []StatisticsData) (err error) { + return s.db.Update(func(tx *bolt.Tx) error { for _, update := range updates { du, cd, src := updateFromStatisticsData(update) if err := s.appendItem(tx, du, cd, src); err != nil { @@ -239,16 +174,13 @@ func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) { } return nil }) +} - if err != nil { - tx.Rollback() - return - } - - return tx.Commit() +func (s Store) Append(update StatisticsData) (err error) { + return s.AppendMany([]StatisticsData{update}) } -func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { +func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { b := tx.Bucket([]byte(sourcesRevBn)) jsonData := b.Get(itob(id)) @@ -262,9 +194,9 @@ func (s sqliteStore) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { return } -func (s sqliteStore) GetSources() (res []SourceId, err error) { +func (s Store) GetSources() (res []SourceId, err error) { res = []SourceId{} - err = s.dbBolt.View(func(tx *bolt.Tx) error { + err = s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(sourcesRevBn)) c := b.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { @@ -281,8 +213,8 @@ func (s sqliteStore) GetSources() (res []SourceId, err error) { return } -func (s sqliteStore) GetSource(id int) (res SourceId, err error) { - err = s.dbBolt.View(func(tx *bolt.Tx) error { +func (s Store) GetSource(id int) (res SourceId, err error) { + err = s.db.View(func(tx *bolt.Tx) error { src, err := s.getSource(tx, id) if err != nil { return err @@ -293,12 +225,12 @@ func (s sqliteStore) GetSource(id int) (res SourceId, err error) { return } -func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { - err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id) +func (s Store) GetUpdate(id int) (res dataUpdateDb, err error) { + // TODO: implement me return } -func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) { +func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err error) { bc := tx.Bucket([]byte(clientDataBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) @@ -321,9 +253,9 @@ func (s sqliteStore) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData return } -func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (res StatisticsData, err error) { +func (s Store) CreateStatisticsDataFrom(tx *bolt.Tx, duId int, dat dataUpdateDb) (res StatisticsData, err error) { var clients []ClientData - if clients, err = s.getClientsByUpdateId(tx, dat.Id); err != nil { + if clients, err = s.getClientsByUpdateId(tx, duId); err != nil { return } var src sourceDb @@ -331,7 +263,7 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re return } - res.CopyFromDataUpdateDb(dat, s.hubId) + res.CopyFromDataUpdateDb(dat, duId, s.hubId) res.Hostname = src.Hostname res.StreamId.ContentId = src.StreamId.ContentId res.StreamId.Format = src.StreamId.Format @@ -341,35 +273,24 @@ func (s sqliteStore) CreateStatisticsDataFrom(tx *bolt.Tx, dat dataUpdateDb) (re return } -func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) { - err = s.dbBolt.View(func(tx *bolt.Tx) error { - for i := range dat { - sd, err := s.CreateStatisticsDataFrom(tx, *dat[i].(*dataUpdateDb)) - if err != nil { - return err - } - res = append(res, sd) - } - return nil - }) - return -} - -func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { - parameters := make(map[string]interface{}) - sql := "SELECT * FROM " + dataUpdatesTn + " WHERE Id > :afterUpdateId limit :limit" - parameters["afterUpdateId"] = id - parameters["limit"] = 5000 // TODO: hardcoded value - var updates []interface{} - updates, err = s.db.Select(dataUpdateDb{}, sql, parameters) - s5tl.Printf("sql: %s", sql) - if err == nil { - res, _ = s.CreateStatisticsDatasFrom(updates) - } +func (s Store) GetUpdatesAfter(id int) (res []StatisticsData, err error) { + // err = s.db.View(func(tx *bolt.Tx) error { + // // TODO: iterate over ids + // duId := 1 + + // for i := range dat { + // sd, err := s.CreateStatisticsDataFrom(tx, duId, du) + // if err != nil { + // return err + // } + // res = append(res, sd) + // } + // return nil + // }) return } -func (s sqliteStore) GetUpdates() (res []StatisticsData, err error) { +func (s Store) GetUpdates() (res []StatisticsData, err error) { return s.GetUpdatesAfter(-1) } @@ -377,50 +298,50 @@ type lastUpdateQueryResult struct { MaxDataUpdateId *int } -func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) { - result := lastUpdateQueryResult{} - err = s.db.SelectOne( - &result, - "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?", - uuid) - if err == nil { - updateId = result.MaxDataUpdateId - } else { - s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err) - } +func (s Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) { + // TODO: implement me! + updateId = -1 + + // result := lastUpdateQueryResult{} + // err = s.db.SelectOne( + // &result, + // "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?", + // uuid) + // if err == nil { + // updateId = result.MaxDataUpdateId + // } else { + // s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err) + // } + // return return } -func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) { - result := lastUpdateQueryResult{} - err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn) - if err == nil { - updateId = result.MaxDataUpdateId - } else { - s5l.Printf("db: failed to find max DataUpdateId: %v", err) - } +func (s Store) GetLastUpdateId() (updateId int, err error) { + // TODO: implement me! + updateId = -1 + + // result := lastUpdateQueryResult{} + // err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn) + // if err == nil { + // updateId = result.MaxDataUpdateId + // } else { + // s5l.Printf("db: failed to find max DataUpdateId: %v", err) + // } return } -func (s sqliteStore) GetStoreId() string { +func (s Store) GetStoreId() string { return s.hubId } -func NewStore(sqlitePath, boltPath string) (sqliteStore, error) { - boltDb, hubid, err := initDbBolt(boltPath) - if err != nil { - return sqliteStore{}, err - } - - db, err := initDbSqlite(sqlitePath) +func NewStore(dbPath string) (Store, error) { + db, hubid, err := initDb(dbPath) if err != nil { - boltDb.Close() - return sqliteStore{}, err + return Store{}, err } - return sqliteStore{db, boltDb, hubid}, nil + return Store{hubid, db}, nil } -func (s sqliteStore) Close() { - s.db.Db.Close() - s.dbBolt.Close() +func (s Store) Close() { + s.db.Close() } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index fe43bf5..38e7bf7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -9,8 +9,7 @@ import ( ) var ( - __sqlitePath = "file:memdb1?mode=memory&cache=shared" - __boltPath = "/run/s5hub_testing_db.bolt" + __boltPath = "/run/s5hub_testing_db.bolt" ) func TestMain(m *testing.M) { @@ -24,7 +23,7 @@ func TestMain(m *testing.M) { func TestAppend(t *testing.T) { os.Remove(__boltPath) - store, err := NewStore(__sqlitePath, __boltPath) + store, err := NewStore(__boltPath) if err != nil { t.Errorf("Failed to initialize: %v", err) return @@ -35,7 +34,7 @@ func TestAppend(t *testing.T) { update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{nil, nil, source, update} + dat := StatisticsData{"", -1, source, update} err = store.Append(dat) if err != nil { @@ -46,7 +45,7 @@ func TestAppend(t *testing.T) { func TestGetUpdatesAfter(t *testing.T) { os.Remove(__boltPath) - store, err := NewStore(__sqlitePath, __boltPath) + store, err := NewStore(__boltPath) if err != nil { t.Errorf("Failed to initialize: %v", err) return @@ -57,7 +56,7 @@ func TestGetUpdatesAfter(t *testing.T) { update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{nil, nil, source, update} + dat := StatisticsData{"", -1, source, update} err = store.Append(dat) if err != nil { @@ -122,7 +121,7 @@ func generateStatisticsData(n int) (data []StatisticsData) { func BenchmarkAppendMany(b *testing.B) { os.Remove(__boltPath) - store, err := NewStore(__sqlitePath, __boltPath) + store, err := NewStore(__boltPath) if err != nil { b.Errorf("Failed to initialize: %v", err) } @@ -138,7 +137,7 @@ func BenchmarkAppendMany(b *testing.B) { func BenchmarkGetUpdatesAfter(b *testing.B) { os.Remove(__boltPath) - store, err := NewStore(__sqlitePath, __boltPath) + store, err := NewStore(__boltPath) if err != nil { b.Errorf("Failed to initialize: %v", err) } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 525b6d3..ad6deaf 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -41,8 +41,8 @@ type DataUpdate struct { } type StatisticsData struct { - SourceHubUuid *string - SourceHubDataUpdateId *int + SourceHubUuid string `json:"SourceHubUuid,omitempty"` + SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"` SourceId DataUpdate } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index fd21337..d6de68e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -12,15 +12,10 @@ var ( ErrNotFound = errors.New("not found") ) -// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections -// this is very much not normalized at all, because I'm too lazy to type - const ( - // sqlite table names - dataUpdatesTn = "DataUpdates" - - // bolt bucket names + // bucket names hubInfoBn = "HubInfo" + dataUpdatesBn = "DataUpdates" sourcesFwdBn = "SourcesFwd" sourcesRevBn = "SourcesRev" clientDataBn = "ClientData" @@ -44,6 +39,18 @@ type sourceDb struct { Tags []string `json:"t"` } +func NewSourceDb(value StatisticsData) sourceDb { + return sourceDb{ + Hostname: value.SourceId.Hostname, + StreamId: streamIdDb{ + ContentId: value.SourceId.StreamId.ContentId, + Format: value.SourceId.StreamId.Format, + Quality: value.SourceId.StreamId.Quality, + }, + Tags: value.SourceId.Tags, + } +} + func (s sourceDb) String() string { return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ",")) } @@ -66,30 +73,39 @@ type clientDataDb struct { // stored in dataUpdatesTn // in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs type dataUpdateDb struct { - Id int - SourceId int // foreign key to sourcesTn - SourceHubUuid *string - SourceHubDataUpdateId *int - StartTime int64 // time.Time - Duration int64 // duration in milliseconds - ClientCount uint - BytesReceived uint - BytesSent uint + SourceHubUuid string `json:"h,omitempty"` + SourceHubDataUpdateId int `json:"hi,omitempty"` + SourceId int `json:"si"` + StartTime int64 `json:"st"` // unix timestamp in milliseconds + Duration int64 `json:"du"` // duration in milliseconds + ClientCount uint `json:"cc"` + BytesReceived uint `json:"br"` + BytesSent uint `json:"bs"` } -func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, hubId string) { - if v.SourceHubUuid == nil { - s.SourceHubUuid = &hubId - } else { - s.SourceHubUuid = v.SourceHubUuid +func NewDataUpdateDb(v StatisticsData) dataUpdateDb { + return dataUpdateDb{ + v.SourceHubUuid, + v.SourceHubDataUpdateId, + -1, + int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000), + v.Duration, + v.Data.ClientCount, + v.Data.BytesReceived, + v.Data.BytesSent, } - if v.SourceHubDataUpdateId == nil { - s.SourceHubDataUpdateId = &v.Id +} + +func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) { + if v.SourceHubUuid == "" { + s.SourceHubUuid = hubId + s.SourceHubDataUpdateId = vId } else { + s.SourceHubUuid = v.SourceHubUuid s.SourceHubDataUpdateId = v.SourceHubDataUpdateId } - s.StartTime = time.Unix(v.StartTime, 0) + s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000) s.Duration = v.Duration s.Data.ClientCount = v.ClientCount s.Data.BytesReceived = v.BytesReceived diff --git a/src/hub/test-bolt b/src/hub/test-bolt index 7ac3dd2..9a26b04 100755 --- a/src/hub/test-bolt +++ b/src/hub/test-bolt @@ -1,6 +1,7 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" BIN="$(go env GOPATH)/bin/bolt" if [ ! -x "$BIN" ]; then @@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then exit 1 fi -exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_D/db.bolt" +exec "$(go env GOPATH)/bin/bolt" $@ "$TEST_DB" diff --git a/src/hub/test-bolter b/src/hub/test-bolter index dc2e98d..9093d51 100755 --- a/src/hub/test-bolter +++ b/src/hub/test-bolter @@ -1,6 +1,7 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" BIN="$(go env GOPATH)/bin/bolter" if [ ! -x "$BIN" ]; then @@ -11,4 +12,4 @@ if [ ! -x "$BIN" ]; then exit 1 fi -exec $BIN --file "$TEST_D/db.bolt" $@ +exec $BIN --file "$TEST_DB" $@ diff --git a/src/hub/test-fwd b/src/hub/test-fwd index eff7605..ce6e0e3 100755 --- a/src/hub/test-fwd +++ b/src/hub/test-fwd @@ -1,5 +1,6 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" -exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000" diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es index dc7979f..489fcdd 100755 --- a/src/hub/test-fwd-es +++ b/src/hub/test-fwd-es @@ -1,5 +1,6 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" -exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/e14" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing" diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik index b6ac640..6143c70 100755 --- a/src/hub/test-fwd-piwik +++ b/src/hub/test-fwd-piwik @@ -1,5 +1,6 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" -exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" diff --git a/src/hub/test-sqlite b/src/hub/test-sqlite deleted file mode 100755 index 1c2dcb1..0000000 --- a/src/hub/test-sqlite +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -TEST_D="./test" - -exec sqlite3 "$TEST_D/db.sqlite" diff --git a/src/hub/test-srv b/src/hub/test-srv index 37d5897..803ac37 100755 --- a/src/hub/test-srv +++ b/src/hub/test-srv @@ -1,7 +1,8 @@ #!/bin/sh TEST_D="./test" +TEST_DB="$TEST_D/db.bolt" mkdir -p "$TEST_D" rm -f "$TEST_D/pipe" "$TEST_D/pipegram" -exec ./bin/sfive-hub -db "$TEST_D" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000" -- cgit v1.2.3