From 721b96afe0aea0ad39d20d9c4873ae6b5469d693 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sun, 19 Oct 2014 16:41:56 +0200 Subject: hub: refactor server and db initialization * pass the paths to DB and pipe as arguments * move the connection to sqlite DB into the constructor of the server * return error instead of panicking when initializing DB fails * add an Actor for appending data to the sqlite store, start it in the server constructor * add a somewhat proper Close for the server * split s5srv.go, extracting pipe-specific code * make more constants etc. private * remove unnecessary StatsContainer interface * use an explicit s5 logger with prefix * use an in-memory sqlite db for unit tests --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 17 +++- src/hub/src/spreadspace.org/sfive/s5cvt.go | 4 +- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 4 +- src/hub/src/spreadspace.org/sfive/s5log.go | 8 ++ src/hub/src/spreadspace.org/sfive/s5srv.go | 87 ++++++------------ src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 56 ++++++++++++ src/hub/src/spreadspace.org/sfive/s5store.go | 106 +++++++++------------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 8 +- src/hub/src/spreadspace.org/sfive/s5types.go | 2 +- 9 files changed, 159 insertions(+), 133 deletions(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5log.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvPipe.go diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 370e209..c1802eb 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -4,13 +4,24 @@ import ( "fmt" "log" "spreadspace.org/sfive" + "sync" ) func main() { fmt.Printf("s5: Hello, world.\n") - server, err := sfive.NewSfiveServer() + server, err := sfive.NewServer("/tmp/sfive.sqlite") if err != nil { - log.Fatalf("failed to start S5: %v", err) + log.Fatalf("failed to initialize S5: %v", err) } - server.ListenAndServe() + defer server.Close() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + server.ServePipe("/run/sfive/pipe") + }() + + wg.Wait() } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index e908bc6..330e2c1 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -49,7 +49,7 @@ func NewFilterDecoder() (decoder FilterDecoder) { } func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { - dat.CopyFrom(&self.sourceId) + dat.CopyFromSourceId(&self.sourceId) err = json.Unmarshal(jsonString, &dat) // like in PlainDecoder, let the client decide how to use partial results // (Unmarshal returns partial results in case of errors) @@ -64,7 +64,7 @@ func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err err func (self *PlainEncoder) Encode(data *StatisticsData) []byte { res, err := json.Marshal(data) if err != nil { - panic("oh fuck I cannot event marshal my own data") + s5l.Panicln("failed to encode StatisticsData") } return res } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 2326911..32f35dd 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -18,7 +18,7 @@ var ( func GetExpected() *StatisticsData { expected := new(StatisticsData) - expected.CopyFrom(&sourceIdDataStruct) + expected.CopyFromSourceId(&sourceIdDataStruct) expected.CopyFromUpdate(&updateDataStruct) return expected } @@ -56,7 +56,7 @@ func TestDecodePlain(t *testing.T) { func TestEncode(t *testing.T) { ec := new(PlainEncoder) td := new(StatisticsData) - td.CopyFrom(&sourceIdDataStruct) + td.CopyFromSourceId(&sourceIdDataStruct) td.CopyFromUpdate(&updateDataStruct) t.Logf("dada: %v", ec.Encode(td)) } diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go new file mode 100644 index 0000000..fea0b10 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5log.go @@ -0,0 +1,8 @@ +package sfive + +import ( + "log" + "os" +) + +var s5l = log.New(os.Stderr, "s5", log.LstdFlags) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 8bd7904..7eab0eb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -1,77 +1,46 @@ package sfive -import ( - "bufio" - "fmt" - "net" -) - type StatsSinkServer struct { + store sqliteStore + quit chan bool + done chan bool + appendData chan StatisticsData } -func handleConnection(store StatsContainer, conn net.Conn) { - reader := bufio.NewReader(conn) - buffer, err := reader.ReadBytes('\n') - if err != nil { - fmt.Printf("s5 failed to read from connection: %v\n", err) - return - } - marshaller, err := NewStatefulDecoder(buffer) - if err != nil { - fmt.Printf("s5 failed initializing decoder with init message: %v\n", err) - return - } - +func (self StatsSinkServer) appendActor() { + defer func() { self.done <- true }() for { - buffer, err := reader.ReadBytes('\n') - if err != nil { - fmt.Printf("s5 failed to read from connection: %v\n", err) - return - } - - fmt.Printf("s5 msg: %v", string(buffer)) - - value, err := marshaller.Decode(buffer) - if err != nil { - fmt.Printf("s5 failed to decode message: %v\n", err) - continue - } - - err = store.Append(value) - if err != nil { - fmt.Printf("s5 failed to store data: %v\n", err) + select { + case <-self.quit: return + case value := <-self.appendData: + err := self.store.Append(value) + if err != nil { + s5l.Printf("failed to store data: %v\n", err) + } } } } -func (self StatsSinkServer) ListenAndServe() { - store, err := NewStore() - if err != nil { - fmt.Printf("s5 failed to connect to persistence layer: %v\n", err) - return - } - defer store.Close() +func (self StatsSinkServer) Close() { + self.quit <- true + <-self.done + close(self.quit) + close(self.done) + close(self.appendData) + self.store.Close() +} - ln, err := net.Listen("unix", "/run/sfive/receiver") +func NewServer(dbPath string) (server *StatsSinkServer, err error) { + // TODO read configuration and create instance with correct settings + server = new(StatsSinkServer) + server.store, err = NewStore(dbPath) if err != nil { - fmt.Printf("s5 failed to connect to unix pipe: %v", err) return } - defer ln.Close() - for { - conn, err := ln.Accept() - if err != nil { - // ignore - continue - } - go handleConnection(store, conn) - } -} - -func NewSfiveServer() (server *StatsSinkServer, err error) { - // TODO read configuration and create instance with correct settings - server = new(StatsSinkServer) + server.quit = make(chan bool) + server.done = make(chan bool) + server.appendData = make(chan StatisticsData) return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go new file mode 100644 index 0000000..e024e2d --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -0,0 +1,56 @@ +package sfive + +import ( + "bufio" + "net" +) + +func (self StatsSinkServer) handleConnection(conn net.Conn) { + reader := bufio.NewReader(conn) + buffer, err := reader.ReadBytes('\n') + if err != nil { + s5l.Printf("failed to read from connection: %v\n", err) + return + } + marshaller, err := NewStatefulDecoder(buffer) + if err != nil { + s5l.Printf("failed initializing decoder with init message: %v\n", err) + return + } + + for { + buffer, err := reader.ReadBytes('\n') + if err != nil { + s5l.Printf("failed to read from connection: %v\n", err) + return + } + + s5l.Printf("msg: %v", string(buffer)) + + value, err := marshaller.Decode(buffer) + if err != nil { + s5l.Printf("failed to decode message: %v\n", err) + continue + } + + self.appendData <- value + } +} + +func (self StatsSinkServer) ServePipe(pipePath string) { + ln, err := net.Listen("unix", pipePath) + if err != nil { + s5l.Printf("failed to connect to unix pipe: %v", err) + return + } + defer ln.Close() + + for { + conn, err := ln.Accept() + if err != nil { + // ignore + continue + } + go self.handleConnection(conn) + } +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 89b449f..57a5b82 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -2,7 +2,6 @@ package sfive import ( "database/sql" - "log" "time" _ "github.com/mattn/go-sqlite3" @@ -13,47 +12,48 @@ import ( // compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections // this is very much not normalized at all, because I'm too lazy to type +// table names const ( - TagsTn = "Tags" - SourceTagsTn = "StreamToTagMap" - SourcesTn = "Sources" - ClientDataUpdatesTn = "ClientDataUpdates" - DataUpdatesTn = "DataUpdates" + tagsTn = "Tags" + sourceTagsTn = "StreamToTagMap" + sourcesTn = "Sources" + clientdataUpdatesTn = "ClientDataUpdates" + dataUpdatesTn = "DataUpdates" ) -// stored in TagsTn +// stored in tagsTn type tagDb struct { Id int Name string } -// stored in SourceTagsTn +// stored in sourceTagsTn // Stream m:n Tag type sourceTagsDb struct { - TagId int // foreign key to TagsTn - SourceId int // foreign key to SourcesTn + TagId int // foreign key to tagsTn + SourceId int // foreign key to sourcesTn } -// stored in SourcesTn +// stored in sourcesTn type sourceDb struct { Id int StreamId SourceId } -// stored in ClientDataUpdatesTn +// stored in clientdataUpdatesTn // ClientData n:1 DataUpdate type clientDataDb struct { Id int - DataUpdatesId int // foreign key to DataUpdatesTn + DataUpdatesId int // foreign key to dataUpdatesTn ClientData } -// stored in DataUpdatesTn +// stored in dataUpdatesTn // in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs type dataUpdateDb struct { Id int - SourceId int // foreign key to SourcesTn + SourceId int // foreign key to sourcesTn StartTime time.Time Duration time.Duration ClientCount uint @@ -61,15 +61,6 @@ type dataUpdateDb struct { BytesSent uint } -type StatsContainer interface { - Close() - Append(update StatisticsData) error - GetTags() ([]string, error) - CountUpdateEntries(filter *StatsFilter) (int64, error) - CountClients(filter *StatsFilter) uint - GetAverageBps(filter *StatsFilter) (uint, error) -} - type sqliteStore struct { db *gorp.DbMap } @@ -123,31 +114,30 @@ func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataD return du, cd, src, tags } -func initDb() *gorp.DbMap { +func initDb(path string) (res *gorp.DbMap, err error) { // connect to db using standard Go database/sql API - db, err := sql.Open("sqlite3", "/tmp/s5-test.sqlite") - checkErr(err, "sql.Open failed") + db, err := sql.Open("sqlite3", path) + if err != nil { + return + } dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds)) - dbmap.AddTableWithName(tagDb{}, TagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) - dbmap.AddTableWithName(sourceTagsDb{}, SourceTagsTn).SetKeys(false, "TagId", "SourceId") - dbmap.AddTableWithName(sourceDb{}, SourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname") - dbmap.AddTableWithName(clientDataDb{}, ClientDataUpdatesTn).SetKeys(true, "Id") - dbmap.AddTableWithName(dataUpdateDb{}, DataUpdatesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) + dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId") + dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname") + dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id") // TODO use some real migration, yadda yadda err = dbmap.CreateTablesIfNotExists() - checkErr(err, "Create tables failed") - - return dbmap -} - -func checkErr(err error, msg string) { if err != nil { - log.Panicln(msg, err) + return } + + res = dbmap + return } func isEmptyFilter(filter *StatsFilter) bool { @@ -176,10 +166,10 @@ func insertAnd(needsAnd *bool) (res string) { func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) { if isEmptyFilter(filter) { - return DataUpdatesTn, nil + return dataUpdatesTn, nil } - query := "(select * from " + DataUpdatesTn + " where" + query := "(select * from " + dataUpdatesTn + " where" parameters := make(map[string]interface{}) needsAnd := false @@ -201,7 +191,7 @@ func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interf func (s sqliteStore) findTag(name string) (tag *tagDb, err error) { t := tagDb{} - err = s.db.SelectOne(&t, "select * from "+TagsTn+" where Name = ?", name) + err = s.db.SelectOne(&t, "select * from "+tagsTn+" where Name = ?", name) if err == nil { tag = &t } @@ -212,7 +202,7 @@ func (s sqliteStore) insertNewTags(tags []tagDb) (err error) { for i := range tags { t, err := s.findTag(tags[i].Name) if err != nil { - _, err = s.db.Exec("insert into "+TagsTn+" VALUES (NULL, ?)", tags[i].Name) + _, err = s.db.Exec("insert into "+tagsTn+" VALUES (NULL, ?)", tags[i].Name) } t, err = s.findTag(tags[i].Name) @@ -230,7 +220,7 @@ func (s sqliteStore) findSource(src sourceDb) (res *sourceDb, err error) { t := sourceDb{} err = s.db.SelectOne( &t, - "select Id from "+SourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?", + "select Id from "+sourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?", src.ContentId, src.Format, src.Quality, @@ -262,11 +252,12 @@ func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error } for i := range st { _, err = s.db.Exec( - "insert or ignore into "+SourceTagsTn+" values (?,?)", + "insert or ignore into "+sourceTagsTn+" values (?,?)", st[i].SourceId, st[i].TagId) // err = s.db.Insert(&st[i]) if err != nil { + // TODO //fmt.Printf("st\n") return } @@ -289,6 +280,7 @@ func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpd cd[i].DataUpdatesId = du.Id err = s.db.Insert(&cd[i]) if err != nil { + // TODO return } } @@ -341,15 +333,15 @@ func (s sqliteStore) CountUpdateEntries(filter *StatsFilter) (count int64, err e } func (s sqliteStore) GetTags() ([]string, error) { - res, dbErr := s.db.Select("", "select Name from "+TagsTn) + res, dbErr := s.db.Select("", "select Name from "+tagsTn) if dbErr == nil { - sRes := ToString(res) + sRes := castArrayToString(res) return sRes, nil } return nil, dbErr } -func ToString(value []interface{}) []string { +func castArrayToString(value []interface{}) []string { res := make([]string, len(value)) for i := range value { res[i] = value[i].(string) @@ -384,25 +376,15 @@ func (s sqliteStore) GetAverageBps(filter *StatsFilter) (uint, error) { return 0, err } -func NewStore() (store StatsContainer, err error) { - db := initDb() - if db == nil { +func NewStore(path string) (store sqliteStore, err error) { + db, err := initDb(path) + if err != nil { return } - res := sqliteStore{db} - store = res + store = sqliteStore{db} return } -func EatDataAndClose(sc StatsContainer) { - s := sc.(sqliteStore) - // if s == nil { - // return - // } - s.db.TruncateTables() - s.Close() -} - func (s sqliteStore) Close() { s.db.Db.Close() } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index c3609da..ce4ce0f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -18,11 +18,11 @@ func TestGetFilter(t *testing.T) { } func TestAppend(t *testing.T) { - store, err := NewStore() + store, err := NewStore("file:memdb1?mode=memory&cache=shared") if err != nil { t.Errorf("Failed to initialize: %v", err) } - defer EatDataAndClose(store) + defer store.Close() startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond} @@ -54,11 +54,11 @@ func TestAppend(t *testing.T) { } func IgnoreTestCount(t *testing.T) { - store, err := NewStore() + store, err := NewStore("file:memdb1?mode=memory&cache=shared") if err != nil { t.Errorf("Failed to initialize: %v", err) } - defer EatDataAndClose(store) + defer store.Close() if 0 != store.CountClients(nil) { t.Errorf("Failed to count correctly.") diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go index 78a877b..7b0fb52 100644 --- a/src/hub/src/spreadspace.org/sfive/s5types.go +++ b/src/hub/src/spreadspace.org/sfive/s5types.go @@ -55,7 +55,7 @@ type StatsFilter struct { tagsAny []string } -func (self *StatisticsData) CopyFrom(id *SourceId) { +func (self *StatisticsData) CopyFromSourceId(id *SourceId) { self.Hostname = id.Hostname self.StreamId = id.StreamId self.Tags = id.Tags -- cgit v1.2.3