summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-19 16:41:56 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-19 16:41:56 +0200
commit721b96afe0aea0ad39d20d9c4873ae6b5469d693 (patch)
treee4f6d586ac62d7fd869bc96c1895c0c231a1e0fb /src
parenthub: change ClientData field BytesTransferred (diff)
hub: refactor server and db initialization
* pass the paths to DB and pipe as arguments * move the connection to sqlite DB into the constructor of the server * return error instead of panicking when initializing DB fails * add an Actor for appending data to the sqlite store, start it in the server constructor * add a somewhat proper Close for the server * split s5srv.go, extracting pipe-specific code * make more constants etc. private * remove unnecessary StatsContainer interface * use an explicit s5 logger with prefix * use an in-memory sqlite db for unit tests
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go17
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5log.go8
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go87
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go56
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go106
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go8
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5types.go2
9 files changed, 159 insertions, 133 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 370e209..c1802eb 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -4,13 +4,24 @@ import (
"fmt"
"log"
"spreadspace.org/sfive"
+ "sync"
)
func main() {
fmt.Printf("s5: Hello, world.\n")
- server, err := sfive.NewSfiveServer()
+ server, err := sfive.NewServer("/tmp/sfive.sqlite")
if err != nil {
- log.Fatalf("failed to start S5: %v", err)
+ log.Fatalf("failed to initialize S5: %v", err)
}
- server.ListenAndServe()
+ defer server.Close()
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ server.ServePipe("/run/sfive/pipe")
+ }()
+
+ wg.Wait()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index e908bc6..330e2c1 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -49,7 +49,7 @@ func NewFilterDecoder() (decoder FilterDecoder) {
}
func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) {
- dat.CopyFrom(&self.sourceId)
+ dat.CopyFromSourceId(&self.sourceId)
err = json.Unmarshal(jsonString, &dat)
// like in PlainDecoder, let the client decide how to use partial results
// (Unmarshal returns partial results in case of errors)
@@ -64,7 +64,7 @@ func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err err
func (self *PlainEncoder) Encode(data *StatisticsData) []byte {
res, err := json.Marshal(data)
if err != nil {
- panic("oh fuck I cannot event marshal my own data")
+ s5l.Panicln("failed to encode StatisticsData")
}
return res
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 2326911..32f35dd 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -18,7 +18,7 @@ var (
func GetExpected() *StatisticsData {
expected := new(StatisticsData)
- expected.CopyFrom(&sourceIdDataStruct)
+ expected.CopyFromSourceId(&sourceIdDataStruct)
expected.CopyFromUpdate(&updateDataStruct)
return expected
}
@@ -56,7 +56,7 @@ func TestDecodePlain(t *testing.T) {
func TestEncode(t *testing.T) {
ec := new(PlainEncoder)
td := new(StatisticsData)
- td.CopyFrom(&sourceIdDataStruct)
+ td.CopyFromSourceId(&sourceIdDataStruct)
td.CopyFromUpdate(&updateDataStruct)
t.Logf("dada: %v", ec.Encode(td))
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go
new file mode 100644
index 0000000..fea0b10
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5log.go
@@ -0,0 +1,8 @@
+package sfive
+
+import (
+ "log"
+ "os"
+)
+
+var s5l = log.New(os.Stderr, "s5", log.LstdFlags)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 8bd7904..7eab0eb 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,77 +1,46 @@
package sfive
-import (
- "bufio"
- "fmt"
- "net"
-)
-
type StatsSinkServer struct {
+ store sqliteStore
+ quit chan bool
+ done chan bool
+ appendData chan StatisticsData
}
-func handleConnection(store StatsContainer, conn net.Conn) {
- reader := bufio.NewReader(conn)
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- fmt.Printf("s5 failed to read from connection: %v\n", err)
- return
- }
- marshaller, err := NewStatefulDecoder(buffer)
- if err != nil {
- fmt.Printf("s5 failed initializing decoder with init message: %v\n", err)
- return
- }
-
+func (self StatsSinkServer) appendActor() {
+ defer func() { self.done <- true }()
for {
- buffer, err := reader.ReadBytes('\n')
- if err != nil {
- fmt.Printf("s5 failed to read from connection: %v\n", err)
- return
- }
-
- fmt.Printf("s5 msg: %v", string(buffer))
-
- value, err := marshaller.Decode(buffer)
- if err != nil {
- fmt.Printf("s5 failed to decode message: %v\n", err)
- continue
- }
-
- err = store.Append(value)
- if err != nil {
- fmt.Printf("s5 failed to store data: %v\n", err)
+ select {
+ case <-self.quit:
return
+ case value := <-self.appendData:
+ err := self.store.Append(value)
+ if err != nil {
+ s5l.Printf("failed to store data: %v\n", err)
+ }
}
}
}
-func (self StatsSinkServer) ListenAndServe() {
- store, err := NewStore()
- if err != nil {
- fmt.Printf("s5 failed to connect to persistence layer: %v\n", err)
- return
- }
- defer store.Close()
+func (self StatsSinkServer) Close() {
+ self.quit <- true
+ <-self.done
+ close(self.quit)
+ close(self.done)
+ close(self.appendData)
+ self.store.Close()
+}
- ln, err := net.Listen("unix", "/run/sfive/receiver")
+func NewServer(dbPath string) (server *StatsSinkServer, err error) {
+ // TODO read configuration and create instance with correct settings
+ server = new(StatsSinkServer)
+ server.store, err = NewStore(dbPath)
if err != nil {
- fmt.Printf("s5 failed to connect to unix pipe: %v", err)
return
}
- defer ln.Close()
- for {
- conn, err := ln.Accept()
- if err != nil {
- // ignore
- continue
- }
- go handleConnection(store, conn)
- }
-}
-
-func NewSfiveServer() (server *StatsSinkServer, err error) {
- // TODO read configuration and create instance with correct settings
- server = new(StatsSinkServer)
+ server.quit = make(chan bool)
+ server.done = make(chan bool)
+ server.appendData = make(chan StatisticsData)
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
new file mode 100644
index 0000000..e024e2d
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -0,0 +1,56 @@
+package sfive
+
+import (
+ "bufio"
+ "net"
+)
+
+func (self StatsSinkServer) handleConnection(conn net.Conn) {
+ reader := bufio.NewReader(conn)
+ buffer, err := reader.ReadBytes('\n')
+ if err != nil {
+ s5l.Printf("failed to read from connection: %v\n", err)
+ return
+ }
+ marshaller, err := NewStatefulDecoder(buffer)
+ if err != nil {
+ s5l.Printf("failed initializing decoder with init message: %v\n", err)
+ return
+ }
+
+ for {
+ buffer, err := reader.ReadBytes('\n')
+ if err != nil {
+ s5l.Printf("failed to read from connection: %v\n", err)
+ return
+ }
+
+ s5l.Printf("msg: %v", string(buffer))
+
+ value, err := marshaller.Decode(buffer)
+ if err != nil {
+ s5l.Printf("failed to decode message: %v\n", err)
+ continue
+ }
+
+ self.appendData <- value
+ }
+}
+
+func (self StatsSinkServer) ServePipe(pipePath string) {
+ ln, err := net.Listen("unix", pipePath)
+ if err != nil {
+ s5l.Printf("failed to connect to unix pipe: %v", err)
+ return
+ }
+ defer ln.Close()
+
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ // ignore
+ continue
+ }
+ go self.handleConnection(conn)
+ }
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 89b449f..57a5b82 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -2,7 +2,6 @@ package sfive
import (
"database/sql"
- "log"
"time"
_ "github.com/mattn/go-sqlite3"
@@ -13,47 +12,48 @@ import (
// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections
// this is very much not normalized at all, because I'm too lazy to type
+// table names
const (
- TagsTn = "Tags"
- SourceTagsTn = "StreamToTagMap"
- SourcesTn = "Sources"
- ClientDataUpdatesTn = "ClientDataUpdates"
- DataUpdatesTn = "DataUpdates"
+ tagsTn = "Tags"
+ sourceTagsTn = "StreamToTagMap"
+ sourcesTn = "Sources"
+ clientdataUpdatesTn = "ClientDataUpdates"
+ dataUpdatesTn = "DataUpdates"
)
-// stored in TagsTn
+// stored in tagsTn
type tagDb struct {
Id int
Name string
}
-// stored in SourceTagsTn
+// stored in sourceTagsTn
// Stream m:n Tag
type sourceTagsDb struct {
- TagId int // foreign key to TagsTn
- SourceId int // foreign key to SourcesTn
+ TagId int // foreign key to tagsTn
+ SourceId int // foreign key to sourcesTn
}
-// stored in SourcesTn
+// stored in sourcesTn
type sourceDb struct {
Id int
StreamId
SourceId
}
-// stored in ClientDataUpdatesTn
+// stored in clientdataUpdatesTn
// ClientData n:1 DataUpdate
type clientDataDb struct {
Id int
- DataUpdatesId int // foreign key to DataUpdatesTn
+ DataUpdatesId int // foreign key to dataUpdatesTn
ClientData
}
-// stored in DataUpdatesTn
+// stored in dataUpdatesTn
// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
type dataUpdateDb struct {
Id int
- SourceId int // foreign key to SourcesTn
+ SourceId int // foreign key to sourcesTn
StartTime time.Time
Duration time.Duration
ClientCount uint
@@ -61,15 +61,6 @@ type dataUpdateDb struct {
BytesSent uint
}
-type StatsContainer interface {
- Close()
- Append(update StatisticsData) error
- GetTags() ([]string, error)
- CountUpdateEntries(filter *StatsFilter) (int64, error)
- CountClients(filter *StatsFilter) uint
- GetAverageBps(filter *StatsFilter) (uint, error)
-}
-
type sqliteStore struct {
db *gorp.DbMap
}
@@ -123,31 +114,30 @@ func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataD
return du, cd, src, tags
}
-func initDb() *gorp.DbMap {
+func initDb(path string) (res *gorp.DbMap, err error) {
// connect to db using standard Go database/sql API
- db, err := sql.Open("sqlite3", "/tmp/s5-test.sqlite")
- checkErr(err, "sql.Open failed")
+ db, err := sql.Open("sqlite3", path)
+ if err != nil {
+ return
+ }
dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
// dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds))
- dbmap.AddTableWithName(tagDb{}, TagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true)
- dbmap.AddTableWithName(sourceTagsDb{}, SourceTagsTn).SetKeys(false, "TagId", "SourceId")
- dbmap.AddTableWithName(sourceDb{}, SourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname")
- dbmap.AddTableWithName(clientDataDb{}, ClientDataUpdatesTn).SetKeys(true, "Id")
- dbmap.AddTableWithName(dataUpdateDb{}, DataUpdatesTn).SetKeys(true, "Id")
+ dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true)
+ dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId")
+ dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname")
+ dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id")
+ dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id")
// TODO use some real migration, yadda yadda
err = dbmap.CreateTablesIfNotExists()
- checkErr(err, "Create tables failed")
-
- return dbmap
-}
-
-func checkErr(err error, msg string) {
if err != nil {
- log.Panicln(msg, err)
+ return
}
+
+ res = dbmap
+ return
}
func isEmptyFilter(filter *StatsFilter) bool {
@@ -176,10 +166,10 @@ func insertAnd(needsAnd *bool) (res string) {
func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) {
if isEmptyFilter(filter) {
- return DataUpdatesTn, nil
+ return dataUpdatesTn, nil
}
- query := "(select * from " + DataUpdatesTn + " where"
+ query := "(select * from " + dataUpdatesTn + " where"
parameters := make(map[string]interface{})
needsAnd := false
@@ -201,7 +191,7 @@ func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interf
func (s sqliteStore) findTag(name string) (tag *tagDb, err error) {
t := tagDb{}
- err = s.db.SelectOne(&t, "select * from "+TagsTn+" where Name = ?", name)
+ err = s.db.SelectOne(&t, "select * from "+tagsTn+" where Name = ?", name)
if err == nil {
tag = &t
}
@@ -212,7 +202,7 @@ func (s sqliteStore) insertNewTags(tags []tagDb) (err error) {
for i := range tags {
t, err := s.findTag(tags[i].Name)
if err != nil {
- _, err = s.db.Exec("insert into "+TagsTn+" VALUES (NULL, ?)", tags[i].Name)
+ _, err = s.db.Exec("insert into "+tagsTn+" VALUES (NULL, ?)", tags[i].Name)
}
t, err = s.findTag(tags[i].Name)
@@ -230,7 +220,7 @@ func (s sqliteStore) findSource(src sourceDb) (res *sourceDb, err error) {
t := sourceDb{}
err = s.db.SelectOne(
&t,
- "select Id from "+SourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?",
+ "select Id from "+sourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?",
src.ContentId,
src.Format,
src.Quality,
@@ -262,11 +252,12 @@ func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error
}
for i := range st {
_, err = s.db.Exec(
- "insert or ignore into "+SourceTagsTn+" values (?,?)",
+ "insert or ignore into "+sourceTagsTn+" values (?,?)",
st[i].SourceId,
st[i].TagId)
// err = s.db.Insert(&st[i])
if err != nil {
+ // TODO
//fmt.Printf("st\n")
return
}
@@ -289,6 +280,7 @@ func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpd
cd[i].DataUpdatesId = du.Id
err = s.db.Insert(&cd[i])
if err != nil {
+ // TODO
return
}
}
@@ -341,15 +333,15 @@ func (s sqliteStore) CountUpdateEntries(filter *StatsFilter) (count int64, err e
}
func (s sqliteStore) GetTags() ([]string, error) {
- res, dbErr := s.db.Select("", "select Name from "+TagsTn)
+ res, dbErr := s.db.Select("", "select Name from "+tagsTn)
if dbErr == nil {
- sRes := ToString(res)
+ sRes := castArrayToString(res)
return sRes, nil
}
return nil, dbErr
}
-func ToString(value []interface{}) []string {
+func castArrayToString(value []interface{}) []string {
res := make([]string, len(value))
for i := range value {
res[i] = value[i].(string)
@@ -384,25 +376,15 @@ func (s sqliteStore) GetAverageBps(filter *StatsFilter) (uint, error) {
return 0, err
}
-func NewStore() (store StatsContainer, err error) {
- db := initDb()
- if db == nil {
+func NewStore(path string) (store sqliteStore, err error) {
+ db, err := initDb(path)
+ if err != nil {
return
}
- res := sqliteStore{db}
- store = res
+ store = sqliteStore{db}
return
}
-func EatDataAndClose(sc StatsContainer) {
- s := sc.(sqliteStore)
- // if s == nil {
- // return
- // }
- s.db.TruncateTables()
- s.Close()
-}
-
func (s sqliteStore) Close() {
s.db.Db.Close()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index c3609da..ce4ce0f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -18,11 +18,11 @@ func TestGetFilter(t *testing.T) {
}
func TestAppend(t *testing.T) {
- store, err := NewStore()
+ store, err := NewStore("file:memdb1?mode=memory&cache=shared")
if err != nil {
t.Errorf("Failed to initialize: %v", err)
}
- defer EatDataAndClose(store)
+ defer store.Close()
startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond}
@@ -54,11 +54,11 @@ func TestAppend(t *testing.T) {
}
func IgnoreTestCount(t *testing.T) {
- store, err := NewStore()
+ store, err := NewStore("file:memdb1?mode=memory&cache=shared")
if err != nil {
t.Errorf("Failed to initialize: %v", err)
}
- defer EatDataAndClose(store)
+ defer store.Close()
if 0 != store.CountClients(nil) {
t.Errorf("Failed to count correctly.")
diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go
index 78a877b..7b0fb52 100644
--- a/src/hub/src/spreadspace.org/sfive/s5types.go
+++ b/src/hub/src/spreadspace.org/sfive/s5types.go
@@ -55,7 +55,7 @@ type StatsFilter struct {
tagsAny []string
}
-func (self *StatisticsData) CopyFrom(id *SourceId) {
+func (self *StatisticsData) CopyFromSourceId(id *SourceId) {
self.Hostname = id.Hostname
self.StreamId = id.StreamId
self.Tags = id.Tags