diff options
Diffstat (limited to 'src/hub')
28 files changed, 2154 insertions, 1171 deletions
diff --git a/src/hub/.gitignore b/src/hub/.gitignore index 16dc168..dd65bbd 100644 --- a/src/hub/.gitignore +++ b/src/hub/.gitignore @@ -5,3 +5,5 @@ /pkg *.a *.o +/test +/coverage.out diff --git a/src/hub/Makefile b/src/hub/Makefile index 0c34f14..87890a4 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -12,7 +12,7 @@ # live and recorded data. # # -# Copyright (C) 2014-2015 Christian Pointner <equinox@spreadspace.org> +# Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> # Markus Grüneis <gimpf@gimpf.org> # # This file is part of sfive. @@ -38,13 +38,10 @@ endif EXECUTEABLE := sfive-hub -LIBS := "gopkg.in/gorp.v2" \ - "github.com/mattn/go-sqlite3" \ +LIBS := "github.com/boltdb/bolt" \ "github.com/zenazn/goji" \ "github.com/pborman/uuid" \ "github.com/equinox0815/graphite-golang" -# "github.com/go-sql-driver/mysql" -# "github.com/ziutek/mymysql/godrv" all: build test @@ -83,6 +80,11 @@ bench: getlibs @echo "testing and benchmarking: sfive" @$(GOCMD) test -bench=. spreadspace.org/sfive +cover: getlibs + @echo "testing with coverage output" + @$(GOCMD) test -coverprofile=coverage.out spreadspace.org/sfive + @$(GOCMD) tool cover -html=coverage.out + clean: rm -rf pkg/*/spreadspace.org rm -rf bin diff --git a/src/hub/dump-test-db b/src/hub/dump-test-db deleted file mode 100755 index 9a7aae7..0000000 --- a/src/hub/dump-test-db +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -echo .dump | sqlite3 /var/lib/sfive/db.sqlite - diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 8897e42..4028f3f 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package main import ( @@ -12,8 +44,8 @@ import ( var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags) func main() { - db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file") - dbMysql := flag.Bool("db-mysql", false, "use MySQL connector") + db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file") + readOnly := flag.Bool("read-only", false, "open database in read-only mode") pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver") ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver") startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe") @@ -27,7 +59,6 @@ func main() { piwikSiteURL := flag.String("piwik-site-url", "", "use this base url for the site") piwikSiteID := flag.Uint("piwik-site-id", 1, "use this site-id for piwik") piwikToken := flag.String("piwik-token", "", "the auth token for piwik") - vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application") help := flag.Bool("help", false, "show usage") flag.Parse() @@ -38,11 +69,11 @@ func main() { return } - server, err := sfive.NewServer(*dbMysql, *db) + srv, err := sfive.NewServer(*db, *readOnly) if err != nil { - s5hl.Fatalf("failed to initialize: %v", err) + s5hl.Fatalf(err.Error()) } - defer server.Close() + defer srv.Close() var wg sync.WaitGroup @@ -51,7 +82,7 @@ func main() { go func() { defer wg.Done() s5hl.Printf("start pipe at %v\n", *pipe) - server.ServePipe(*pipe) + srv.ServePipe(*pipe) s5hl.Println("pipe finished") }() } @@ -60,8 +91,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Printf("start pipegram at %v\n", *ppipe) - server.ServeGramPipe(*ppipe) + s5hl.Printf("starting pipegram at %v\n", *ppipe) + srv.ServePipegram(*ppipe) s5hl.Println("pipegram finished") }() } @@ -70,8 +101,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("start web-srv") - server.ServeWeb(*vizAppDir) + s5hl.Println("starting web") + srv.ServeWeb() s5hl.Println("web finished") }() } @@ -80,8 +111,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("start forward") - server.RunForwarding(*forward) + s5hl.Println("starting forward") + srv.RunForwarding(*forward) s5hl.Println("forward finished") }() } @@ -90,8 +121,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("start elastic-search forward") - server.RunForwardingToElasticSearch(*forwardES) + s5hl.Println("starting elastic-search forward") + srv.RunForwardingEs(*forwardES) s5hl.Println("elastic-search forward finished") }() } @@ -100,8 +131,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("start graphite forward") - server.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath) + s5hl.Println("starting graphite forward") + srv.RunForwardingGraphite(*forwardGraphite, *graphiteBasePath) s5hl.Println("graphite forward finished") }() } @@ -110,8 +141,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - s5hl.Println("start piwik forward") - server.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken) + s5hl.Println("starting piwik forward") + srv.RunForwardingPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken) s5hl.Println("piwik forward finished") }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index cd65cf7..26a7c83 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -5,71 +37,80 @@ import ( "fmt" ) -type StatsDecoder interface { - Decode(jsonString []byte) (StatisticsData, error) -} - -type StatsEncoder interface { - Encode(data StatisticsData) []byte -} +const ( + ProtocolVersion = 1 +) -type FilterDecoder interface { - Decode(jsonString []byte) (StatsFilter, error) -} +// +// Decoder +// -type StatefulDecoder struct { - sourceId SourceId +type Decoder interface { + Decode(jsonString []byte) (DataUpdateFull, error) } -type PlainDecoder struct{} - -type PlainEncoder struct{} +// stateless protocol interfaces +type StatelessDecoder struct{} -type filterDecoder struct{} +func NewStatelessDecoder() Decoder { + return &StatelessDecoder{} +} -func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) { - res := new(StatefulDecoder) - err = json.Unmarshal(jsonString, &res.sourceId) - if err != nil { +func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + if err = json.Unmarshal(jsonString, &res); err != nil { return } - if res.sourceId.Version != 1 { - err = fmt.Errorf("unsupported version, expected 1, actual %v", res.sourceId.Version) + if res.Version != ProtocolVersion { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) } - decoder = res return } -func NewPlainDecoder() StatsDecoder { - return new(PlainDecoder) +// stateful protocol interfaces +type StatefulDecoder struct { + Version uint + SourceId } -func NewFilterDecoder() FilterDecoder { - return new(filterDecoder) +func NewStatefulDecoder(jsonString []byte) (Decoder, error) { + res := &StatefulDecoder{} + if err := json.Unmarshal(jsonString, &res); err != nil { + return nil, err + } + if res.Version != ProtocolVersion { + return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) + } + return res, nil } -func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { - dat.CopyFromSourceId(&self.sourceId) - err = json.Unmarshal(jsonString, &dat) - // like in PlainDecoder, let the client decide how to use partial results - // (Unmarshal returns partial results in case of errors) +func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + res.Version = sd.Version + res.CopyFromSourceId(&sd.SourceId) + if err = json.Unmarshal(jsonString, &res); err != nil { + return + } + if res.Version != sd.Version { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version) + } return } -func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { - err = json.Unmarshal(jsonString, &dat) - return +// +// Encoder +// + +type Encoder interface { + Encode(data DataUpdateFull) ([]byte, error) } -func (self *PlainEncoder) Encode(data *StatisticsData) []byte { - res, err := json.Marshal(data) - if err != nil { - s5l.Panicln("failed to encode StatisticsData") - } - return res +type StatelessEncoder struct{} + +func NewStatelessEncoder() Encoder { + return &StatelessEncoder{} } -func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) { - err = json.Unmarshal(jsonString, &dat) - return +func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) { + data.Version = ProtocolVersion + data.StartTime = data.StartTime.UTC() + return json.Marshal(data) } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 32f35dd..5a79d85 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -1,62 +1,150 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( + "fmt" "reflect" "testing" "time" ) var ( - sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"], "version": 1` - sourceIdData = `{` + sourceIdFields + `}` - sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}, Version: 1} - updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` - updateData = "{" + updateFields + "}" - updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} - testData = "{" + sourceIdFields + "," + updateFields + "}" + initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` + initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` + updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) -func GetExpected() *StatisticsData { - expected := new(StatisticsData) - expected.CopyFromSourceId(&sourceIdDataStruct) +func GetExpected() (expected DataUpdateFull) { + expected.Version = ProtocolVersion + expected.CopyFromSourceId(&initDataStruct) expected.CopyFromUpdate(&updateDataStruct) - return expected + return } func TestDecodeStateful(t *testing.T) { - dc, err := NewStatefulDecoder([]byte(sourceIdData)) + // invalid init message + if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil { + t.Fatalf("creating decoder with invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded) + if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("creating decoder with wrong protocol version should throw an error") + } + + // valid init message + statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + dec, err := NewStatefulDecoder([]byte(statefulInitMsg)) if err != nil { - t.Errorf("Creating decoder failed with %v", err) - return + t.Fatalf("creating decoder failed: %v", err) + } + + // invalid data-update message + if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil { + t.Fatalf("decoding message with invalid json should throw an error") } - dat, err := dc.Decode([]byte(testData)) + + // wrong protocol version + wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") + } + + // valid data-update message + statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded) + decoded, err := dec.Decode([]byte(statefulUpdateMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestDecodePlain(t *testing.T) { - ec := new(PlainDecoder) - dat, err := ec.Decode([]byte(testData)) +func TestDecodeStateless(t *testing.T) { + dec := NewStatelessDecoder() + + // invalid message + if _, err := dec.Decode([]byte("this is still not json")); err == nil { + t.Fatalf("decoding invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") + } + + // valid message + statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded) + decoded, err := dec.Decode([]byte(statelessDataMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestEncode(t *testing.T) { - ec := new(PlainEncoder) - td := new(StatisticsData) - td.CopyFromSourceId(&sourceIdDataStruct) +func TestEncodeStateless(t *testing.T) { + enc := NewStatelessEncoder() + + var td DataUpdateFull + td.CopyFromSourceId(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) - t.Logf("dada: %v", ec.Encode(td)) + + encoded, err := enc.Encode(td) + if err != nil { + t.Fatalf("encoding message failed: %v", err) + } + + decoded, err := NewStatelessDecoder().Decode(encoded) + if err != nil { + t.Fatalf("decoding message failed: %v", err) + } + + expected := td + expected.Version = ProtocolVersion + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("encoding failed:\n actual: %v\n expected: %v\n", decoded, expected) + } } diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go index 9b80f6f..4801902 100644 --- a/src/hub/src/spreadspace.org/sfive/s5log.go +++ b/src/hub/src/spreadspace.org/sfive/s5log.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -9,6 +41,6 @@ import ( var ( s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags) // use ioutil.Discard to switch that thing off - // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags) - s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags) + // s5dl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags) + s5dl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags) ) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 9cf4c64..76805c3 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -1,40 +1,60 @@ -package sfive - -import "time" +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// -type appendManyToken struct { - data []StatisticsData - response chan bool -} +package sfive -type queryStatsResult struct { - stats StatsResult - err error +type appendToken struct { + data DataUpdateFull + response chan error } -type queryStatsToken struct { - filter *StatsFilter - response chan queryStatsResult +type appendManyToken struct { + data []DataUpdateFull + response chan error } type getUpdatesResult struct { - values []StatisticsData + values []DataUpdateFull err error } type getUpdatesAfterToken struct { id int - response chan getUpdatesResult -} - -type getUpdatesToken struct { - filter *StatsFilter + limit int response chan getUpdatesResult } type getHubIdResult struct { - id string - err error + id string } type getHubIdToken struct { @@ -50,141 +70,108 @@ type getLastUpdateIdToken struct { response chan getLastUpdateIdResult } -type StatsSinkServer struct { - store sqliteStore +type Server struct { + store Store quit chan bool done chan bool - appendData chan StatisticsData - appendManyData chan appendManyToken // chan []StatisticsData - getStatsChan chan queryStatsToken + appendChan chan appendToken + appendManyChan chan appendManyToken getUpdatesAfterChan chan getUpdatesAfterToken - getUpdatesChan chan getUpdatesToken getHubIdChan chan getHubIdToken getLastUpdateIdChan chan getLastUpdateIdToken } -func (self StatsSinkServer) appendActor() { - defer func() { self.done <- true }() +func (srv Server) appendActor() { + defer func() { srv.done <- true }() for { select { - case <-self.quit: + case <-srv.quit: return - case value := <-self.appendData: - var err error - for tryNum := 0; tryNum < 5; tryNum++ { - err = self.store.Append(value) - if err != nil { - time.Sleep(1 * time.Second) - } else { - break - } - } - if err != nil { - s5l.Printf("failed to store data: %v\n", err) - } - case token := <-self.appendManyData: - err := self.store.AppendMany(token.data) - if err != nil { - s5l.Printf("failed to store many data: %v\n", err) - token.response <- false - } else { - token.response <- true - } - case token := <-self.getStatsChan: - stats, err := self.store.GetStats(token.filter) - token.response <- queryStatsResult{stats, err} - case token := <-self.getUpdatesAfterChan: - values, err := self.store.GetUpdatesAfter(token.id) - token.response <- getUpdatesResult{values, err} - case token := <-self.getUpdatesChan: - values, err := self.store.GetUpdates(token.filter) + case token := <-srv.appendChan: + token.response <- srv.store.Append(token.data) + case token := <-srv.appendManyChan: + token.response <- srv.store.AppendMany(token.data) + case token := <-srv.getUpdatesAfterChan: + values, err := srv.store.GetUpdatesAfter(token.id, token.limit) token.response <- getUpdatesResult{values, err} - case token := <-self.getHubIdChan: - storeId, err := self.store.GetStoreId() - token.response <- getHubIdResult{storeId, err} - case token := <-self.getLastUpdateIdChan: - lastUpdateId, err := self.store.GetLastUpdateId() - if lastUpdateId != nil { - token.response <- getLastUpdateIdResult{*lastUpdateId, err} - } else { - token.response <- getLastUpdateIdResult{0, err} - } + case token := <-srv.getHubIdChan: + token.response <- getHubIdResult{srv.store.GetStoreId()} + case token := <-srv.getLastUpdateIdChan: + lastUpdateId, err := srv.store.GetLastUpdateId() + token.response <- getLastUpdateIdResult{lastUpdateId, err} } } } -func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, error) { - token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} +func (srv Server) Append(data DataUpdateFull) error { + token := appendToken{data: data, response: make(chan error, 1)} defer close(token.response) - self.getUpdatesAfterChan <- token - res := <-token.response - return res.values, res.err + srv.appendChan <- token + return <-token.response } -func (self StatsSinkServer) getUpdatesInvoke(filter *StatsFilter) ([]StatisticsData, error) { - token := getUpdatesToken{filter: filter, response: make(chan getUpdatesResult, 1)} +func (srv Server) AppendMany(data []DataUpdateFull) error { + token := appendManyToken{data: data, response: make(chan error, 1)} defer close(token.response) - self.getUpdatesChan <- token - res := <-token.response - return res.values, res.err + srv.appendManyChan <- token + return <-token.response } -func (self StatsSinkServer) getStatsInvoke(filter *StatsFilter) (StatsResult, error) { - token := queryStatsToken{filter: filter, response: make(chan queryStatsResult, 1)} +func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) { + token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)} defer close(token.response) - self.getStatsChan <- token + srv.getUpdatesAfterChan <- token res := <-token.response - return res.stats, res.err + return res.values, res.err } -func (self StatsSinkServer) getHubIdInvoke() (string, error) { +func (srv Server) GetHubId() string { token := getHubIdToken{response: make(chan getHubIdResult, 1)} defer close(token.response) - self.getHubIdChan <- token + srv.getHubIdChan <- token res := <-token.response - return res.id, res.err + return res.id } -func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) { +func (srv Server) GetLastUpdateId() (int, error) { token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)} defer close(token.response) - self.getLastUpdateIdChan <- token + srv.getLastUpdateIdChan <- token res := <-token.response return res.id, res.err } -func (self StatsSinkServer) Close() { - self.quit <- true - <-self.done - close(self.quit) - close(self.done) - close(self.appendData) - close(self.appendManyData) - close(self.getStatsChan) - close(self.getUpdatesAfterChan) - close(self.getUpdatesChan) - close(self.getHubIdChan) - close(self.getLastUpdateIdChan) - self.store.Close() +func (srv Server) Close() { + s5l.Printf("server: shutting down\n") + srv.quit <- true + <-srv.done + close(srv.quit) + close(srv.done) + close(srv.appendChan) + close(srv.appendManyChan) + close(srv.getUpdatesAfterChan) + close(srv.getHubIdChan) + close(srv.getLastUpdateIdChan) + srv.store.Close() + s5l.Printf("server: finished\n") } -func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) { +func NewServer(dbPath string, readOnly bool) (server *Server, err error) { // TODO read configuration and create instance with correct settings - server = new(StatsSinkServer) - server.store, err = NewStore(mysql, dbPath) + server = &Server{} + server.store, err = NewStore(dbPath, readOnly) if err != nil { return } server.quit = make(chan bool) server.done = make(chan bool) - server.appendData = make(chan StatisticsData, 5) - server.appendManyData = make(chan appendManyToken, 5) - server.getStatsChan = make(chan queryStatsToken, 5) - server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) - server.getUpdatesChan = make(chan getUpdatesToken, 3) - server.getHubIdChan = make(chan getHubIdToken, 1) - server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1) + server.appendChan = make(chan appendToken, 32) + server.appendManyChan = make(chan appendManyToken, 32) + server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32) + server.getHubIdChan = make(chan getHubIdToken, 32) + server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32) go server.appendActor() + s5l.Printf("server: started\n") return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index a072b2a..d6874de 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -9,24 +41,19 @@ import ( "time" ) -func findMaxId(values []StatisticsData) int { +func findMaxId(values []DataUpdateFull) int { maxId := -1 for i := range values { id := values[i].SourceHubDataUpdateId - if id != nil && *id > maxId { - maxId = *id + if id > maxId { + maxId = id } } return maxId } -func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { - storeId, err = self.getHubIdInvoke() - - if err != nil { - s5l.Printf("fwd: failed to get own hubid: %v\n", err) - return - } +func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { + storeId = srv.GetHubId() var resp *http.Response resp, err = client.Get(baseurl + "/lastupdate/" + storeId) @@ -63,11 +90,11 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } -func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { +func (srv Server) forwardRun(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: for { - lastId, _, err := self.getLastUpdate(baseurl, client) + lastId, _, err := srv.forwardGetLastUpdate(baseurl, client) if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) @@ -79,7 +106,7 @@ tryResync: nextBatch: for { - updates, err := self.getUpdatesAfterInvoke(lastId) + updates, err := srv.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -93,7 +120,7 @@ tryResync: continue nextBatch } - data, err := json.Marshal(StatisticsDataContainer{updates}) + data, err := json.Marshal(DataUpdateFullContainer{updates}) if err != nil { s5l.Panicf("fwd: encode failed: %v\n", err) @@ -116,11 +143,10 @@ tryResync: s5l.Printf("fwd: post OK") lastId = findMaxId(updates) s5l.Printf("fwd: new lastid: %d", lastId) - time.Sleep(1 * time.Second) } } } -func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { - self.handleForwarding(forwardBaseUrl, http.DefaultClient) +func (srv Server) RunForwarding(forwardBaseUrl string) { + srv.forwardRun(forwardBaseUrl, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 19abb1e..8880b8a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -10,23 +42,18 @@ import ( "time" ) -const lastUpdateJson = `{ +const forwardEsLastUpdateJson = `{ "query": {"match": { "SourceHubUuid": "%s" } }, "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } }` -func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) { +func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { url := baseurl + "/dataupdate/_search?search_type=count" - storeId, err = self.getHubIdInvoke() - - if err != nil { - s5l.Printf("fwd-es: failed to get own hubid: %v\n", err) - return - } + storeId = srv.GetHubId() - queryJson := fmt.Sprintf(lastUpdateJson, storeId) - s5tl.Printf("fwd-es: query: %s", queryJson) + queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId) + s5dl.Printf("fwd-es: query: %s", queryJson) var resp *http.Response resp, err = client.Post(url, "application/json", strings.NewReader(queryJson)) @@ -48,7 +75,7 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) return } - s5tl.Printf("fwd-es: lastupdate response: %s\n", body) + s5dl.Printf("fwd-es: lastupdate response: %s\n", body) if len(body) == 0 { latestId = -1 @@ -73,11 +100,11 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) return } -func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { +func (srv Server) forwardEsRun(baseurl string, client *http.Client) { url := baseurl + "/_bulk" tryResync: for { - lastId, _, err := self.getLastUpdateEs(baseurl, client) + lastId, _, err := srv.forwardEsGetLastUpdate(baseurl, client) if err != nil { s5l.Printf("fwd-es: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -87,7 +114,7 @@ tryResync: nextBatch: for { - updates, err := self.getUpdatesAfterInvoke(lastId) + updates, err := srv.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-es: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -114,8 +141,6 @@ tryResync: postData.WriteRune('\n') } - //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes())) - s5l.Printf("fwd-es: marshal OK") resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) @@ -136,11 +161,10 @@ tryResync: s5l.Printf("fwd-es: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) - //time.Sleep(1 * time.Second) } } } -func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { - self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) +func (srv Server) RunForwardingEs(forwardBaseUrl string) { + srv.forwardEsRun(forwardBaseUrl, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index 9779960..2c0043b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -7,8 +39,8 @@ import ( "github.com/equinox0815/graphite-golang" ) -func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) { - latestId, err = self.getLastUpdateIdInvoke() +func (srv Server) forwardGraphiteGetLastUpdate(conn *graphite.Graphite) (latestId int, storeId string, err error) { + latestId, err = srv.GetLastUpdateId() if err != nil { s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err) return @@ -17,7 +49,7 @@ func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (late return } -func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) { +func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) { tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) @@ -27,7 +59,7 @@ tryResync: continue tryResync } - lastId, _, err := self.getLastUpdateGraphite(client) + lastId, _, err := srv.forwardGraphiteGetLastUpdate(client) if err != nil { s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) client.Disconnect() @@ -38,7 +70,7 @@ tryResync: nextBatch: for { - updates, err := self.getUpdatesAfterInvoke(lastId) + updates, err := srv.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-graphite: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -75,11 +107,10 @@ tryResync: s5l.Printf("fwd-graphite: all metrics sent") lastId = findMaxId(updates) s5l.Printf("fwd-graphite: new lastid: %d", lastId) - //time.Sleep(1 * time.Second) } } } -func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) { - self.handleForwardingToGraphite(forwardHost, basePath) +func (srv Server) RunForwardingGraphite(forwardHost string, basePath string) { + srv.forwardGraphiteRun(forwardHost, basePath) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 5a25622..3d44500 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -12,15 +44,15 @@ import ( "time" ) -type PiwikBulkRequest struct { +type forwardPiwikBulkRequest struct { Requests []string `json:"requests"` TokenAuth string `json:"token_auth"` } -func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { +func (srv Server) forwardPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { // TODO: ask piwik what the last update was... - latestId, err = 0, nil //self.getLastUpdateIdInvoke() + latestId, err = 0, nil //srv.getLastUpdateIdInvoke() if err != nil { s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err) return @@ -29,10 +61,10 @@ func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID return } -func (self StatsSinkServer) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { tryResync: for { - lastId, _, err := self.getLastUpdatePiwik(piwikURL, siteURL, siteID, token, client) + lastId, _, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client) if err != nil { s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -42,7 +74,7 @@ tryResync: nextBatch: for { - updates, err := self.getUpdatesAfterInvoke(lastId) + updates, err := srv.GetUpdatesAfter(lastId, 5000) if err != nil { s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -56,7 +88,7 @@ tryResync: continue nextBatch } - req := PiwikBulkRequest{TokenAuth: token} + req := forwardPiwikBulkRequest{TokenAuth: token} for _, update := range updates { if len(update.Data.Clients) == 0 { continue @@ -101,11 +133,10 @@ tryResync: s5l.Printf("fwd-piwik: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-piwik: new lastid: %d", lastId) - //time.Sleep(1 * time.Second) } } } -func (self StatsSinkServer) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) { - self.handleForwardingToPiwik(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient) +func (srv Server) RunForwardingPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) { + srv.forwardPiwikRun(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 8084461..4b48d99 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -1,15 +1,50 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( "bufio" + "io" "net" ) -func (self StatsSinkServer) handleConnection(conn net.Conn) { +func (srv Server) pipeHandle(conn net.Conn) { reader := bufio.NewReader(conn) buffer, err := reader.ReadBytes('\n') if err != nil { - s5l.Printf("pipe: failed to read from connection: %v\n", err) + if err != io.EOF { + s5l.Printf("pipe: failed to read from connection: %v\n", err) + } return } marshaller, err := NewStatefulDecoder(buffer) @@ -21,42 +56,47 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) { for { buffer, err := reader.ReadBytes('\n') if err != nil { - s5l.Printf("pipe: failed to read from connection: %v\n", err) + if err != io.EOF { + s5l.Printf("pipe: failed to read from connection: %v\n", err) + } return } - // s5l.Printf("msg: %v", string(buffer)) - value, err := marshaller.Decode(buffer) if err != nil { s5l.Printf("pipe: failed to decode message: %v\n", err) continue } - self.appendData <- value + if err = srv.Append(value); err != nil { + s5l.Printf("pipe: failed to store data: %v\n", err) + } } } -func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) { - decoder := NewPlainDecoder() +func (srv Server) pipegramHandle(pconn net.PacketConn) { + decoder := NewStatelessDecoder() buffer := make([]byte, 64*1024) for { n, _, err := pconn.ReadFrom(buffer) if err != nil { - s5l.Printf("p-pipe: failed read: %v", err) + s5l.Printf("pipegram: failed read: %v", err) continue } data := buffer[0:n] value, err := decoder.Decode(data) - if err == nil { - self.appendData <- value - } else { - s5l.Printf("p-pipe: failed to decode message: %v\n", err) + if err != nil { + s5l.Printf("pipegram: failed to decode message: %v\n", err) + continue + } + + if err = srv.Append(value); err != nil { + s5l.Printf("pipegram: failed to store data: %v\n", err) } } } -func (self StatsSinkServer) ServePipe(pipePath string) { +func (srv Server) ServePipe(pipePath string) { ln, err := net.Listen("unix", pipePath) if err != nil { s5l.Printf("pipe: failed to connect: %v", err) @@ -71,17 +111,17 @@ func (self StatsSinkServer) ServePipe(pipePath string) { // ignore continue } - go self.handleConnection(conn) + go srv.pipeHandle(conn) } } -func (self StatsSinkServer) ServeGramPipe(pipePath string) { +func (srv Server) ServePipegram(pipePath string) { pconn, err := net.ListenPacket("unixgram", pipePath) if err != nil { - s5l.Printf("p-pipe: failed to listen: %v", err) + s5l.Printf("pipegram: failed to listen: %v", err) return } defer pconn.Close() - self.handlePacketConn(pconn) + srv.pipegramHandle(pconn) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index af9e986..80af8c5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -1,3 +1,35 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( @@ -5,26 +37,25 @@ import ( "fmt" "io/ioutil" "net/http" - "os" "strconv" - "time" "github.com/zenazn/goji" "github.com/zenazn/goji/web" ) -func hello(c web.C, w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Hello, %s!", c.URLParams["name"]) +func (srv Server) webHealthz(c web.C, w http.ResponseWriter, r *http.Request) { + // TODO: do a more sophisticated check + fmt.Fprintf(w, "%s\n", srv.GetHubId()) } -func (self StatsSinkServer) getTagList(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "tags" - values, err := self.store.GetTags() +func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) { + const resourceName = "hubs" + values, err := srv.store.GetHubs() if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - jsonString, err := json.Marshal(DataContainer{values}) + jsonString, err := json.Marshal(GenericDataContainer{values}) if err != nil { http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) return @@ -32,14 +63,14 @@ func (self StatsSinkServer) getTagList(c web.C, w http.ResponseWriter, r *http.R fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "sources" - values, err := self.store.GetSources() + values, err := srv.store.GetSources() if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - jsonString, err := json.Marshal(DataContainer{values}) + jsonString, err := json.Marshal(GenericDataContainer{values}) if err != nil { http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) return @@ -47,16 +78,20 @@ func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *ht fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webGetSource(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "source" id, err := strconv.ParseInt(c.URLParams["id"], 10, 64) if err != nil { http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest) return } - value, err := self.store.GetSource(int(id)) + value, err := srv.store.GetSource(int(id)) if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + if err == ErrNotFound { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound) + } else { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + } return } jsonString, err := json.Marshal(value) @@ -67,78 +102,14 @@ func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Re fmt.Fprintf(w, "%s", jsonString) } -func getFilter(r *http.Request) (filter StatsFilter) { - from := r.FormValue("from") - if from != "" { - fromT, err := time.Parse(time.RFC3339, from) - if err == nil { - filter.start = &fromT - } - } - - to := r.FormValue("to") - if to != "" { - toT, err := time.Parse(time.RFC3339, to) - if err == nil { - filter.end = &toT - } - } - - hostname := r.FormValue("hostname") - if hostname != "" { - filter.hostname = &hostname - } - - contentId := r.FormValue("contentId") - if contentId != "" { - filter.contentId = &contentId - } - - format := r.FormValue("format") - if format != "" { - filter.format = &format - } - - quality := r.FormValue("quality") - if quality != "" { - filter.quality = &quality - } - - afterUpdateId := r.FormValue("afterUpdateId") - if afterUpdateId != "" { - id, err := strconv.ParseInt(afterUpdateId, 10, 32) - if err == nil { - idInt := int(id) - filter.afterUpdateId = &idInt - } - } - - limit := r.FormValue("limit") - if limit != "" { - limitInt, err := strconv.ParseInt(limit, 10, 32) - if err == nil { - limitIntInt := int(limitInt) - filter.limit = &limitIntInt - } - } - - sortOrder := r.FormValue("sortOrder") - if sortOrder != "" { - filter.sortOrder = &sortOrder - } - - return -} - -func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "updates" - filter := getFilter(r) - values, err := self.getUpdatesInvoke(&filter) + values, err := srv.GetUpdatesAfter(-1, 3) // TODO: get start and limit from param if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - jsonString, err := json.Marshal(DataContainer{values}) + jsonString, err := json.Marshal(GenericDataContainer{values}) if err != nil { http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) return @@ -146,16 +117,20 @@ func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *htt fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" id, err := strconv.ParseInt(c.URLParams["id"], 10, 64) if err != nil { http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest) return } - value, err := self.store.GetUpdate(int(id)) + value, err := srv.store.GetUpdate(int(id)) if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + if err == ErrNotFound { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound) + } else { + http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) + } return } jsonString, err := json.Marshal(value) @@ -166,9 +141,9 @@ func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Re fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" - decoder := NewPlainDecoder() + decoder := NewStatelessDecoder() buffer, err := ioutil.ReadAll(r.Body) if err != nil { @@ -177,17 +152,14 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R return } - container := StatisticsDataContainer{} + // TODO: add different API endpoint called bulk + container := DataUpdateFullContainer{} err = json.Unmarshal(buffer, &container) if err == nil { - token := appendManyToken{ - data: container.Data, - response: make(chan bool, 2)} - defer close(token.response) - self.appendManyData <- token - success := <-token.response - if !success { - http.Error(w, "failed to store data", http.StatusInternalServerError) + if err = srv.AppendMany(container.Data); err != nil { + http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) + } else { + fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data)) } return } @@ -200,59 +172,43 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R return } - self.appendData <- data - // TODO send response channel, wait for OK + if err = srv.Append(data); err != nil { + http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) + } else { + fmt.Fprintf(w, "1 update successfully stored.") + } } -func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { +func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "lastupdateid" - id := c.URLParams["id"] - value, err := self.store.GetLastUpdateForUuid(id) + value, err := srv.store.GetLastUpdateId() if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - if value != nil { - fmt.Fprintf(w, "%d", *value) - } + fmt.Fprintf(w, "%d", value) } -func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "stats" - filter := getFilter(r) - values, err := self.getStatsInvoke(&filter) - +func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { + const resourceName = "lastupdateid" + id := c.URLParams["id"] + value, err := srv.store.GetLastUpdateForUuid(id) if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return } - jsonString, err := json.Marshal(values) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) + fmt.Fprintf(w, "%d", value) } -func (self StatsSinkServer) ServeWeb(vizAppLocation string) { - if _, err := os.Stat(vizAppLocation); err != nil { - if os.IsNotExist(err) { - s5l.Panicf("web: viz-app at %s does not exist.", vizAppLocation) - } else { - s5l.Printf("web: failed to stat %s: %v", vizAppLocation, err) - } - } - - goji.Get("/hello/:name", hello) - goji.Get("/tags", self.getTagList) - goji.Get("/sources", self.getSourcesList) - goji.Get("/sources/:id", self.getSource) - goji.Get("/updates", self.getUpdateList) - goji.Get("/updates/:id", self.getUpdate) - goji.Post("/updates", self.postUpdate) - goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid) - goji.Get("/stats", self.getStats) - goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation)))) - +func (srv Server) ServeWeb() { + goji.Get("/healthz", srv.webHealthz) + goji.Get("/hubs", srv.webGetHubsList) + goji.Get("/sources", srv.webGetSourcesList) + goji.Get("/sources/:id", srv.webGetSource) + goji.Get("/updates", srv.webGetUpdateList) + goji.Get("/updates/:id", srv.webGetUpdate) + goji.Post("/updates", srv.webPostUpdate) + goji.Get("/lastupdate", srv.webGetLastUpdateId) + goji.Get("/lastupdate/:id", srv.webGetLastUpdateIdForUuid) goji.Serve() } diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 6fb5f8a..a59166f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -1,643 +1,562 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( - "database/sql" + "encoding/json" + "errors" "fmt" + "os" "time" - _ "github.com/mattn/go-sqlite3" + "github.com/boltdb/bolt" "github.com/pborman/uuid" - "gopkg.in/gorp.v2" ) -type sqliteStore struct { - db *gorp.DbMap - hubId string -} - -func tagsFromStatisticsData(value StatisticsData) []tagDb { - tags := make([]tagDb, len(value.SourceId.Tags)) - for i := range value.SourceId.Tags { - tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} - } - return tags -} - -func sourceFromStatisticsData(value StatisticsData) sourceDb { - return sourceDb{ - -1, - StreamId{ - ContentId: value.SourceId.StreamId.ContentId, - Format: value.SourceId.StreamId.Format, - Quality: value.SourceId.StreamId.Quality, - }, - SourceId{ - Hostname: value.SourceId.Hostname}, - } -} - -func clientsFromStatisticsData(value StatisticsData) []clientDataDb { - res := make([]clientDataDb, len(value.Data.Clients)) - for i := range value.Data.Clients { - res[i] = clientDataDb{-1, -1, value.Data.Clients[i]} - } - return res -} - -func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { - return dataUpdateDb{ - -1, - -1, - value.SourceHubUuid, - value.SourceHubDataUpdateId, - value.StartTime.Unix(), - value.Duration, - value.Data.ClientCount, - value.Data.BytesReceived, - value.Data.BytesSent} -} +const ( + StoreGetUpdatesLimit = 42000 + StoreVersion = 1 +) -func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) { - du := dataUpdateFromStatisticsData(value) - cd := clientsFromStatisticsData(value) - src := sourceFromStatisticsData(value) - tags := tagsFromStatisticsData(value) +var ( + storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn, + sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} +) - return du, cd, src, tags +type Store struct { + version int + hubUuid string + db *bolt.DB + readOnly bool } -func initDb(mysql bool, path string) (res *gorp.DbMap, hubId string, err error) { - // connect to db using standard Go database/sql API - var db *sql.DB - var dialect gorp.Dialect +// +// Initialization and Destruction +// - if mysql { - db, err = sql.Open("mysql", path) - if err != nil { - return +func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid string, err error) { + if _, err = os.Stat(dbPath); err != nil { + if os.IsNotExist(err) { + err = nil } - dialect = gorp.MySQLDialect{Engine: "InnoDB", Encoding: "UTF8"} - } else { - db, err = sql.Open("sqlite3", path) - if err != nil { - return - } - dialect = gorp.SqliteDialect{} + return } - - dbmap := &gorp.DbMap{Db: db, Dialect: dialect} - // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds)) - - dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) - dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId") - dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname") - dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id") - dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id") - dbmap.AddTableWithName(hubInfoDb{}, hubInfoTn).SetKeys(false, "Name") - - // TODO use some real migration, yadda yadda - err = dbmap.CreateTablesIfNotExists() + opts := &bolt.Options{Timeout: 100 * time.Millisecond, ReadOnly: readOnly} + db, err = bolt.Open(dbPath, 0600, opts) if err != nil { return } - hubId, err = dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'") - // TODO handle only not-found this way - if err != nil || hubId == "" { - hubId = uuid.New() - _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", hubId) - if err != nil { - hubId = "" - return + err = db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(hubInfoBn)) + if b == nil { + return errors.New("store: failed to open, bucket '" + hubInfoBn + "'does not exist.") + } + bVersion := b.Get([]byte(storeVersionKey)) + if bVersion == nil { + return errors.New("store: failed to open, no hub version found.") + } + version = btoi(bVersion) + if version != StoreVersion { + return fmt.Errorf("store: failed to open, wrong hub version: %d (expected: %d)", version, StoreVersion) } - } - - res = dbmap - return -} -func isEmptyFilter(filter *StatsFilter) bool { - if filter == nil { - return true - } - if filter.start == nil && - filter.end == nil && - filter.hostname == nil && - filter.contentId == nil && - filter.format == nil && - filter.quality == nil && - (filter.tagsAny == nil || len(filter.tagsAny) == 0) && - filter.afterUpdateId == nil && - filter.limit == nil { - return true - } - return false -} + bHubId := b.Get([]byte(hubUuidKey)) + if bHubId != nil { + hubUuid = string(bHubId) + } + if hubUuid == "" { + return errors.New("store: failed to open, UUID does not exist or is empty.") + } -func insertAnd(needsAnd *bool) (res string) { - if *needsAnd { - res = " and" - *needsAnd = false + for _, bn := range storeBuckets { + if b := tx.Bucket([]byte(bn)); b == nil { + return errors.New("store: failed to open, bucket '" + bn + "' does not exist.") + } + } + return nil + }) + if err != nil { + db.Close() } return } -type dataUpdateQueryResult struct { - dataUpdateDb - StreamId - SourceId -} - -func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) { - const baseQuery = "(select * from " + dataUpdatesTn + "," + sourcesTn + " on " + dataUpdatesTn + ".SourceId = " + sourcesTn + ".Id" - if isEmptyFilter(filter) { - return baseQuery + ")", nil +func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error) { + db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}) + if err != nil { + return } - query := baseQuery - parameters := make(map[string]interface{}) - needsAnd := false - - if filter.start != nil || filter.end != nil || filter.afterUpdateId != nil { - query += " WHERE" - - if filter.start != nil { - query += insertAnd(&needsAnd) - query += " StartTime >= :filterstart" - parameters["filterstart"] = filter.start.Unix() - needsAnd = true + err = db.Update(func(tx *bolt.Tx) error { + for _, bn := range storeBuckets { + if _, err := tx.CreateBucket([]byte(bn)); err != nil { + return err + } } - if filter.end != nil { - query += insertAnd(&needsAnd) - query += " StartTime < :filterend" - parameters["filterend"] = filter.end.Unix() - needsAnd = true + + b, err := tx.CreateBucket([]byte(hubInfoBn)) + if err != nil { + return err } - if filter.afterUpdateId != nil { - query += insertAnd(&needsAnd) - query += " " + dataUpdatesTn + ".Id > :afterUpdateId" - parameters["afterUpdateId"] = *filter.afterUpdateId - needsAnd = true + version = StoreVersion + if err := b.Put([]byte(storeVersionKey), itob(version)); err != nil { + return err } - } - - if filter.sortOrder != nil { - if *filter.sortOrder == "desc" { - query += " ORDER BY " + dataUpdatesTn + ".Id DESC" + hubUuid = uuid.New() + if err := b.Put([]byte(hubUuidKey), []byte(hubUuid)); err != nil { + return err } + return nil + }) + if err != nil { + db.Close() } - if filter.limit != nil { - query += " LIMIT :limit" - parameters["limit"] = *filter.limit - } - - // TODO other fields - query += ")" - return query, parameters -} -func (s sqliteStore) findTag(name string) (tag *tagDb, err error) { - t := tagDb{} - err = s.db.SelectOne(&t, "select * from "+tagsTn+" where Name = ?", name) - if err == nil { - tag = &t - } return } -func (s sqliteStore) insertNewTags(tags []tagDb) (err error) { - for i := range tags { - var t *tagDb - t, err = s.findTag(tags[i].Name) - if err != nil { - _, err = s.db.Exec("insert into "+tagsTn+" VALUES (NULL, ?)", tags[i].Name) - } - t, err = s.findTag(tags[i].Name) +func NewStore(dbPath string, readOnly bool) (Store, error) { + db, version, hubid, err := openDb(dbPath, readOnly) + if err != nil { + return Store{}, err + } - if err == nil { - tags[i] = *t + if db != nil { + if readOnly { + s5l.Printf("store: opened read-only (UUID: %s)", hubid) } else { - break + s5l.Printf("store: opened (UUID: %s)", hubid) } + return Store{version, hubid, db, readOnly}, nil } - return -} - -func (s sqliteStore) findSource(src sourceDb) (res *sourceDb, err error) { - t := sourceDb{} - err = s.db.SelectOne( - &t, - "select Id from "+sourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?", - src.ContentId, - src.Format, - src.Quality, - src.Hostname) + if readOnly { + return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.") + } - if err == nil { - res = &t + db, version, hubid, err = createDb(dbPath) + if err != nil { + return Store{}, err } + s5l.Printf("store: initialized (UUID: %s)", hubid) + return Store{version, hubid, db, readOnly}, nil +} - return +func (st Store) Close() { + s5l.Printf("store: closing") + st.db.Close() } -func (s sqliteStore) insertNewSource(src *sourceDb) (err error) { - var t *sourceDb - t, err = s.findSource(*src) - if err == nil { - *src = *t - } else { - err = s.db.Insert(src) +// +// Append data +// + +// append key-value pairs to buckets + +func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) { + if hubUuid == "" { + return } - return -} + bf := tx.Bucket([]byte(hubUuidsFwdBn)) + bf.FillPercent = 1.0 // we only do appends + br := tx.Bucket([]byte(hubUuidsRevBn)) + br.FillPercent = 1.0 // we only do appends -func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error) { - st := make([]sourceTagsDb, len(tags)) - for i := range tags { - st[i].TagId = tags[i].Id - st[i].SourceId = src.Id - } - for i := range st { - _, err = s.db.Exec( - "insert or ignore into "+sourceTagsTn+" values (?,?)", - st[i].TagId, - st[i].SourceId) - // err = s.db.Insert(&st[i]) - if err != nil { - // TODO - //fmt.Printf("st\n") - return - } + bHubId := bf.Get([]byte(hubUuid)) + if bHubId != nil { + return btoi(bHubId), nil } - return -} -func (s sqliteStore) insertDataUpdateEntry(src sourceDb, du *dataUpdateDb) (err error) { - du.SourceId = src.Id - err = s.db.Insert(du) - if err != nil { - //fmt.Printf("du\n") + next, _ := bf.NextSequence() + hubId = int(next) + if err = bf.Put([]byte(hubUuid), itob(hubId)); err != nil { return } - return -} - -func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpdateDb) (err error) { - for i := range cd { - cd[i].DataUpdatesId = du.Id - err = s.db.Insert(&cd[i]) - if err != nil { - // TODO - return - } + if err = br.Put(itob(hubId), []byte(hubUuid)); err != nil { + return } - return + + return hubId, err } -func (s sqliteStore) appendItem(du dataUpdateDb, cd []clientDataDb, src sourceDb, tags []tagDb) (err error) { - err = s.insertNewTags(tags) - if err != nil { - return +func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { + bf := tx.Bucket([]byte(sourcesFwdBn)) + // br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id) + br := tx.Bucket([]byte(sourcesRevBn)) + br.FillPercent = 1.0 // we only do appends (with ever incrementing interger ids) + + slug := src.Slug() + bSrcId := bf.Get([]byte(slug)) + if bSrcId != nil { + return btoi(bSrcId), nil } - err = s.insertNewSource(&src) - if err != nil { - //fmt.Printf("src\n") + var jsonData []byte + if jsonData, err = json.Marshal(src); err != nil { return } - err = s.insertSourceTagLinks(src, tags) - if err != nil { + next, _ := bf.NextSequence() + srcId = int(next) + if err = bf.Put([]byte(slug), itob(srcId)); err != nil { return } - - err = s.insertDataUpdateEntry(src, &du) - if err != nil { + if err = br.Put(itob(srcId), jsonData); err != nil { return } - err = s.insertDataUpdateClientEntries(cd, du) - if err != nil { + return srcId, err +} + +func (st Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) { + b := tx.Bucket([]byte(dataUpdatesBn)) + b.FillPercent = 1.0 // we only do appends + + next, _ := b.NextSequence() + duId = int(next) + + var jsonData []byte + if jsonData, err = json.Marshal(du); err != nil { return } - + err = b.Put(itob(duId), jsonData) return } -// this function is the biggest pile of copy/pasted crap while sick that is still compilable. -func (s sqliteStore) Append(update StatisticsData) (err error) { - var tx *gorp.Transaction - tx, err = s.db.Begin() - if err != nil { - return +func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) { + bf := tx.Bucket([]byte(userAgentsFwdBn)) + bf.FillPercent = 1.0 // we only do appends + br := tx.Bucket([]byte(userAgentsRevBn)) + br.FillPercent = 1.0 // we only do appends + + bUaId := bf.Get([]byte(ua)) + if bUaId != nil { + return btoi(bUaId), nil } - du, cd, src, tags := updateFromStatisticsData(update) - //s5l.Printf("blah: %v", du) - err = s.appendItem(du, cd, src, tags) - if err != nil { - tx.Rollback() + next, _ := bf.NextSequence() + uaId = int(next) + if err = bf.Put([]byte(ua), itob(uaId)); err != nil { + return + } + if err = br.Put(itob(uaId), []byte(ua)); err != nil { return } - return tx.Commit() + return uaId, err } -func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) { - var tx *gorp.Transaction - tx, err = s.db.Begin() - if err != nil { - return +func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error { + if len(cd) == 0 { + return nil } - for _, update := range updates { - du, cd, src, tags := updateFromStatisticsData(update) - err = s.appendItem(du, cd, src, tags) + b := tx.Bucket([]byte(clientDataBn)) + b.FillPercent = 1.0 // we only do appends + + data := []clientDataDb{} + for _, c := range cd { + uaId, err := st.insertNewUserAgent(tx, c.UserAgent) if err != nil { - tx.Rollback() - return + return err } + data = append(data, clientDataDb{c.Ip, uaId, c.BytesSent}) } - return tx.Commit() + jsonData, err := json.Marshal(data) + if err != nil { + return err + } + return b.Put(itob(duId), jsonData) } -func castArrayToString(value []interface{}) []string { - res := make([]string, len(value)) - for i := range value { - res[i] = value[i].(string) +func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error { + b := tx.Bucket([]byte(latestUpdatesBn)) + b.FillPercent = 1.0 // we only do appends + + last := b.Get([]byte(uuid)) + if last != nil && btoi(last) > duId { + return nil } - return res + return b.Put([]byte(uuid), itob(duId)) } -func (s sqliteStore) GetTags() ([]string, error) { - res, dbErr := s.db.Select("", "select Name from "+tagsTn) - if dbErr == nil { - sRes := castArrayToString(res) - return sRes, nil +// Split up the multidimensional dataupdate and append all the key-value pairs + +func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) { + du := NewDataUpdateDb(update) + + srcUuid := update.SourceHubUuid + if du.SourceHubId, err = st.insertNewHub(tx, srcUuid); err != nil { + return + } + if du.SourceId, err = st.insertNewSource(tx, NewSourceDb(update)); err != nil { + return + } + if duId, err = st.insertDataUpdate(tx, du); err != nil { + return + } + if err = st.insertClientData(tx, duId, update.Data.Clients); err != nil { + return } - return nil, dbErr -} -func (s sqliteStore) GetTagsByDataUpdateId(id int) (res []string, err error) { - var qres []interface{} - qres, err = s.db.Select( - tagDb{}, - "select * from "+tagsTn+" where Id in (select TagId from "+sourceTagsTn+" where SourceId = (select SourceId from "+dataUpdatesTn+" where Id = ?))", id) - if err == nil { - res = make([]string, len(qres)) - for i := range qres { - res[i] = qres[i].(*tagDb).Name - } + if srcUuid != "" { + err = st.setLastUpdateForUuid(tx, srcUuid, du.SourceHubDataUpdateId) + } + if fwdUuid := update.ForwardHubUuid; fwdUuid != "" { + err = st.setLastUpdateForUuid(tx, fwdUuid, update.ForwardHubDataUpdateId) } return } -func (s sqliteStore) GetSources() (res []sourceDb, err error) { - var qres []interface{} - qres, err = s.db.Select(sourceDb{}, "select * from "+sourcesTn) - if err == nil { - res = make([]sourceDb, len(qres)) - for i := range qres { - res[i] = *qres[i].(*sourceDb) - } +// Public Append Interface + +func (st Store) AppendMany(updates []DataUpdateFull) (err error) { + if st.readOnly { + return ErrReadOnly } - return + return st.db.Update(func(tx *bolt.Tx) error { + for _, update := range updates { + if _, err := st.appendItem(tx, update); err != nil { + return err + } + } + return nil + }) } -func (s sqliteStore) GetSource(id int) (res sourceDb, err error) { - err = s.db.SelectOne(&res, "select * from "+sourcesTn+" where Id = ?", id) - return +func (st Store) Append(update DataUpdateFull) (err error) { + return st.AppendMany([]DataUpdateFull{update}) } -func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { - err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id) - return -} +// +// Fetch data +// -func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) { - var qres []interface{} - qres, err = s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id) - if err == nil { - res = make([]clientDataDb, len(qres)) - for i := range qres { - res[i] = *qres[i].(*clientDataDb) - } +// fetch key-value pairs from buckets + +func (st Store) getHub(tx *bolt.Tx, id int) string { + b := tx.Bucket([]byte(hubUuidsRevBn)) + uuid := b.Get(itob(id)) + if uuid == nil { + return "" } - return + return string(uuid) } -var ( - updateColumnSelect = ` - Id, - SourceHubUuid, - SourceHubDataUpdateId, - StartTime, - Duration, - ClientCount, - BytesReceived, - BytesSent, - Hostname, - ContentId, - Format, - Quality -` -) +func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { + b := tx.Bucket([]byte(sourcesRevBn)) -func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) { - var clientsDb []clientDataDb - clientsDb, err = s.GetClientsByUpdateId(dat.Id) - if err != nil { - s5l.Printf("store GetClients failed: %v", err) + jsonData := b.Get(itob(id)) + if jsonData == nil { + err = ErrNotFound return } - tagsDb, err := s.GetTagsByDataUpdateId(dat.Id) - if err != nil { - s5l.Printf("store GetClients failed: %v", err) + if err = json.Unmarshal(jsonData, &res); err != nil { return } - res.CopyFromDataUpdateDb(dat.dataUpdateDb, s.hubId) - res.Hostname = dat.Hostname - res.StreamId.ContentId = dat.ContentId - res.StreamId.Format = dat.Format - res.StreamId.Quality = dat.Quality - res.CopyFromClientDataDb(clientsDb) - res.Tags = tagsDb return } -func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) { - res = make([]StatisticsData, len(dat)) - for i := range dat { - t := *dat[i].(*dataUpdateQueryResult) - res[i], _ = s.CreateStatisticsDataFrom(t) - } - return -} +func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { + bc := tx.Bucket([]byte(clientDataBn)) + bu := tx.Bucket([]byte(userAgentsRevBn)) -func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { - limit := 5000 - sourceSql, parameters := getFilteredDataUpdateSelect(&StatsFilter{afterUpdateId: &id, limit: &limit}) - sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql - var updates []interface{} - updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters) - s5tl.Printf("sql: %s", sql) - if err == nil { - res, _ = s.CreateStatisticsDatasFrom(updates) + jsonData := bc.Get(itob(id)) + if jsonData == nil { + return + } + data := []clientDataDb{} + if err = json.Unmarshal(jsonData, &data); err != nil { + return + } + for _, c := range data { + cd := ClientData{Ip: c.Ip, BytesSent: c.BytesSent} + ua := bu.Get(itob(c.UserAgentId)) + if ua != nil { + cd.UserAgent = string(ua) + } + res = append(res, cd) } return } -func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) { - sourceSql, parameters := getFilteredDataUpdateSelect(filter) - sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql - s5tl.Printf("store: sql: %s", sql) - var updates []interface{} - updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters) - if err == nil { - res, _ = s.CreateStatisticsDatasFrom(updates) +// fetch all the key-value pairs and merge them into the multidimensional dataupdate + +func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { + res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) + var src sourceDb + if src, err = st.getSource(tx, du.SourceId); err != nil { + return + } + res.CopyFromSourceDb(src) + if res.Data.Clients, err = st.getClients(tx, duId); err != nil { + return } return } -type lastUpdateQueryResult struct { - MaxDataUpdateId *int -} +// Public Fetch Interface -func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) { - result := lastUpdateQueryResult{} - err = s.db.SelectOne( - &result, - "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?", - uuid) - if err == nil { - updateId = result.MaxDataUpdateId - } else { - s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err) +func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) { + res = []DataUpdateFull{} + if id < 0 { // TODO: interpret negative values as last x values + id = 0 + } + if limit < 0 || limit > StoreGetUpdatesLimit { + s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit) + limit = StoreGetUpdatesLimit } + err = st.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket([]byte(dataUpdatesBn)).Cursor() + k, v := c.Seek(itob(id)) + if k == nil { + return nil + } + if btoi(k) == id { + k, v = c.Next() + } + for ; k != nil; k, v = c.Next() { + var d dataUpdateDb + if err := json.Unmarshal(v, &d); err != nil { + return err + } + + duf, err := st.fetchItem(tx, btoi(k), d) + if err != nil { + return err + } + res = append(res, duf) + if len(res) >= limit { + return nil + } + } + return nil + }) return } -func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) { - result := lastUpdateQueryResult{} - err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn) - if err == nil { - updateId = result.MaxDataUpdateId - } else { - s5l.Printf("db: failed to find max DataUpdateId: %v", err) - } +func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) { + err = st.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(dataUpdatesBn)) + + jsonData := b.Get(itob(id)) + if jsonData == nil { + return ErrNotFound + } + + var d dataUpdateDb + if err := json.Unmarshal(jsonData, &d); err != nil { + return err + } + + var err error + if res, err = st.fetchItem(tx, id, d); err != nil { + return err + } + return nil + }) return } -type statsResult struct { - UpdateCount *int - HubCount *int - SourcesCount *int - ClientCount *float32 - BytesSent *uint - BytesReceived *uint - StartTime *int64 - LastStartTime *int64 -} +// +// Auxilliary Data +// -type StatsResult struct { - UpdateCount int - HubCount int - SourcesCount int - ClientCount float32 - BytesSent uint - BytesReceived uint - StartTime time.Time - LastStartTime time.Time +func (st Store) GetStoreId() string { + return st.hubUuid } -func toApiStatsResult(value statsResult) (res StatsResult) { - if value.UpdateCount != nil { - res.UpdateCount = *value.UpdateCount - } - if value.HubCount != nil { - res.HubCount = *value.HubCount - } - if value.SourcesCount != nil { - res.SourcesCount = *value.SourcesCount - } - if value.ClientCount != nil { - res.ClientCount = *value.ClientCount - } - if value.BytesSent != nil { - res.BytesSent = *value.BytesSent - } - if value.BytesReceived != nil { - res.BytesReceived = *value.BytesReceived - } - if value.StartTime != nil { - res.StartTime = time.Unix(*value.StartTime, 0) - } - if value.LastStartTime != nil { - res.LastStartTime = time.Unix(*value.LastStartTime, 0) - } - return res +func (st Store) GetLastUpdateId() (updateId int, err error) { + err = st.db.View(func(tx *bolt.Tx) error { + updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence()) + return nil + }) + return } -var ( - statsGroupSelect = ` -SELECT - count(*) as UpdateCount, - SourceHubUuid as SourceHubUuid, - count(distinct SourceId) as SourcesCount, - avg(ClientCount) as ClientCount, - sum(BytesSent) as BytesSent, - sum(BytesReceived) as BytesReceived, - min(StartTime) as StartTime, - max(StartTime) as LastStartTime -FROM -` - statsGroupClause = ` -GROUP BY - SourceHubUuid -` - statsAggregateSelect = ` -SELECT - sum(UpdateCount) as UpdateCount, - count(distinct SourceHubUuid) as HubCount, - sum(SourcesCount) as SourcesCount, - sum(ClientCount) as ClientCount, - sum(BytesSent) as BytesSent, - sum(BytesReceived) as BytesReceived, - min(StartTime) as StartTime, - max(LastStartTime) as LastStartTime -FROM -` -) - -func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) { - sourceSql, parameters := getFilteredDataUpdateSelect(filter) - _ = sourceSql - sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause) - s5tl.Printf("store: stats sql: %s", sql) - res := statsResult{} - err := s.db.SelectOne(&res, sql, parameters) - if err == nil { - return toApiStatsResult(res), nil - } - return StatsResult{}, err +func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) { + err = st.db.View(func(tx *bolt.Tx) error { + bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid)) + if bUpdateId == nil { + return nil + } + updateId = btoi(bUpdateId) + return nil + }) + return } -func (s sqliteStore) GetStoreId() (uuid string, err error) { - uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid") +func (st Store) GetHubs() (res []string, err error) { + res = []string{st.hubUuid} + err = st.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket([]byte(hubUuidsRevBn)).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + res = append(res, string(v)) + } + return nil + }) return } -func NewStore(mysql bool, path string) (sqliteStore, error) { - db, hubid, err := initDb(mysql, path) - if err != nil { - return sqliteStore{}, err - } - return sqliteStore{db, hubid}, nil +func (st Store) GetSource(id int) (res SourceId, err error) { + err = st.db.View(func(tx *bolt.Tx) error { + src, err := st.getSource(tx, id) + if err != nil { + return err + } + res.CopyFromSourceDb(src) + return nil + }) + return } -func (s sqliteStore) Close() { - s.db.Db.Close() +func (st Store) GetSources() (res []SourceId, err error) { + res = []SourceId{} + err = st.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket([]byte(sourcesRevBn)).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var s sourceDb + if err := json.Unmarshal(v, &s); err != nil { + return err + } + var src SourceId + src.CopyFromSourceDb(s) + res = append(res, src) + } + return nil + }) + return } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 409bd08..5fcae5e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -1,179 +1,815 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( + "fmt" + "io" + "os" + "os/user" + "reflect" "testing" "time" + + "github.com/boltdb/bolt" ) -func TestAppend(t *testing.T) { - store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared") - if err != nil { - t.Errorf("Failed to initialize: %v", err) - return +var ( + testBoltPath = "/run/s5hub_testing_db.bolt" + testBoltPath2 = "/run/s5hub_testing_db2.bolt" + testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt" + testBoltPathFinal = "/run/s5hub_testing_db_final.bolt" + + streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"} + sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData} + updateData = DataUpdate{Data: SourceData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000} + clientsData = []ClientData{ + ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, + ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, + ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, + ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, + ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} +) + +func generateTestData(n int) ([]DataUpdateFull, int) { + hostnames := []string{"streamer1", "streamer2"} + contents := []string{"av", "audio"} + formats := []string{"webm", "flash", "hls"} + qualities := []string{"high", "medium", "low"} + tags := []string{"2017", "elevate", "discourse"} + duration := int64(15000) + + starttime := time.Now() + numSrcs := len(hostnames) * len(contents) * len(formats) * len(qualities) + if n < 0 { + n = numSrcs } - defer store.Close() + var data []DataUpdateFull + for i := 0; i < n; i += numSrcs { + for _, hostname := range hostnames { + for _, content := range contents { + for _, format := range formats { + for _, quality := range qualities { + d := DataUpdateFull{} + d.Version = ProtocolVersion + + d.SourceId.Hostname = hostname + d.SourceId.StreamId.ContentId = content + d.SourceId.StreamId.Format = format + d.SourceId.StreamId.Quality = quality + d.SourceId.Tags = tags - startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) - update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} - streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} - source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{nil, nil, source, update} + d.DataUpdate.StartTime = starttime + d.DataUpdate.Duration = duration + d.DataUpdate.Data.ClientCount = uint(len(clientsData)) + d.DataUpdate.Data.BytesSent = 6400 * uint(len(clientsData)) - err = store.Append(dat) + d.DataUpdate.Data.Clients = clientsData + data = append(data, d) + } + } + } + } + starttime = starttime.Add(time.Duration(duration) * time.Millisecond) + } + return data[:n], numSrcs +} + +func TestMain(m *testing.M) { + u, err := user.Current() if err != nil { - t.Errorf("Failed to append: %v", err) - return + os.Exit(-1) + } + testBoltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid) + testBoltPath2 = fmt.Sprintf("/run/user/%s/s5hub_testing_db2.bolt", u.Uid) + testBoltPathFwd = fmt.Sprintf("/run/user/%s/s5hub_testing_db_fwd.bolt", u.Uid) + testBoltPathFinal = fmt.Sprintf("/run/user/%s/s5hub_testing_db_final.bolt", u.Uid) + os.Exit(m.Run()) +} + +// +// Testing +// + +func TestOpen(t *testing.T) { + // non-existing directory + if _, err := NewStore("/nonexistend/db.bolt", false); err == nil { + t.Fatalf("opening store in nonexisting directory should throw an error") + } + + // store path is a directory + os.Remove(testBoltPath) + if err := os.MkdirAll(testBoltPath, 0700); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store using a directory should throw an error") } - stats, err := store.GetStats(nil) + // exisiting but non-database file + os.Remove(testBoltPath) + if f, err := os.Create(testBoltPath); err != nil { + t.Fatalf("unexpected error: %v", err) + } else { + io.WriteString(f, "this is not a bolt db.") + f.Close() + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store using a invalid database should throw an error") + } + + // bolt db without HubInfo Bucket + os.Remove(testBoltPath) + db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}) if err != nil { - t.Errorf("Failed to get stats: %v", err) + t.Fatalf("unexpected error: %v", err) + } else { + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store without HubInfo Bucket should throw an error") + } + + // bolt db no version key + if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil { + t.Fatalf("unexpected error: %v", err) } else { - clientCount := int(stats.ClientCount) - updateCount := stats.UpdateCount - if 3 != clientCount { - t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount) + err = db.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucket([]byte(hubInfoBn)); err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) } - if 1 != updateCount { - t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount) + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) } } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store without a database version should throw an error") + } - queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) - filterStruct := StatsFilter{start: &queryStartTime} - stats, err = store.GetStats(&filterStruct) - if err != nil { - t.Errorf("Failed to get stats: %v", err) + // bolt db wrong version + if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil { + t.Fatalf("unexpected error: %v", err) + } else { + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(hubInfoBn)) + if err := b.Put([]byte(storeVersionKey), itob(0)); err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store with wrong database version should throw an error") + } + + // bolt db no UUID key + if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil { + t.Fatalf("unexpected error: %v", err) } else { - updateCount := stats.UpdateCount - if 0 != updateCount { - t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount) + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(hubInfoBn)) + if err := b.Put([]byte(storeVersionKey), itob(StoreVersion)); err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store without a database UUID should throw an error") + } + + // bolt db empty UUID + if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil { + t.Fatalf("unexpected error: %v", err) + } else { + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(hubInfoBn)) + if err := b.Put([]byte(hubUuidKey), []byte("")); err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store with empty UUID should throw an error") + } + + // bolt db with missing buckets + if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil { + t.Fatalf("unexpected error: %v", err) + } else { + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(hubInfoBn)) + if err := b.Put([]byte(hubUuidKey), []byte("6d1a2192-7cb6-404d-80fb-add9b40a8f33")); err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err = db.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening store with (some) buckets missing should throw an error") + } + + // create new bolt-db and reopen it + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("creating new store failed: %v", err) + } + createdUuid := store.hubUuid + store.Close() + + store, err = NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("re-opening existing store failed: %v", err) + } + if createdUuid != store.hubUuid { + t.Fatalf("UUID of opened store differs from the one previously generated: '%s' != '%s'", createdUuid, store.hubUuid) + } + + if _, err := NewStore(testBoltPath, false); err == nil { + t.Fatalf("opening already opened database should throw an error") + } + store.Close() +} + +func TestAppendAndFetch(t *testing.T) { + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer store.Close() + + if _, err := store.GetUpdate(17); err != ErrNotFound { + t.Fatalf("fetching not exisiting update should return not-found, error=%v", err) + } + + upd := updateData + upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) + upd.Data.Clients = clientsData + in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + + if err = store.Append(in); err != nil { + t.Fatalf("failed to append update: %v", err) + } + + out, err := store.GetUpdate(1) + if err != nil { + t.Fatalf("failed to fetch update: %v", err) + } + out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder + + expected := in + expected.SourceHubUuid = store.GetStoreId() + expected.SourceHubDataUpdateId = 1 + expected.ForwardHubUuid = "" + expected.ForwardHubDataUpdateId = 0 + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) + } + + // append many + var ins []DataUpdateFull + upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) + ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd}) + upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) + upd.Data.Clients = nil + ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd}) + if err = store.AppendMany(ins); err != nil { + t.Fatalf("failed to append update: %v", err) + } + + for i := 0; i < 2; i = i + 1 { + out, err = store.GetUpdate(i + 2) + if err != nil { + t.Fatalf("failed to fetch update: %v", err) + + } + out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder + expected = ins[i] + expected.SourceHubUuid = store.GetStoreId() + expected.SourceHubDataUpdateId = i + 2 + expected.ForwardHubUuid = "" + expected.ForwardHubDataUpdateId = 0 + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) } } } -func TestCount(t *testing.T) { - store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared") +func TestReadOnly(t *testing.T) { + // create read-only db from not-existing file must fail + os.Remove(testBoltPath) + if _, err := NewStore(testBoltPath, true); err == nil { + t.Fatalf("creating a read-only database should throw an error") + } + + // prepare a store with one data-update + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + upd := updateData + upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) + upd.Data.Clients = clientsData + in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + if err = store.Append(in); err != nil { + t.Fatalf("unexpected error: %v", err) + } + store.Close() + + // read-only db from existing file must succeed + store, err = NewStore(testBoltPath, true) if err != nil { - t.Errorf("Failed to initialize: %v", err) + t.Fatalf("opening existing store in read-only mode failed: %v", err) } defer store.Close() - stats, err := store.GetStats(nil) - clientCount := int(stats.ClientCount) - if 0 != clientCount { - t.Errorf("Failed to count correctly.") + // fetching the date-update from read-only store must succeed + if _, err := store.GetUpdate(1); err != nil { + t.Fatalf("failed to fetch update: %v", err) + } + + // appending to read-only store must fail + if err = store.Append(in); err == nil { + t.Fatalf("appending to read-only store should throw an error") } } func TestGetUpdatesAfter(t *testing.T) { - store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared") + // prepare a store with 3 data-updates + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) if err != nil { - t.Errorf("Failed to initialize: %v", err) - return + t.Fatalf("unexpected error: %v", err) } defer store.Close() - startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) - update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} - streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} - source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{nil, nil, source, update} + upd := updateData + upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) + upd.Data.Clients = clientsData - err = store.Append(dat) + expected := []DataUpdateFull{} + for i := 0; i < 3; i = i + 1 { + in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + if err = store.Append(in); err != nil { + t.Fatalf("unexpected error: %v", err) + } + e := in + e.SourceHubUuid = store.hubUuid + e.SourceHubDataUpdateId = i + 1 + e.ForwardHubUuid = "" + e.ForwardHubDataUpdateId = 0 + expected = append(expected, e) + upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) + } + + // check if there are 3 updates in store + lastId, err := store.GetLastUpdateId() if err != nil { - t.Errorf("Failed to retrieve: %v", err) - return + t.Fatalf("unexpected error: %v", err) + } + if lastId != 3 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) } - res, err := store.GetUpdatesAfter(2) - t.Logf("got updates (err %v):\n%#v", err, res) -} + // all the updates + updList, err := store.GetUpdatesAfter(-1, -1) + if err != nil { + t.Fatalf("failed to fetch updates: %v", err) + } + for i, _ := range updList { + updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder + } -func generateStatisticsData(n int) (data []StatisticsData) { - hostnames := []string{"streamer1", "streamer2"} - contents := []string{"av", "audio"} - formats := []string{"webm", "flash", "hls"} - qualities := []string{"high", "medium", "low"} + if !reflect.DeepEqual(expected, updList) { + t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected) + } - numcombis := len(hostnames) * len(contents) * len(formats) * len(qualities) + // first 2 + updList, err = store.GetUpdatesAfter(0, 2) + if err != nil { + t.Fatalf("failed to fetch updates: %v", err) + } + for i, _ := range updList { + updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder + } + if len(updList) != 2 { + t.Fatalf("failed to fetch updates: got %d updates, expected 1", len(updList)) + } + if !reflect.DeepEqual(expected[:2], updList) { + t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[:2]) + } - clients := []ClientData{ - ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} - starttime := time.Now() - duration := int64(15000) - tags := []string{"2017", "elevate", "discourse"} + // all after id 2 + updList, err = store.GetUpdatesAfter(2, -1) + if err != nil { + t.Fatalf("failed to fetch updates: %v", err) + } + for i, _ := range updList { + updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder + } - for i := 0; i < n; i += numcombis { - for _, hostname := range hostnames { - for _, content := range contents { - for _, format := range formats { - for _, quality := range qualities { - d := StatisticsData{} - d.SourceId.Version = 1 - d.SourceId.Hostname = hostname - d.SourceId.Tags = tags + if !reflect.DeepEqual(expected[2:], updList) { + t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[2:]) + } - d.SourceId.StreamId.ContentId = content - d.SourceId.StreamId.Format = format - d.SourceId.StreamId.Quality = quality + // all after id 3 -> should return nothing + updList, err = store.GetUpdatesAfter(3, -1) + if err != nil { + t.Fatalf("failed to fetch updates: %v", err) + } + if len(updList) != 0 { + t.Fatalf("failed to fetch updates: got %d updates, expected 0", len(updList)) + } +} - d.DataUpdate.StartTime = starttime - d.DataUpdate.Duration = duration - d.DataUpdate.Data.ClientCount = uint(len(clients)) - d.DataUpdate.Data.BytesSent = 6400 * uint(len(clients)) +func TestForwardedDataUpdates(t *testing.T) { + // prepare a new store + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer store.Close() - for _, client := range clients { - c := ClientData{client.Ip, client.UserAgent, client.BytesSent} - d.DataUpdate.Data.Clients = append(d.DataUpdate.Data.Clients, c) - } - data = append(data, d) - } - } - } + // generate/append some local updates + data, _ := generateTestData(10) + if err := store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + myLastId := 10 + forwardedHub := "05defdfa-e7d1-4ca8-8b5c-02abb0088d29" + + // check if there are no updates for this hub in store + lastId, err := store.GetLastUpdateForUuid(forwardedHub) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != 0 { + t.Fatalf("failed to get last update ID: %d, expected 0", lastId) + } + + // get list of all hubs + hubs, err := store.GetHubs() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(hubs) != 1 { + t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 1", len(hubs)) + } + if hubs[0] != store.hubUuid { + t.Fatalf("fist hub should be the own stores UUID but is: %s", hubs[0]) + } + + // append 3 forwarded data-updates + upd := updateData + upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) + upd.Data.Clients = clientsData + + expected := []DataUpdateFull{} + for i := 0; i < 3; i = i + 1 { + in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd} + in.SourceHubUuid = forwardedHub + in.SourceHubDataUpdateId = 3 - i // out of order + if err = store.Append(in); err != nil { + t.Fatalf("unexpected error: %v", err) } - starttime = starttime.Add(time.Duration(duration) * time.Millisecond) + myLastId = myLastId + 1 + in.ForwardHubUuid = store.GetStoreId() + in.ForwardHubDataUpdateId = myLastId + expected = append(expected, in) + upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) + } + + out, err := store.GetUpdatesAfter(10, 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + for i, _ := range out { + out[i].StartTime = out[i].StartTime.UTC() // this would normally be handled by the protocol encoder + } + if !reflect.DeepEqual(out, expected) { + t.Fatalf("failed to fetch source\nactual: %v\nexpected: %v\n", out, expected) + } + + // check if the last update for this hub is 3 + lastId, err = store.GetLastUpdateForUuid(forwardedHub) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != 3 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) + } + + // get list of all hubs + hubs, err = store.GetHubs() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(hubs) != 2 { + t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 2", len(hubs)) + } + if hubs[0] != store.hubUuid { + t.Fatalf("fist hub should be the own stores UUID but is: %s", hubs[0]) + } + if hubs[1] != forwardedHub { + t.Fatalf("second hub UUID is wrong: %s, expected: %s", hubs[1], forwardedHub) + } + + // check if the last update is now 13 + lastId, _ = store.GetLastUpdateId() + if lastId != 13 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) } - return } +func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { + lastId, err := fwdStore.GetLastUpdateForUuid(src1Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != fwdSrc1Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id) + } + lastId, err = fwdStore.GetLastUpdateForUuid(src2Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != fwdSrc2Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(src1Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalSrc1Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(src2Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalSrc2Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(fwdStore.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalFwdId { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalFwdId) + } +} + +func TestForwardedDataUpdates2(t *testing.T) { + // prepare 4 new stores + os.Remove(testBoltPath) + src1Store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer src1Store.Close() + + os.Remove(testBoltPath2) + src2Store, err := NewStore(testBoltPath2, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer src2Store.Close() + + os.Remove(testBoltPathFwd) + fwdStore, err := NewStore(testBoltPathFwd, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer fwdStore.Close() + + os.Remove(testBoltPathFinal) + finalStore, err := NewStore(testBoltPathFinal, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer finalStore.Close() + + // generate/append some updates to src + data, _ := generateTestData(10) + if err := src1Store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + data, _ = generateTestData(7) + if err := src2Store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0) + + // forward 5 updates from src1 to fwd + if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0) + + // forward 3 updates from src2 to fwd + if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0) + + // forward 7 updates from fwd to final + if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := finalStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7) + + // forward remaining updates from src1 and src2 to fwd + if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if data, err = src2Store.GetUpdatesAfter(3, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7) + + // forward remainging from fwd to final + if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := finalStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17) +} + +func TestGetSources(t *testing.T) { + // prepare a new store + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer store.Close() + + if _, err := store.GetSource(17); err != ErrNotFound { + t.Fatalf("fetching not exisiting source should return not-found, error=%v", err) + } + + // generate/append some data + data, numSrcs := generateTestData(-1) + if err := store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // fetch first source + src, err := store.GetSource(1) + if err != nil { + t.Fatalf("fetching source failed: %v", err) + } + if !reflect.DeepEqual(data[0].SourceId, src) { + t.Fatalf("failed to fetch source\nactual: %v\nexpected: %v\n", src, data[0].SourceId) + } + + // fetch all the sources + srcList, err := store.GetSources() + if err != nil { + t.Fatalf("fetching sources failed: %v", err) + } + if len(srcList) != numSrcs { + t.Fatalf("wrong number of sources: %d, expected %d", len(srcList), numSrcs) + } + // the result will be orderd using the slug so doing a DeepEqual doesn't work here +} + +// +// Benchmarking +// + func BenchmarkAppendMany(b *testing.B) { - store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared") + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) if err != nil { - b.Errorf("Failed to initialize: %v", err) + b.Fatalf("unexpected error: %v", err) } defer store.Close() - data := generateStatisticsData(b.N) + data, _ := generateTestData(b.N) b.ResetTimer() if err := store.AppendMany(data); err != nil { - b.Errorf("Failed to append: %v", err) + b.Fatalf("unexpected error: %v", err) } } func BenchmarkGetUpdatesAfter(b *testing.B) { - store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared") + os.Remove(testBoltPath) + store, err := NewStore(testBoltPath, false) if err != nil { - b.Errorf("Failed to initialize: %v", err) + b.Fatalf("unexpected error: %v", err) } defer store.Close() - data := generateStatisticsData(b.N) + data, _ := generateTestData(b.N) if err := store.AppendMany(data); err != nil { - b.Errorf("Failed to append: %v", err) + b.Fatalf("unexpected error: %v", err) } b.ResetTimer() latestId := -1 for { - updates, err := store.GetUpdatesAfter(latestId) + updates, err := store.GetUpdatesAfter(latestId, -1) if err != nil { - b.Errorf("Failed to retrieve: %v", err) + b.Fatalf("failed to retrieve: %v", err) } if len(updates) == 0 { break diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 5b2b29f..12e92c8 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -1,13 +1,39 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import "time" -const ( - QualityLow = "low" - QualityMedium = "medium" - QualityHigh = "high" -) - type StreamId struct { ContentId string `json:"content-id"` Format string `json:"format"` @@ -15,23 +41,22 @@ type StreamId struct { } type SourceId struct { - Version uint `json:"version" db:"-"` Hostname string `json:"hostname"` - StreamId StreamId `json:"streamer-id" db:"-"` - Tags []string `json:"tags" db:"-"` + StreamId StreamId `json:"streamer-id"` + Tags []string `json:"tags,omitempty"` } type ClientData struct { Ip string `json:"ip"` - UserAgent string `json:"user-agent"` + UserAgent string `json:"user-agent,omitempty"` BytesSent uint `json:"bytes-sent"` } type SourceData struct { ClientCount uint `json:"client-count"` - BytesReceived uint `json:"bytes-received"` + BytesReceived uint `json:"bytes-received,omitempty"` BytesSent uint `json:"bytes-sent"` - Clients []ClientData `json:"clients"` + Clients []ClientData `json:"clients,omitempty"` } type DataUpdate struct { @@ -40,43 +65,32 @@ type DataUpdate struct { Data SourceData `json:"data"` } -type StatisticsData struct { - SourceHubUuid *string - SourceHubDataUpdateId *int +type DataUpdateFull struct { + Version uint `json:"version"` + SourceHubUuid string `json:"SourceHubUuid,omitempty"` + SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"` + ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` + ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"` SourceId DataUpdate } -type DataContainer struct { - Data interface{} `json:"data"` -} - -type StatisticsDataContainer struct { - Data []StatisticsData `json:"data"` +type DataUpdateFullContainer struct { + Data []DataUpdateFull `json:"data"` } -type StatsFilter struct { - start *time.Time - end *time.Time - hostname *string - contentId *string - format *string - quality *string - tagsAny []string - afterUpdateId *int - limit *int - sortOrder *string +func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) { + duf.Hostname = src.Hostname + duf.StreamId = src.StreamId + duf.Tags = src.Tags } -func (self *StatisticsData) CopyFromSourceId(id *SourceId) { - self.Hostname = id.Hostname - self.StreamId = id.StreamId - self.Tags = id.Tags - self.Version = id.Version +func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) { + duf.StartTime = du.StartTime + duf.Duration = du.Duration + duf.Data = du.Data } -func (self *StatisticsData) CopyFromUpdate(id *DataUpdate) { - self.StartTime = id.StartTime - self.Duration = id.Duration - self.Data = id.Data +type GenericDataContainer struct { + Data interface{} `json:"data"` } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index f06e953..bba4433 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -1,120 +1,161 @@ +// +// sfive +// +// sfive - spreadspace streaming statistics suite is a generic +// statistic collection tool for streaming server infrastuctures. +// The system collects and stores meta data like number of views +// and throughput from a number of streaming servers and stores +// it in a global data store. +// The data acquisition is designed to be generic and extensible in +// order to support different streaming software. +// sfive also contains tools and applications to filter and visualize +// live and recorded data. +// +// +// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org> +// Markus Grüneis <gimpf@gimpf.org> +// +// This file is part of sfive. +// +// sfive is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 +// as published by the Free Software Foundation. +// +// sfive is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with sfive. If not, see <http://www.gnu.org/licenses/>. +// + package sfive import ( + "encoding/binary" + "errors" + "fmt" + "strings" "time" ) -// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections -// this is very much not normalized at all, because I'm too lazy to type +var ( + ErrNotFound = errors.New("not found") + ErrReadOnly = errors.New("store is in read-only mode") +) -// table names const ( - tagsTn = "Tags" - sourceTagsTn = "StreamToTagMap" - sourcesTn = "Sources" - clientdataUpdatesTn = "ClientDataUpdates" - dataUpdatesTn = "DataUpdates" - hubInfoTn = "HubInfo" + // bucket names + hubInfoBn = "HubInfo" + latestUpdatesBn = "LatestUpdates" + hubUuidsFwdBn = "HubUUIDsFwd" + hubUuidsRevBn = "HubUUIDsRev" + dataUpdatesBn = "DataUpdates" + sourcesFwdBn = "SourcesFwd" + sourcesRevBn = "SourcesRev" + clientDataBn = "ClientData" + userAgentsFwdBn = "UserAgentsFwd" + userAgentsRevBn = "UserAgentsRev" + + // well-known keys + hubUuidKey = "HubUUID" + storeVersionKey = "Version" ) -type hubInfoDb struct { - Name string - Value string +// stored in sourcesRevBn +type streamIdDb struct { + ContentId string `json:"c"` + Format string `json:"f"` + Quality string `json:"q"` } -// stored in tagsTn -type tagDb struct { - Id int - Name string +type sourceDb struct { + Hostname string `json:"h"` + StreamId streamIdDb `json:"s"` + Tags []string `json:"t"` } -// stored in sourceTagsTn -// Stream m:n Tag -type sourceTagsDb struct { - TagId int // foreign key to tagsTn - SourceId int // foreign key to sourcesTn +func NewSourceDb(value DataUpdateFull) sourceDb { + return sourceDb{ + Hostname: value.SourceId.Hostname, + StreamId: streamIdDb{ + ContentId: value.SourceId.StreamId.ContentId, + Format: value.SourceId.StreamId.Format, + Quality: value.SourceId.StreamId.Quality, + }, + Tags: value.SourceId.Tags, + } } -// stored in sourcesTn -type sourceDb struct { - Id int - StreamId - SourceId +func (s sourceDb) Slug() string { + return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ",")) } -// stored in clientdataUpdatesTn -// ClientData n:1 DataUpdate -type clientDataDb struct { - Id int - DataUpdatesId int // foreign key to dataUpdatesTn - ClientData +func (s *SourceId) CopyFromSourceDb(v sourceDb) { + s.Hostname = v.Hostname + s.StreamId.ContentId = v.StreamId.ContentId + s.StreamId.Format = v.StreamId.Format + s.StreamId.Quality = v.StreamId.Quality + s.Tags = v.Tags } -// stored in dataUpdatesTn -// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs -type dataUpdateDb struct { - Id int - SourceId int // foreign key to sourcesTn - SourceHubUuid *string - SourceHubDataUpdateId *int - StartTime int64 // time.Time - Duration int64 // duration in milliseconds - ClientCount uint - BytesReceived uint - BytesSent uint +// stored in clientDataBn +type clientDataDb struct { + Ip string `json:"ip"` + UserAgentId int `json:"ua"` + BytesSent uint `json:"bs"` } -func (self *SourceId) CopyFromSourceDb(value sourceDb) { - self.Version = value.Version - self.Hostname = value.Hostname - self.StreamId.ContentId = value.ContentId - self.StreamId.Format = value.Format - self.StreamId.Quality = value.Quality +// stored in dataUpdatesBn +type dataUpdateDb struct { + SourceHubId int `json:"h,omitempty"` + SourceHubDataUpdateId int `json:"hi,omitempty"` + SourceId int `json:"si"` + StartTime int64 `json:"st"` // unix timestamp in milliseconds + Duration int64 `json:"du"` // duration in milliseconds + ClientCount uint `json:"cc,omitempty"` + BytesReceived uint `json:"br,omitempty"` + BytesSent uint `json:"bs,omitempty"` } -func (self *SourceId) CopyFromTagsDb(values []tagDb) { - tags := make([]string, len(values)) - for i := range values { - tags[i] = values[i].Name +func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { + return dataUpdateDb{ + -1, + v.SourceHubDataUpdateId, + -1, + int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000), + v.Duration, + v.Data.ClientCount, + v.Data.BytesReceived, + v.Data.BytesSent, } - self.Tags = tags } -func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId string) { - if value.SourceHubUuid == nil { - self.SourceHubUuid = &hubId +func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) { + if srcHubUuid == "" { + s.SourceHubUuid = hubUuid + s.SourceHubDataUpdateId = id } else { - self.SourceHubUuid = value.SourceHubUuid - } - if value.SourceHubDataUpdateId == nil { - self.SourceHubDataUpdateId = &value.Id - } else { - self.SourceHubDataUpdateId = value.SourceHubDataUpdateId + s.SourceHubUuid = srcHubUuid + s.SourceHubDataUpdateId = v.SourceHubDataUpdateId + s.ForwardHubUuid = hubUuid + s.ForwardHubDataUpdateId = id } - self.StartTime = time.Unix(value.StartTime, 0) - self.Duration = value.Duration - self.Data.ClientCount = value.ClientCount - self.Data.BytesReceived = value.BytesReceived - self.Data.BytesSent = value.BytesSent + s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000) + s.Duration = v.Duration + s.Data.ClientCount = v.ClientCount + s.Data.BytesReceived = v.BytesReceived + s.Data.BytesSent = v.BytesSent } -func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) { - clients := make([]ClientData, len(values)) - for i := range values { - clients[i].Ip = values[i].Ip - clients[i].UserAgent = values[i].UserAgent - clients[i].BytesSent = values[i].BytesSent - } - self.Data.Clients = clients +func itob(v int) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) + return b } -func cvtToApiStatisticsData( - hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData { - res := StatisticsData{} - res.CopyFromSourceDb(source) - res.CopyFromDataUpdateDb(update, hubId) - res.CopyFromClientDataDb(clients) - res.CopyFromTagsDb(tags) - return res +func btoi(b []byte) int { + return int(binary.BigEndian.Uint64(b)) } diff --git a/src/hub/test-bolt b/src/hub/test-bolt new file mode 100755 index 0000000..1787181 --- /dev/null +++ b/src/hub/test-bolt @@ -0,0 +1,21 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> [ ... args-to-bolt ... ]" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" +shift + +BIN="$(go env GOPATH)/bin/bolt" +if [ ! -x "$BIN" ]; then + echo "bolt not found. Please run:" + echo "" + echo " go get -u github.com/boltdb/bolt/..." + echo "" + exit 1 +fi + +exec "$BIN" $@ "$TEST_DB" diff --git a/src/hub/test-bolter b/src/hub/test-bolter new file mode 100755 index 0000000..24c8901 --- /dev/null +++ b/src/hub/test-bolter @@ -0,0 +1,21 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name> [ ... args-to-bolter ... ]" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" +shift + +BIN="$(go env GOPATH)/bin/bolter" +if [ ! -x "$BIN" ]; then + echo "bolter not found. Please run:" + echo "" + echo " go get -u github.com/hasit/bolter" + echo "" + exit 1 +fi + +exec "$BIN" --file "$TEST_DB" $@ diff --git a/src/hub/test-client b/src/hub/test-client index 8a24c7d..fcb9a98 100755 --- a/src/hub/test-client +++ b/src/hub/test-client @@ -1,23 +1,22 @@ #!/bin/sh + +TEST_D="./test" + echo pipe: import sample.json echo ------------------------ -socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe +socat file:../../dat/sample.json,rdonly "unix-client:$TEST_D/pipe" echo pipe-gram: import sample-gram.json echo ---------------------------------- -while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json +while read x; do echo "$x" | socat stdio "unix-sendto:$TEST_D/pipegram"; done < ../../dat/sample-gram.json + +echo post update +echo ----------- +curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates' echo show query result echo ----------------- -curl -i 'http://localhost:8000/updates?from=2013-10-21T00:00:00Z&to=2013-10-21T12:31:00Z' +curl -i 'http://localhost:8000/updates' -echo '\npost update' -echo ------------ -curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates' - -echo show stats -echo ---------- -curl -i 'http://localhost:8000/stats' echo '\n\ndone' - diff --git a/src/hub/test-collector b/src/hub/test-collector deleted file mode 100755 index e6ca367..0000000 --- a/src/hub/test-collector +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -./bin/sfive-hub -db "file:memdb1?mode=memory&cache=shared" -start-pipe-server=false -start-pipegram-server=false -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000" - diff --git a/src/hub/test-fwd b/src/hub/test-fwd new file mode 100755 index 0000000..371aa55 --- /dev/null +++ b/src/hub/test-fwd @@ -0,0 +1,11 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000" diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es index 3f3eb12..61bbcf7 100755 --- a/src/hub/test-fwd-es +++ b/src/hub/test-fwd-es @@ -1,5 +1,11 @@ #!/bin/sh -rm -f /run/sfive/pipe /run/sfive/pipegram -./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-es-url="http://stream.elevate.at:9200/e14" +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing" diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik index 1d45219..8a2ae33 100755 --- a/src/hub/test-fwd-piwik +++ b/src/hub/test-fwd-piwik @@ -1,4 +1,11 @@ #!/bin/sh -rm -f /run/sfive/pipe /run/sfive/pipegram -./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" diff --git a/src/hub/test-import b/src/hub/test-import index d1a5044..a6c2942 100755 --- a/src/hub/test-import +++ b/src/hub/test-import @@ -1,10 +1,12 @@ #!/bin/sh + +TEST_D="./test" + echo pipe: import sample.json echo ------------------------ [ -f ../../dat/sample-access.json ] || zcat ../../dat/sample-accesslog.json.gz > ../../dat/sample-accesslog.json -socat file:../../dat/sample-accesslog.json,rdonly unix-client:/run/sfive/pipe +socat file:../../dat/sample-accesslog.json,rdonly "unix-client:$TEST_D/pipe" echo '\n\ndone' - diff --git a/src/hub/test-srv b/src/hub/test-srv index 254549a..064fa3a 100755 --- a/src/hub/test-srv +++ b/src/hub/test-srv @@ -1,4 +1,13 @@ #!/bin/sh -rm -f /run/sfive/pipe /run/sfive/pipegram -./bin/sfive-hub -db /var/lib/sfive/db.sqlite -start-pipe-server -pipe /var/run/sfive/pipe -start-pipegram-server -pipegram /var/run/sfive/pipegram -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8001" +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000" diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro new file mode 100755 index 0000000..56fe440 --- /dev/null +++ b/src/hub/test-srv-ro @@ -0,0 +1,13 @@ +#!/bin/sh + +if [ -z "$1" ]; then + echo "Usage: $0 <db-name>" + exit 1 +fi + +TEST_D="./test" +TEST_DB="$TEST_D/$1.bolt" + +mkdir -p "$TEST_D" +rm -f "$TEST_D/pipe" "$TEST_D/pipegram" +exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000" |