summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-23 02:36:44 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-23 02:36:44 +0200
commitee795c82891f2554bfbed4f485b8883292910d6a (patch)
tree8101de0205da8b99121f17072639c66a9a769edd /src
parentdrop no needed mysql support (diff)
moved client data to bolt db
Diffstat (limited to 'src')
-rw-r--r--src/hub/Makefile1
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go10
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go107
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go32
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go35
6 files changed, 108 insertions, 79 deletions
diff --git a/src/hub/Makefile b/src/hub/Makefile
index d03dc7d..43b4d9d 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -40,6 +40,7 @@ EXECUTEABLE := sfive-hub
LIBS := "gopkg.in/gorp.v2" \
"github.com/mattn/go-sqlite3" \
+ "github.com/boltdb/bolt" \
"github.com/zenazn/goji" \
"github.com/pborman/uuid" \
"github.com/equinox0815/graphite-golang"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index ccdd4e0..7ff58b5 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -12,7 +12,7 @@ import (
var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
- db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file")
+ db := flag.String("db", "/var/lib/sfive/", "directory to store the database files")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 8f9093d..3dbd6e7 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,6 +1,9 @@
package sfive
-import "time"
+import (
+ "path/filepath"
+ "time"
+)
type appendManyToken struct {
data []StatisticsData
@@ -170,8 +173,11 @@ func (self StatsSinkServer) Close() {
func NewServer(dbPath string) (server *StatsSinkServer, err error) {
// TODO read configuration and create instance with correct settings
+ sqlitePath := filepath.Join(dbPath, "db.sqlite")
+ boltPath := filepath.Join(dbPath, "db.bolt")
+
server = new(StatsSinkServer)
- server.store, err = NewStore(dbPath)
+ server.store, err = NewStore(sqlitePath, boltPath)
if err != nil {
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 8611661..a3bd8ce 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -2,17 +2,21 @@ package sfive
import (
"database/sql"
+ "encoding/binary"
+ "encoding/json"
"fmt"
"time"
+ "github.com/boltdb/bolt"
_ "github.com/mattn/go-sqlite3"
"github.com/pborman/uuid"
"gopkg.in/gorp.v2"
)
type sqliteStore struct {
- db *gorp.DbMap
- hubId string
+ db *gorp.DbMap
+ dbBolt *bolt.DB
+ hubId string
}
func tagsFromStatisticsData(value StatisticsData) []tagDb {
@@ -36,14 +40,6 @@ func sourceFromStatisticsData(value StatisticsData) sourceDb {
}
}
-func clientsFromStatisticsData(value StatisticsData) []clientDataDb {
- res := make([]clientDataDb, len(value.Data.Clients))
- for i := range value.Data.Clients {
- res[i] = clientDataDb{-1, -1, value.Data.Clients[i]}
- }
- return res
-}
-
func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
return dataUpdateDb{
-1,
@@ -57,37 +53,48 @@ func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
value.Data.BytesSent}
}
-func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) {
+func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb, []tagDb) {
du := dataUpdateFromStatisticsData(value)
- cd := clientsFromStatisticsData(value)
+ cd := value.Data.Clients
src := sourceFromStatisticsData(value)
tags := tagsFromStatisticsData(value)
return du, cd, src, tags
}
-func initDb(path string) (res *gorp.DbMap, hubId string, err error) {
- // connect to db using standard Go database/sql API
- var db *sql.DB
+func initDbBolt(boltPath string) (boltDb *bolt.DB, err error) {
+ boltDb, err = bolt.Open(boltPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
+ if err != nil {
+ return
+ }
- db, err = sql.Open("sqlite3", path)
+ err = boltDb.Update(func(tx *bolt.Tx) error {
+ _, err := tx.CreateBucketIfNotExists([]byte(clientDataBn))
+ return err
+ })
+
+ return
+}
+
+func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, hubId string, err error) {
+
+ var db *sql.DB
+ db, err = sql.Open("sqlite3", sqlitePath)
if err != nil {
return
}
- dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
+ dbmap = &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
// dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds))
dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true)
dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId")
dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname")
- dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id")
dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id")
dbmap.AddTableWithName(hubInfoDb{}, hubInfoTn).SetKeys(false, "Name")
// TODO use some real migration, yadda yadda
- err = dbmap.CreateTablesIfNotExists()
- if err != nil {
+ if err = dbmap.CreateTablesIfNotExists(); err != nil {
return
}
@@ -102,7 +109,6 @@ func initDb(path string) (res *gorp.DbMap, hubId string, err error) {
}
}
- res = dbmap
return
}
@@ -274,19 +280,25 @@ func (s sqliteStore) insertDataUpdateEntry(src sourceDb, du *dataUpdateDb) (err
return
}
-func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpdateDb) (err error) {
- for i := range cd {
- cd[i].DataUpdatesId = du.Id
- err = s.db.Insert(&cd[i])
+// itob returns an 8-byte big endian representation of v.
+func itob(v int) []byte {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(v))
+ return b
+}
+
+func (s sqliteStore) insertDataUpdateClientEntries(cd []ClientData, du dataUpdateDb) error {
+ return s.dbBolt.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(clientDataBn))
+ jsonData, err := json.Marshal(cd)
if err != nil {
- // TODO
- return
+ return err
}
- }
- return
+ return b.Put(itob(du.Id), jsonData)
+ })
}
-func (s sqliteStore) appendItem(du dataUpdateDb, cd []clientDataDb, src sourceDb, tags []tagDb) (err error) {
+func (s sqliteStore) appendItem(du dataUpdateDb, cd []ClientData, src sourceDb, tags []tagDb) (err error) {
err = s.insertNewTags(tags)
if err != nil {
return
@@ -407,15 +419,13 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
return
}
-func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) {
- var qres []interface{}
- qres, err = s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id)
- if err == nil {
- res = make([]clientDataDb, len(qres))
- for i := range qres {
- res[i] = *qres[i].(*clientDataDb)
- }
- }
+func (s sqliteStore) GetClientsByUpdateId(id int) (res []ClientData, err error) {
+ err = s.dbBolt.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(clientDataBn))
+
+ jsonData := b.Get(itob(id))
+ return json.Unmarshal(jsonData, &res)
+ })
return
}
@@ -437,8 +447,8 @@ var (
)
func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) {
- var clientsDb []clientDataDb
- clientsDb, err = s.GetClientsByUpdateId(dat.Id)
+ var clients []ClientData
+ clients, err = s.GetClientsByUpdateId(dat.Id)
if err != nil {
s5l.Printf("store GetClients failed: %v", err)
return
@@ -453,7 +463,7 @@ func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res St
res.StreamId.ContentId = dat.ContentId
res.StreamId.Format = dat.Format
res.StreamId.Quality = dat.Quality
- res.CopyFromClientDataDb(clientsDb)
+ res.Data.Clients = clients
res.Tags = tagsDb
return
}
@@ -620,14 +630,21 @@ func (s sqliteStore) GetStoreId() (uuid string, err error) {
return
}
-func NewStore(path string) (sqliteStore, error) {
- db, hubid, err := initDb(path)
+func NewStore(sqlitePath, boltPath string) (sqliteStore, error) {
+ boltDb, err := initDbBolt(boltPath)
+ if err != nil {
+ return sqliteStore{}, err
+ }
+
+ db, hubid, err := initDbSqlite(sqlitePath)
if err != nil {
+ boltDb.Close()
return sqliteStore{}, err
}
- return sqliteStore{db, hubid}, nil
+ return sqliteStore{db, boltDb, hubid}, nil
}
func (s sqliteStore) Close() {
s.db.Db.Close()
+ s.dbBolt.Close()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 6c9e302..74721c4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -1,12 +1,30 @@
package sfive
import (
+ "fmt"
+ "os"
+ "os/user"
"testing"
"time"
)
+var (
+ __sqlitePath = "file:memdb1?mode=memory&cache=shared"
+ __boltPath = "/run/s5hub_testing_db.bolt"
+)
+
+func TestMain(m *testing.M) {
+ u, err := user.Current()
+ if err != nil {
+ os.Exit(-1)
+ }
+ __boltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid)
+ os.Exit(m.Run())
+}
+
func TestAppend(t *testing.T) {
- store, err := NewStore("file:memdb1?mode=memory&cache=shared")
+ os.Remove(__boltPath)
+ store, err := NewStore(__sqlitePath, __boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -53,7 +71,8 @@ func TestAppend(t *testing.T) {
}
func TestCount(t *testing.T) {
- store, err := NewStore("file:memdb1?mode=memory&cache=shared")
+ os.Remove(__boltPath)
+ store, err := NewStore(__sqlitePath, __boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
}
@@ -67,7 +86,8 @@ func TestCount(t *testing.T) {
}
func TestGetUpdatesAfter(t *testing.T) {
- store, err := NewStore("file:memdb1?mode=memory&cache=shared")
+ os.Remove(__boltPath)
+ store, err := NewStore(__sqlitePath, __boltPath)
if err != nil {
t.Errorf("Failed to initialize: %v", err)
return
@@ -142,7 +162,8 @@ func generateStatisticsData(n int) (data []StatisticsData) {
}
func BenchmarkAppendMany(b *testing.B) {
- store, err := NewStore("file:memdb1?mode=memory&cache=shared")
+ os.Remove(__boltPath)
+ store, err := NewStore(__sqlitePath, __boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
@@ -157,7 +178,8 @@ func BenchmarkAppendMany(b *testing.B) {
}
func BenchmarkGetUpdatesAfter(b *testing.B) {
- store, err := NewStore("file:memdb1?mode=memory&cache=shared")
+ os.Remove(__boltPath)
+ store, err := NewStore(__sqlitePath, __boltPath)
if err != nil {
b.Errorf("Failed to initialize: %v", err)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index f06e953..19c9404 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -9,12 +9,13 @@ import (
// table names
const (
- tagsTn = "Tags"
- sourceTagsTn = "StreamToTagMap"
- sourcesTn = "Sources"
- clientdataUpdatesTn = "ClientDataUpdates"
- dataUpdatesTn = "DataUpdates"
- hubInfoTn = "HubInfo"
+ tagsTn = "Tags"
+ sourceTagsTn = "StreamToTagMap"
+ sourcesTn = "Sources"
+ dataUpdatesTn = "DataUpdates"
+ hubInfoTn = "HubInfo"
+
+ clientDataBn = "ClientData"
)
type hubInfoDb struct {
@@ -42,14 +43,6 @@ type sourceDb struct {
SourceId
}
-// stored in clientdataUpdatesTn
-// ClientData n:1 DataUpdate
-type clientDataDb struct {
- Id int
- DataUpdatesId int // foreign key to dataUpdatesTn
- ClientData
-}
-
// stored in dataUpdatesTn
// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
type dataUpdateDb struct {
@@ -99,22 +92,12 @@ func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId strin
self.Data.BytesSent = value.BytesSent
}
-func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) {
- clients := make([]ClientData, len(values))
- for i := range values {
- clients[i].Ip = values[i].Ip
- clients[i].UserAgent = values[i].UserAgent
- clients[i].BytesSent = values[i].BytesSent
- }
- self.Data.Clients = clients
-}
-
func cvtToApiStatisticsData(
- hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData {
+ hubId string, source sourceDb, update dataUpdateDb, clients []ClientData, tags []tagDb) StatisticsData {
res := StatisticsData{}
res.CopyFromSourceDb(source)
res.CopyFromDataUpdateDb(update, hubId)
- res.CopyFromClientDataDb(clients)
+ res.Data.Clients = clients
res.CopyFromTagsDb(tags)
return res
}