package sfive import ( "database/sql" "time" _ "github.com/mattn/go-sqlite3" "github.com/coopernurse/gorp" ) // compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections // this is very much not normalized at all, because I'm too lazy to type // table names const ( tagsTn = "Tags" sourceTagsTn = "StreamToTagMap" sourcesTn = "Sources" clientdataUpdatesTn = "ClientDataUpdates" dataUpdatesTn = "DataUpdates" ) // stored in tagsTn type tagDb struct { Id int Name string } // stored in sourceTagsTn // Stream m:n Tag type sourceTagsDb struct { TagId int // foreign key to tagsTn SourceId int // foreign key to sourcesTn } // stored in sourcesTn type sourceDb struct { Id int StreamId SourceId } // stored in clientdataUpdatesTn // ClientData n:1 DataUpdate type clientDataDb struct { Id int DataUpdatesId int // foreign key to dataUpdatesTn ClientData } // stored in dataUpdatesTn // in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs type dataUpdateDb struct { Id int SourceId int // foreign key to sourcesTn StartTime int64 // time.Time Duration int64 // time.Duration ClientCount uint BytesReceived uint BytesSent uint } type sqliteStore struct { db *gorp.DbMap } func tagsFromStatisticsData(value StatisticsData) []tagDb { tags := make([]tagDb, len(value.SourceId.Tags)) for i := range value.SourceId.Tags { tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} } return tags } func sourceFromStatisticsData(value StatisticsData) sourceDb { return sourceDb{ -1, StreamId{ ContentId: value.SourceId.StreamId.ContentId, Format: value.SourceId.StreamId.Format, Quality: value.SourceId.StreamId.Quality, }, SourceId{ Hostname: value.SourceId.Hostname}, } } func clientsFromStatisticsData(value StatisticsData) []clientDataDb { res := make([]clientDataDb, len(value.Data.Clients)) for i := range value.Data.Clients { res[i] = clientDataDb{-1, -1, value.Data.Clients[i]} } return res } func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { return dataUpdateDb{ -1, -1, value.StartTime.Unix(), int64(value.Duration.Seconds()), value.Data.ClientCount, value.Data.BytesReceived, value.Data.BytesSent} } func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) { du := dataUpdateFromStatisticsData(value) cd := clientsFromStatisticsData(value) src := sourceFromStatisticsData(value) tags := tagsFromStatisticsData(value) return du, cd, src, tags } func initDb(path string) (res *gorp.DbMap, err error) { // connect to db using standard Go database/sql API db, err := sql.Open("sqlite3", path) if err != nil { return } dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds)) dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId") dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname") dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id") dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id") // TODO use some real migration, yadda yadda err = dbmap.CreateTablesIfNotExists() if err != nil { return } res = dbmap return } func isEmptyFilter(filter *StatsFilter) bool { if filter == nil { return true } if filter.start == nil && filter.end == nil && filter.hostname == nil && filter.contentId == nil && filter.format == nil && filter.quality == nil && (filter.tagsAny == nil || len(filter.tagsAny) == 0) { return true } return false } func insertAnd(needsAnd *bool) (res string) { if *needsAnd { res = " and" *needsAnd = false } return } func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) { if isEmptyFilter(filter) { return dataUpdatesTn, nil } query := "(select * from " + dataUpdatesTn + " where" parameters := make(map[string]interface{}) needsAnd := false if filter.start != nil { query += insertAnd(&needsAnd) query += " StartTime >= :filterstart" parameters["filterstart"] = filter.start.Unix() needsAnd = true } if filter.end != nil { query += insertAnd(&needsAnd) query += " StartTime < :filterend" parameters["filterend"] = filter.end.Unix() needsAnd = true } // TODO other fields query += ")" return query, parameters } func (s sqliteStore) findTag(name string) (tag *tagDb, err error) { t := tagDb{} err = s.db.SelectOne(&t, "select * from "+tagsTn+" where Name = ?", name) if err == nil { tag = &t } return } func (s sqliteStore) insertNewTags(tags []tagDb) (err error) { for i := range tags { t, err := s.findTag(tags[i].Name) if err != nil { _, err = s.db.Exec("insert into "+tagsTn+" VALUES (NULL, ?)", tags[i].Name) } t, err = s.findTag(tags[i].Name) if err == nil { tags[i] = *t } else { break } } return } func (s sqliteStore) findSource(src sourceDb) (res *sourceDb, err error) { t := sourceDb{} err = s.db.SelectOne( &t, "select Id from "+sourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?", src.ContentId, src.Format, src.Quality, src.Hostname) if err == nil { res = &t } return } func (s sqliteStore) insertNewSource(src *sourceDb) (err error) { t, err := s.findSource(*src) if err == nil { *src = *t } else { err = s.db.Insert(src) } return } func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error) { st := make([]sourceTagsDb, len(tags)) for i := range tags { st[i].TagId = tags[i].Id st[i].SourceId = src.Id } for i := range st { _, err = s.db.Exec( "insert or ignore into "+sourceTagsTn+" values (?,?)", st[i].SourceId, st[i].TagId) // err = s.db.Insert(&st[i]) if err != nil { // TODO //fmt.Printf("st\n") return } } return } func (s sqliteStore) insertDataUpdateEntry(src sourceDb, du *dataUpdateDb) (err error) { du.SourceId = src.Id err = s.db.Insert(du) if err != nil { //fmt.Printf("du\n") return } return } func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpdateDb) (err error) { for i := range cd { cd[i].DataUpdatesId = du.Id err = s.db.Insert(&cd[i]) if err != nil { // TODO return } } return } // this function is the biggest pile of copy/pasted crap while sick that is still compilable. func (s sqliteStore) Append(update StatisticsData) (err error) { du, cd, src, tags := updateFromStatisticsData(update) // s.db.TraceOn("", log.New(os.Stdout, "gorptest: ", log.Lmicroseconds)) tx, err := s.db.Begin() if err != nil { return } err = s.insertNewTags(tags) if err != nil { return } err = s.insertNewSource(&src) if err != nil { //fmt.Printf("src\n") return } err = s.insertSourceTagLinks(src, tags) if err != nil { return } err = s.insertDataUpdateEntry(src, &du) if err != nil { return } err = s.insertDataUpdateClientEntries(cd, du) if err != nil { return } return tx.Commit() } func castArrayToString(value []interface{}) []string { res := make([]string, len(value)) for i := range value { res[i] = value[i].(string) } return res } func (s sqliteStore) GetTags() ([]string, error) { res, dbErr := s.db.Select("", "select Name from "+tagsTn) if dbErr == nil { sRes := castArrayToString(res) return sRes, nil } return nil, dbErr } func (s sqliteStore) GetSources() (res []sourceDb, err error) { qres, err := s.db.Select(sourceDb{}, "select * from "+sourcesTn) if err == nil { res = make([]sourceDb, len(qres)) for i := range qres { res[i] = *qres[i].(*sourceDb) } } return } func (s sqliteStore) GetSource(id int) (res sourceDb, err error) { err = s.db.SelectOne(&res, "select * from "+sourcesTn+" where Id = ?", id) return } func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id) return } func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) { updates, err := s.db.Select( dataUpdateDb{}, "select * from "+dataUpdatesTn+" where Id > ?", id) if err == nil { res = make([]dataUpdateDb, len(updates)) for i := range updates { res[i] = *updates[i].(*dataUpdateDb) } } return } func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []dataUpdateDb, err error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) updates, err := s.db.Select( dataUpdateDb{}, "select Id, SourceId, StartTime, ClientCount, BytesReceived, BytesSent from "+sourceSql, parameters) if err == nil { res = make([]dataUpdateDb, len(updates)) for i := range updates { res[i] = *updates[i].(*dataUpdateDb) } } return } func getCountEntriesSql() string { return "count(*) as UpdateCount" } // TODO this is totally utterly wrong func getClientCountSql() string { return "avg(ClientCount) as ClientCount" } func getBpsStatsSql() string { return "sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime" } func (s sqliteStore) CountUpdateEntries(filter *StatsFilter) (count int64, err error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) count, err = s.db.SelectInt("select "+getCountEntriesSql()+" from "+sourceSql, parameters) return } func (s sqliteStore) CountClients(filter *StatsFilter) int { sourceSql, parameters := getFilteredDataUpdateSelect(filter) count, err := s.db.SelectInt("select "+getClientCountSql()+" from "+sourceSql, parameters) if err != nil { s5l.Printf("CountClients: %v", err) return -1 } return int(count) } type bpsQueryResult struct { BytesSent uint BytesReceived uint StartTime time.Time LastStartTime time.Time } func (s sqliteStore) GetAverageBps(filter *StatsFilter) (uint, error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) res := bpsQueryResult{} err := s.db.SelectOne( res, "select "+getBpsStatsSql()+" from "+sourceSql, parameters) if err == nil { bps := (res.BytesSent + res.BytesReceived) / uint(res.StartTime.Sub(res.LastStartTime).Seconds()) return bps, nil } return 0, err } type statsResult struct { UpdateCount *int ClientCount *float32 BytesSent *uint BytesReceived *uint StartTime *int64 LastStartTime *int64 } func (s sqliteStore) GetStats(filter *StatsFilter) (statsResult, error) { // (map[string]interface{}, error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) _ = sourceSql sql := "select " + getCountEntriesSql() + "," + getClientCountSql() + "," + getBpsStatsSql() + " from " + sourceSql s5l.Printf("stats sql: %v", sql) res := statsResult{} err := s.db.SelectOne(&res, sql, parameters) return res, err } func NewStore(path string) (store sqliteStore, err error) { db, err := initDb(path) if err != nil { return } store = sqliteStore{db} return } func (s sqliteStore) Close() { s.db.Db.Close() }