diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5store.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 275 |
1 file changed, 254 insertions, 21 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 604a9ff..9d3d57e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -2,9 +2,141 @@ package sfive import ( "database/sql" + "log" + "os" "time" + + _ "github.com/mattn/go-sqlite3" + + "github.com/coopernurse/gorp" +) + +// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections +// this is very much not normalized at all, because I'm too lazy to type + +const ( + TagsTn = "Tags" + SourceTagsTn = "StreamToTagMap" + SourcesTn = "Sources" + ClientDataUpdatesTn = "ClientDataUpdates" + DataUpdatesTn = "DataUpdates" ) +// stored in TagsTn +type tagDb struct { + Id int + Name string +} + +func tagsFromStatisticsData(value StatisticsData) []tagDb { + tags := make([]tagDb, len(value.SourceId.Tags)) + for i := range value.SourceId.Tags { + tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} + } + return tags +} + +// stored in SourceTagsTn +// Stream m:n Tag +type sourceTagsDb struct { + TagId int // foreign key to TagsTn + SourceId int // foreign key to SourcesTn +} + +// stored in SourcesTn +type sourceDb struct { + Id int + StreamId + SourceId +} + +func sourceFromStatisticsData(value StatisticsData) sourceDb { + return sourceDb{ + -1, + StreamId{ + ContentId: value.SourceId.StreamId.ContentId, + Format: value.SourceId.StreamId.Format, + Quality: value.SourceId.StreamId.Quality, + }, + SourceId{ + Hostname: value.SourceId.Hostname}, + } +} + +// stored in ClientDataUpdatesTn +// ClientData n:1 DataUpdate +type clientDataDb struct { + Id int + DataUpdatesId int // foreign key to DataUpdatesTn + ClientData +} + +func clientsFromStatisticsData(value StatisticsData) []clientDataDb { + res := make([]clientDataDb, len(value.Data.Clients)) + for i := range value.Data.Clients { + res[i] = clientDataDb{-1, -1, value.Data.Clients[i]} + } + return res +} + +// stored in 
DataUpdatesTn +// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs +type dataUpdateDb struct { + Id int + SourceId int // foreign key to SourcesTn + StartTime time.Time + Duration time.Duration + ClientCount uint + BytesReceived uint + BytesSent uint +} + +func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { + return dataUpdateDb{ + -1, + -1, + value.StartTime, + value.Duration, + value.Data.ClientCount, + value.Data.BytesReceived, + value.Data.BytesSent} +} + +func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) { + du := dataUpdateFromStatisticsData(value) + cd := clientsFromStatisticsData(value) + src := sourceFromStatisticsData(value) + tags := tagsFromStatisticsData(value) + + return du, cd, src, tags +} + +func initDb() *gorp.DbMap { + // connect to db using standard Go database/sql API + db, err := sql.Open("sqlite3", "/home/gimpf/test.sqlite") + checkErr(err, "sql.Open failed") + + dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} + + dbmap.AddTableWithName(tagDb{}, TagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) + dbmap.AddTableWithName(sourceTagsDb{}, SourceTagsTn).SetKeys(false, "TagId", "SourceId") + dbmap.AddTableWithName(sourceDb{}, SourcesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(clientDataDb{}, ClientDataUpdatesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(dataUpdateDb{}, DataUpdatesTn).SetKeys(true, "Id") + + // TODO use some real migration, yadda yadda + err = dbmap.CreateTablesIfNotExists() + checkErr(err, "Create tables failed") + + return dbmap +} + +func checkErr(err error, msg string) { + if err != nil { + log.Fatalln(msg, err) + } +} + type StatsFilter struct { start *time.Time end *time.Time @@ -15,47 +147,148 @@ type StatsFilter struct { tagsAny []string } +type Closer interface { + Close() +} + type StatsContainer interface { Append(update StatisticsData) error - ClientCount(filter StatsFilter) uint - AverageBps(filter 
StatsFilter) uint - Locations(filter StatsFilter) map[string]int + CountUpdateEntries() (int64, error) + GetTags() ([]string, error) + ClientCount(filter *StatsFilter) uint + AverageBps(filter *StatsFilter) (uint, error) + Locations(filter *StatsFilter) map[string]int } -type SqliteStore struct { - db *sql.DB +type sqliteStore struct { + db *gorp.DbMap } -func InitSqlDb(db *sql.DB) { +func (s sqliteStore) Append(update StatisticsData) (err error) { + du, cd, src, tags := updateFromStatisticsData(update) -} + s.db.TraceOn("", log.New(os.Stdout, "gorptest: ", log.Lmicroseconds)) + tx, err := s.db.Begin() + if err != nil { + //fmt.Printf("tx\n") + return + } -func NewStore() (store StatsContainer, err error) { - db, err := sql.Open("sqlite3", ":memory:") + for i := range tags { + err = s.db.Insert(&tags[i]) + if err != nil { + //fmt.Printf("tags\n") + return + } + } + + err = s.db.Insert(&src) if err != nil { + //fmt.Printf("src\n") return } - res := &SqliteStore{db} - store = res + + st := make([]sourceTagsDb, len(tags)) + for i := range tags { + st[i].TagId = tags[i].Id + st[i].SourceId = src.Id + } + for i := range st { + err = s.db.Insert(&st[i]) + if err != nil { + //fmt.Printf("st\n") + return + } + } + + du.SourceId = src.Id + err = s.db.Insert(&du) + if err != nil { + //fmt.Printf("du\n") + return + } + + for i := range cd { + cd[i].DataUpdatesId = du.Id + err = s.db.Insert(&cd) + if err != nil { + return + } + + } + return tx.Commit() +} + +func (s sqliteStore) CountUpdateEntries() (count int64, err error) { + count, err = s.db.SelectInt("select count(*) from " + DataUpdatesTn) return } -func (s *SqliteStore) Append(update StatisticsData) (err error) { - // TODO - return nil +func (s sqliteStore) GetTags() ([]string, error) { + res, dbErr := s.db.Select("", "select Name from "+TagsTn) + if dbErr == nil { + sRes := ToString(res) + return sRes, nil + } + return nil, dbErr } -func (s *SqliteStore) ClientCount(filter StatsFilter) uint { - return 0 - // TODO +func 
ToString(value []interface{}) []string { + res := make([]string, len(value)) + for i := range value { + res[i] = value[i].(string) + } + return res } -func (s *SqliteStore) AverageBps(filter StatsFilter) uint { - return 0 - // TODO +func (s sqliteStore) ClientCount(filter *StatsFilter) uint { + count, _ := s.db.SelectInt( + "select count(distict (Ip, UserAgent)) from " + ClientDataUpdatesTn) + return uint(count) +} + +type bpsQueryResult struct { + BytesReceived uint + BytesSent uint + StartTime time.Time + LastStartTime time.Time + LastDuration time.Duration } -func (s *SqliteStore) Locations(filter StatsFilter) map[string]int { +func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) { + res := bpsQueryResult{} + err := s.db.SelectOne(res, "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+DataUpdatesTn) + if err == nil { + bps := (res.BytesSent + res.BytesReceived) / uint(res.StartTime.Sub(res.LastStartTime).Seconds()) + return bps, nil + } + return 0, err +} + +func (s sqliteStore) Locations(filter *StatsFilter) map[string]int { return nil // TODO } + +func NewStore() (store StatsContainer, err error) { + db := initDb() + if db == nil { + return + } + res := sqliteStore{db} + store = res + return +} + +func EatDataAndClose(sc StatsContainer) { + s := sc.(sqliteStore) + // if s == nil { + // return + // } + s.db.TruncateTables() + s.Close() +} + +func (s *sqliteStore) Close() { + s.db.Db.Close() +} |