From ee795c82891f2554bfbed4f485b8883292910d6a Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 23 Apr 2017 02:36:44 +0200 Subject: moved client data to bolt db --- src/hub/Makefile | 1 + src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 2 +- src/hub/src/spreadspace.org/sfive/s5srv.go | 10 +- src/hub/src/spreadspace.org/sfive/s5store.go | 107 +++++++++++++--------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 32 ++++++- src/hub/src/spreadspace.org/sfive/s5typesStore.go | 35 ++----- 6 files changed, 108 insertions(+), 79 deletions(-) (limited to 'src/hub') diff --git a/src/hub/Makefile b/src/hub/Makefile index d03dc7d..43b4d9d 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -40,6 +40,7 @@ EXECUTEABLE := sfive-hub LIBS := "gopkg.in/gorp.v2" \ "github.com/mattn/go-sqlite3" \ + "github.com/boltdb/bolt" \ "github.com/zenazn/goji" \ "github.com/pborman/uuid" \ "github.com/equinox0815/graphite-golang" diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index ccdd4e0..7ff58b5 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -12,7 +12,7 @@ import ( var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags) func main() { - db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file") + db := flag.String("db", "/var/lib/sfive/", "directory to store the database files") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 8f9093d..3dbd6e7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -1,6 +1,9 @@ package sfive -import "time" +import ( + "path/filepath" + "time" +) type appendManyToken struct { data []StatisticsData @@ -170,8 +173,11 @@ func (self StatsSinkServer) Close() { func NewServer(dbPath string) (server *StatsSinkServer, err error) { // TODO read configuration and create instance with correct settings + sqlitePath := filepath.Join(dbPath, "db.sqlite") + boltPath := filepath.Join(dbPath, "db.bolt") + server = new(StatsSinkServer) - server.store, err = NewStore(dbPath) + server.store, err = NewStore(sqlitePath, boltPath) if err != nil { return } diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 8611661..a3bd8ce 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -2,17 +2,21 @@ package sfive import ( "database/sql" + "encoding/binary" + "encoding/json" "fmt" "time" + "github.com/boltdb/bolt" _ "github.com/mattn/go-sqlite3" "github.com/pborman/uuid" "gopkg.in/gorp.v2" ) type sqliteStore struct { - db *gorp.DbMap - hubId string + db *gorp.DbMap + dbBolt *bolt.DB + hubId string } func tagsFromStatisticsData(value StatisticsData) []tagDb { @@ -36,14 +40,6 @@ func sourceFromStatisticsData(value StatisticsData) sourceDb { } } -func clientsFromStatisticsData(value StatisticsData) []clientDataDb { - res := make([]clientDataDb, len(value.Data.Clients)) - for i := range value.Data.Clients { - res[i] = clientDataDb{-1, -1, value.Data.Clients[i]} - } - return res -} - func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { return dataUpdateDb{ -1, @@ -57,37 +53,48 @@ func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { value.Data.BytesSent} } -func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) { +func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb, []tagDb) { du := dataUpdateFromStatisticsData(value) - cd := clientsFromStatisticsData(value) + cd := value.Data.Clients src := sourceFromStatisticsData(value) tags := tagsFromStatisticsData(value) return du, cd, src, tags } -func initDb(path string) (res *gorp.DbMap, hubId string, err error) { - // connect to db using standard Go database/sql API - var db *sql.DB +func initDbBolt(boltPath string) (boltDb *bolt.DB, err error) { + boltDb, err = bolt.Open(boltPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return + } - db, err = sql.Open("sqlite3", path) + err = boltDb.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(clientDataBn)) + return err + }) + + return +} + +func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, hubId string, err error) { + + var db *sql.DB + db, err = sql.Open("sqlite3", sqlitePath) if err != nil { return } - dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} + dbmap = &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds)) dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId") dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname") - dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id") dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id") dbmap.AddTableWithName(hubInfoDb{}, hubInfoTn).SetKeys(false, "Name") // TODO use some real migration, yadda yadda - err = dbmap.CreateTablesIfNotExists() - if err != nil { + if err = dbmap.CreateTablesIfNotExists(); err != nil { return } @@ -102,7 +109,6 @@ func initDb(path string) (res *gorp.DbMap, hubId string, err error) { } } - res = dbmap return } @@ -274,19 +280,25 @@ func (s sqliteStore) insertDataUpdateEntry(src sourceDb, du *dataUpdateDb) (err return } -func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpdateDb) (err error) { - for i := range cd { - cd[i].DataUpdatesId = du.Id - err = s.db.Insert(&cd[i]) +// itob returns an 8-byte big endian representation of v. +func itob(v int) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) + return b +} + +func (s sqliteStore) insertDataUpdateClientEntries(cd []ClientData, du dataUpdateDb) error { + return s.dbBolt.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(clientDataBn)) + jsonData, err := json.Marshal(cd) if err != nil { - // TODO - return + return err } - } - return + return b.Put(itob(du.Id), jsonData) + }) } -func (s sqliteStore) appendItem(du dataUpdateDb, cd []clientDataDb, src sourceDb, tags []tagDb) (err error) { +func (s sqliteStore) appendItem(du dataUpdateDb, cd []ClientData, src sourceDb, tags []tagDb) (err error) { err = s.insertNewTags(tags) if err != nil { return @@ -407,15 +419,13 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { return } -func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) { - var qres []interface{} - qres, err = s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id) - if err == nil { - res = make([]clientDataDb, len(qres)) - for i := range qres { - res[i] = *qres[i].(*clientDataDb) - } - } +func (s sqliteStore) GetClientsByUpdateId(id int) (res []ClientData, err error) { + err = s.dbBolt.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(clientDataBn)) + + jsonData := b.Get(itob(id)) + return json.Unmarshal(jsonData, &res) + }) return } @@ -437,8 +447,8 @@ var ( ) func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) { - var clientsDb []clientDataDb - clientsDb, err = s.GetClientsByUpdateId(dat.Id) + var clients []ClientData + clients, err = s.GetClientsByUpdateId(dat.Id) if err != nil { s5l.Printf("store GetClients failed: %v", err) return @@ -453,7 +463,7 @@ func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res St res.StreamId.ContentId = dat.ContentId res.StreamId.Format = dat.Format res.StreamId.Quality = dat.Quality - res.CopyFromClientDataDb(clientsDb) + res.Data.Clients = clients res.Tags = tagsDb return } @@ -620,14 +630,21 @@ func (s sqliteStore) GetStoreId() (uuid string, err error) { return } -func NewStore(path string) (sqliteStore, error) { - db, hubid, err := initDb(path) +func NewStore(sqlitePath, boltPath string) (sqliteStore, error) { + boltDb, err := initDbBolt(boltPath) + if err != nil { + return sqliteStore{}, err + } + + db, hubid, err := initDbSqlite(sqlitePath) if err != nil { + boltDb.Close() return sqliteStore{}, err } - return sqliteStore{db, hubid}, nil + return sqliteStore{db, boltDb, hubid}, nil } func (s sqliteStore) Close() { s.db.Db.Close() + s.dbBolt.Close() } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 6c9e302..74721c4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -1,12 +1,30 @@ package sfive import ( + "fmt" + "os" + "os/user" "testing" "time" ) +var ( + __sqlitePath = "file:memdb1?mode=memory&cache=shared" + __boltPath = "/run/s5hub_testing_db.bolt" +) + +func TestMain(m *testing.M) { + u, err := user.Current() + if err != nil { + os.Exit(-1) + } + __boltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid) + os.Exit(m.Run()) +} + func TestAppend(t *testing.T) { - store, err := NewStore("file:memdb1?mode=memory&cache=shared") + os.Remove(__boltPath) + store, err := NewStore(__sqlitePath, __boltPath) if err != nil { t.Errorf("Failed to initialize: %v", err) return @@ -53,7 +71,8 @@ func TestAppend(t *testing.T) { } func TestCount(t *testing.T) { - store, err := NewStore("file:memdb1?mode=memory&cache=shared") + os.Remove(__boltPath) + store, err := NewStore(__sqlitePath, __boltPath) if err != nil { t.Errorf("Failed to initialize: %v", err) } @@ -67,7 +86,8 @@ func TestCount(t *testing.T) { } func TestGetUpdatesAfter(t *testing.T) { - store, err := NewStore("file:memdb1?mode=memory&cache=shared") + os.Remove(__boltPath) + store, err := NewStore(__sqlitePath, __boltPath) if err != nil { t.Errorf("Failed to initialize: %v", err) return @@ -142,7 +162,8 @@ func generateStatisticsData(n int) (data []StatisticsData) { } func BenchmarkAppendMany(b *testing.B) { - store, err := NewStore("file:memdb1?mode=memory&cache=shared") + os.Remove(__boltPath) + store, err := NewStore(__sqlitePath, __boltPath) if err != nil { b.Errorf("Failed to initialize: %v", err) } @@ -157,7 +178,8 @@ func BenchmarkAppendMany(b *testing.B) { } func BenchmarkGetUpdatesAfter(b *testing.B) { - store, err := NewStore("file:memdb1?mode=memory&cache=shared") + os.Remove(__boltPath) + store, err := NewStore(__sqlitePath, __boltPath) if err != nil { b.Errorf("Failed to initialize: %v", err) } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index f06e953..19c9404 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -9,12 +9,13 @@ import ( // table names const ( - tagsTn = "Tags" - sourceTagsTn = "StreamToTagMap" - sourcesTn = "Sources" - clientdataUpdatesTn = "ClientDataUpdates" - dataUpdatesTn = "DataUpdates" - hubInfoTn = "HubInfo" + tagsTn = "Tags" + sourceTagsTn = "StreamToTagMap" + sourcesTn = "Sources" + dataUpdatesTn = "DataUpdates" + hubInfoTn = "HubInfo" + + clientDataBn = "ClientData" ) type hubInfoDb struct { @@ -42,14 +43,6 @@ type sourceDb struct { SourceId } -// stored in clientdataUpdatesTn -// ClientData n:1 DataUpdate -type clientDataDb struct { - Id int - DataUpdatesId int // foreign key to dataUpdatesTn - ClientData -} - // stored in dataUpdatesTn // in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs type dataUpdateDb struct { @@ -99,22 +92,12 @@ func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId strin self.Data.BytesSent = value.BytesSent } -func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) { - clients := make([]ClientData, len(values)) - for i := range values { - clients[i].Ip = values[i].Ip - clients[i].UserAgent = values[i].UserAgent - clients[i].BytesSent = values[i].BytesSent - } - self.Data.Clients = clients -} - func cvtToApiStatisticsData( - hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData { + hubId string, source sourceDb, update dataUpdateDb, clients []ClientData, tags []tagDb) StatisticsData { res := StatisticsData{} res.CopyFromSourceDb(source) res.CopyFromDataUpdateDb(update, hubId) - res.CopyFromClientDataDb(clients) + res.Data.Clients = clients res.CopyFromTagsDb(tags) return res } -- cgit v1.2.3