summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/.gitignore2
-rw-r--r--src/hub/Makefile12
-rwxr-xr-xsrc/hub/dump-test-db3
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go69
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go129
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go150
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5log.go36
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go209
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go60
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go62
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go47
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go51
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go76
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go224
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go917
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go840
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go96
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go211
-rwxr-xr-xsrc/hub/test-bolt21
-rwxr-xr-xsrc/hub/test-bolter21
-rwxr-xr-xsrc/hub/test-client21
-rwxr-xr-xsrc/hub/test-collector4
-rwxr-xr-xsrc/hub/test-fwd11
-rwxr-xr-xsrc/hub/test-fwd-es10
-rwxr-xr-xsrc/hub/test-fwd-piwik11
-rwxr-xr-xsrc/hub/test-import6
-rwxr-xr-xsrc/hub/test-srv13
-rwxr-xr-xsrc/hub/test-srv-ro13
28 files changed, 2154 insertions, 1171 deletions
diff --git a/src/hub/.gitignore b/src/hub/.gitignore
index 16dc168..dd65bbd 100644
--- a/src/hub/.gitignore
+++ b/src/hub/.gitignore
@@ -5,3 +5,5 @@
/pkg
*.a
*.o
+/test
+/coverage.out
diff --git a/src/hub/Makefile b/src/hub/Makefile
index 0c34f14..87890a4 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -12,7 +12,7 @@
# live and recorded data.
#
#
-# Copyright (C) 2014-2015 Christian Pointner <equinox@spreadspace.org>
+# Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
# Markus Grüneis <gimpf@gimpf.org>
#
# This file is part of sfive.
@@ -38,13 +38,10 @@ endif
EXECUTEABLE := sfive-hub
-LIBS := "gopkg.in/gorp.v2" \
- "github.com/mattn/go-sqlite3" \
+LIBS := "github.com/boltdb/bolt" \
"github.com/zenazn/goji" \
"github.com/pborman/uuid" \
"github.com/equinox0815/graphite-golang"
-# "github.com/go-sql-driver/mysql"
-# "github.com/ziutek/mymysql/godrv"
all: build test
@@ -83,6 +80,11 @@ bench: getlibs
@echo "testing and benchmarking: sfive"
@$(GOCMD) test -bench=. spreadspace.org/sfive
+cover: getlibs
+ @echo "testing with coverage output"
+ @$(GOCMD) test -coverprofile=coverage.out spreadspace.org/sfive
+ @$(GOCMD) tool cover -html=coverage.out
+
clean:
rm -rf pkg/*/spreadspace.org
rm -rf bin
diff --git a/src/hub/dump-test-db b/src/hub/dump-test-db
deleted file mode 100755
index 9a7aae7..0000000
--- a/src/hub/dump-test-db
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-echo .dump | sqlite3 /var/lib/sfive/db.sqlite
-
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 8897e42..4028f3f 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package main
import (
@@ -12,8 +44,8 @@ import (
var s5hl = log.New(os.Stderr, "[s5hub]\t", log.LstdFlags)
func main() {
- db := flag.String("db", "/var/lib/sfive/db.sqlite", "path to the sqlite3 database file")
- dbMysql := flag.Bool("db-mysql", false, "use MySQL connector")
+ db := flag.String("db", "/var/lib/sfive/db.bolt", "path to the database file")
+ readOnly := flag.Bool("read-only", false, "open database in read-only mode")
pipe := flag.String("pipe", "/var/run/sfive/pipe", "path to the unix pipe for the pipeserver")
ppipe := flag.String("pipegram", "/var/run/sfive/pipegram", "path to the unix datagram pipe for the pipeserver")
startPipe := flag.Bool("start-pipe-server", true, "start a connection oriented pipe server; see option pipe")
@@ -27,7 +59,6 @@ func main() {
piwikSiteURL := flag.String("piwik-site-url", "", "use this base url for the site")
piwikSiteID := flag.Uint("piwik-site-id", 1, "use this site-id for piwik")
piwikToken := flag.String("piwik-token", "", "the auth token for piwik")
- vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application")
help := flag.Bool("help", false, "show usage")
flag.Parse()
@@ -38,11 +69,11 @@ func main() {
return
}
- server, err := sfive.NewServer(*dbMysql, *db)
+ srv, err := sfive.NewServer(*db, *readOnly)
if err != nil {
- s5hl.Fatalf("failed to initialize: %v", err)
+ s5hl.Fatalf(err.Error())
}
- defer server.Close()
+ defer srv.Close()
var wg sync.WaitGroup
@@ -51,7 +82,7 @@ func main() {
go func() {
defer wg.Done()
s5hl.Printf("start pipe at %v\n", *pipe)
- server.ServePipe(*pipe)
+ srv.ServePipe(*pipe)
s5hl.Println("pipe finished")
}()
}
@@ -60,8 +91,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Printf("start pipegram at %v\n", *ppipe)
- server.ServeGramPipe(*ppipe)
+ s5hl.Printf("starting pipegram at %v\n", *ppipe)
+ srv.ServePipegram(*ppipe)
s5hl.Println("pipegram finished")
}()
}
@@ -70,8 +101,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start web-srv")
- server.ServeWeb(*vizAppDir)
+ s5hl.Println("starting web")
+ srv.ServeWeb()
s5hl.Println("web finished")
}()
}
@@ -80,8 +111,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start forward")
- server.RunForwarding(*forward)
+ s5hl.Println("starting forward")
+ srv.RunForwarding(*forward)
s5hl.Println("forward finished")
}()
}
@@ -90,8 +121,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start elastic-search forward")
- server.RunForwardingToElasticSearch(*forwardES)
+ s5hl.Println("starting elastic-search forward")
+ srv.RunForwardingEs(*forwardES)
s5hl.Println("elastic-search forward finished")
}()
}
@@ -100,8 +131,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start graphite forward")
- server.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath)
+ s5hl.Println("starting graphite forward")
+ srv.RunForwardingGraphite(*forwardGraphite, *graphiteBasePath)
s5hl.Println("graphite forward finished")
}()
}
@@ -110,8 +141,8 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- s5hl.Println("start piwik forward")
- server.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken)
+ s5hl.Println("starting piwik forward")
+ srv.RunForwardingPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken)
s5hl.Println("piwik forward finished")
}()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index cd65cf7..26a7c83 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -5,71 +37,80 @@ import (
"fmt"
)
-type StatsDecoder interface {
- Decode(jsonString []byte) (StatisticsData, error)
-}
-
-type StatsEncoder interface {
- Encode(data StatisticsData) []byte
-}
+const (
+ ProtocolVersion = 1
+)
-type FilterDecoder interface {
- Decode(jsonString []byte) (StatsFilter, error)
-}
+//
+// Decoder
+//
-type StatefulDecoder struct {
- sourceId SourceId
+type Decoder interface {
+ Decode(jsonString []byte) (DataUpdateFull, error)
}
-type PlainDecoder struct{}
-
-type PlainEncoder struct{}
+// stateless protocol interfaces
+type StatelessDecoder struct{}
-type filterDecoder struct{}
+func NewStatelessDecoder() Decoder {
+ return &StatelessDecoder{}
+}
-func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) {
- res := new(StatefulDecoder)
- err = json.Unmarshal(jsonString, &res.sourceId)
- if err != nil {
+func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ if err = json.Unmarshal(jsonString, &res); err != nil {
return
}
- if res.sourceId.Version != 1 {
- err = fmt.Errorf("unsupported version, expected 1, actual %v", res.sourceId.Version)
+ if res.Version != ProtocolVersion {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
}
- decoder = res
return
}
-func NewPlainDecoder() StatsDecoder {
- return new(PlainDecoder)
+// stateful protocol interfaces
+type StatefulDecoder struct {
+ Version uint
+ SourceId
}
-func NewFilterDecoder() FilterDecoder {
- return new(filterDecoder)
+func NewStatefulDecoder(jsonString []byte) (Decoder, error) {
+ res := &StatefulDecoder{}
+ if err := json.Unmarshal(jsonString, &res); err != nil {
+ return nil, err
+ }
+ if res.Version != ProtocolVersion {
+ return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
+ }
+ return res, nil
}
-func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) {
- dat.CopyFromSourceId(&self.sourceId)
- err = json.Unmarshal(jsonString, &dat)
- // like in PlainDecoder, let the client decide how to use partial results
- // (Unmarshal returns partial results in case of errors)
+func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ res.Version = sd.Version
+ res.CopyFromSourceId(&sd.SourceId)
+ if err = json.Unmarshal(jsonString, &res); err != nil {
+ return
+ }
+ if res.Version != sd.Version {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version)
+ }
return
}
-func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) {
- err = json.Unmarshal(jsonString, &dat)
- return
+//
+// Encoder
+//
+
+type Encoder interface {
+ Encode(data DataUpdateFull) ([]byte, error)
}
-func (self *PlainEncoder) Encode(data *StatisticsData) []byte {
- res, err := json.Marshal(data)
- if err != nil {
- s5l.Panicln("failed to encode StatisticsData")
- }
- return res
+type StatelessEncoder struct{}
+
+func NewStatelessEncoder() Encoder {
+ return &StatelessEncoder{}
}
-func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) {
- err = json.Unmarshal(jsonString, &dat)
- return
+func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) {
+ data.Version = ProtocolVersion
+ data.StartTime = data.StartTime.UTC()
+ return json.Marshal(data)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 32f35dd..5a79d85 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -1,62 +1,150 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
+ "fmt"
"reflect"
"testing"
"time"
)
var (
- sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"], "version": 1`
- sourceIdData = `{` + sourceIdFields + `}`
- sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}, Version: 1}
- updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
- updateData = "{" + updateFields + "}"
- updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
- testData = "{" + sourceIdFields + "," + updateFields + "}"
+ initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
+ updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
+ updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
)
-func GetExpected() *StatisticsData {
- expected := new(StatisticsData)
- expected.CopyFromSourceId(&sourceIdDataStruct)
+func GetExpected() (expected DataUpdateFull) {
+ expected.Version = ProtocolVersion
+ expected.CopyFromSourceId(&initDataStruct)
expected.CopyFromUpdate(&updateDataStruct)
- return expected
+ return
}
func TestDecodeStateful(t *testing.T) {
- dc, err := NewStatefulDecoder([]byte(sourceIdData))
+ // invalid init message
+ if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil {
+ t.Fatalf("creating decoder with invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded)
+ if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("creating decoder with wrong protocol version should throw an error")
+ }
+
+ // valid init message
+ statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ dec, err := NewStatefulDecoder([]byte(statefulInitMsg))
if err != nil {
- t.Errorf("Creating decoder failed with %v", err)
- return
+ t.Fatalf("creating decoder failed: %v", err)
+ }
+
+ // invalid data-update message
+ if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil {
+ t.Fatalf("decoding message with invalid json should throw an error")
}
- dat, err := dc.Decode([]byte(testData))
+
+ // wrong protocol version
+ wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
+ }
+
+ // valid data-update message
+ statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statefulUpdateMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestDecodePlain(t *testing.T) {
- ec := new(PlainDecoder)
- dat, err := ec.Decode([]byte(testData))
+func TestDecodeStateless(t *testing.T) {
+ dec := NewStatelessDecoder()
+
+ // invalid message
+ if _, err := dec.Decode([]byte("this is still not json")); err == nil {
+ t.Fatalf("decoding invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
+ }
+
+ // valid message
+ statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statelessDataMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestEncode(t *testing.T) {
- ec := new(PlainEncoder)
- td := new(StatisticsData)
- td.CopyFromSourceId(&sourceIdDataStruct)
+func TestEncodeStateless(t *testing.T) {
+ enc := NewStatelessEncoder()
+
+ var td DataUpdateFull
+ td.CopyFromSourceId(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
- t.Logf("dada: %v", ec.Encode(td))
+
+ encoded, err := enc.Encode(td)
+ if err != nil {
+ t.Fatalf("encoding message failed: %v", err)
+ }
+
+ decoded, err := NewStatelessDecoder().Decode(encoded)
+ if err != nil {
+ t.Fatalf("decoding message failed: %v", err)
+ }
+
+ expected := td
+ expected.Version = ProtocolVersion
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("encoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
+ }
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go
index 9b80f6f..4801902 100644
--- a/src/hub/src/spreadspace.org/sfive/s5log.go
+++ b/src/hub/src/spreadspace.org/sfive/s5log.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -9,6 +41,6 @@ import (
var (
s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags)
// use ioutil.Discard to switch that thing off
- // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags)
- s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags)
+ // s5dl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags)
+ s5dl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags)
)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 9cf4c64..76805c3 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -1,40 +1,60 @@
-package sfive
-
-import "time"
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
-type appendManyToken struct {
- data []StatisticsData
- response chan bool
-}
+package sfive
-type queryStatsResult struct {
- stats StatsResult
- err error
+type appendToken struct {
+ data DataUpdateFull
+ response chan error
}
-type queryStatsToken struct {
- filter *StatsFilter
- response chan queryStatsResult
+type appendManyToken struct {
+ data []DataUpdateFull
+ response chan error
}
type getUpdatesResult struct {
- values []StatisticsData
+ values []DataUpdateFull
err error
}
type getUpdatesAfterToken struct {
id int
- response chan getUpdatesResult
-}
-
-type getUpdatesToken struct {
- filter *StatsFilter
+ limit int
response chan getUpdatesResult
}
type getHubIdResult struct {
- id string
- err error
+ id string
}
type getHubIdToken struct {
@@ -50,141 +70,108 @@ type getLastUpdateIdToken struct {
response chan getLastUpdateIdResult
}
-type StatsSinkServer struct {
- store sqliteStore
+type Server struct {
+ store Store
quit chan bool
done chan bool
- appendData chan StatisticsData
- appendManyData chan appendManyToken // chan []StatisticsData
- getStatsChan chan queryStatsToken
+ appendChan chan appendToken
+ appendManyChan chan appendManyToken
getUpdatesAfterChan chan getUpdatesAfterToken
- getUpdatesChan chan getUpdatesToken
getHubIdChan chan getHubIdToken
getLastUpdateIdChan chan getLastUpdateIdToken
}
-func (self StatsSinkServer) appendActor() {
- defer func() { self.done <- true }()
+func (srv Server) appendActor() {
+ defer func() { srv.done <- true }()
for {
select {
- case <-self.quit:
+ case <-srv.quit:
return
- case value := <-self.appendData:
- var err error
- for tryNum := 0; tryNum < 5; tryNum++ {
- err = self.store.Append(value)
- if err != nil {
- time.Sleep(1 * time.Second)
- } else {
- break
- }
- }
- if err != nil {
- s5l.Printf("failed to store data: %v\n", err)
- }
- case token := <-self.appendManyData:
- err := self.store.AppendMany(token.data)
- if err != nil {
- s5l.Printf("failed to store many data: %v\n", err)
- token.response <- false
- } else {
- token.response <- true
- }
- case token := <-self.getStatsChan:
- stats, err := self.store.GetStats(token.filter)
- token.response <- queryStatsResult{stats, err}
- case token := <-self.getUpdatesAfterChan:
- values, err := self.store.GetUpdatesAfter(token.id)
- token.response <- getUpdatesResult{values, err}
- case token := <-self.getUpdatesChan:
- values, err := self.store.GetUpdates(token.filter)
+ case token := <-srv.appendChan:
+ token.response <- srv.store.Append(token.data)
+ case token := <-srv.appendManyChan:
+ token.response <- srv.store.AppendMany(token.data)
+ case token := <-srv.getUpdatesAfterChan:
+ values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
token.response <- getUpdatesResult{values, err}
- case token := <-self.getHubIdChan:
- storeId, err := self.store.GetStoreId()
- token.response <- getHubIdResult{storeId, err}
- case token := <-self.getLastUpdateIdChan:
- lastUpdateId, err := self.store.GetLastUpdateId()
- if lastUpdateId != nil {
- token.response <- getLastUpdateIdResult{*lastUpdateId, err}
- } else {
- token.response <- getLastUpdateIdResult{0, err}
- }
+ case token := <-srv.getHubIdChan:
+ token.response <- getHubIdResult{srv.store.GetStoreId()}
+ case token := <-srv.getLastUpdateIdChan:
+ lastUpdateId, err := srv.store.GetLastUpdateId()
+ token.response <- getLastUpdateIdResult{lastUpdateId, err}
}
}
}
-func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, error) {
- token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)}
+func (srv Server) Append(data DataUpdateFull) error {
+ token := appendToken{data: data, response: make(chan error, 1)}
defer close(token.response)
- self.getUpdatesAfterChan <- token
- res := <-token.response
- return res.values, res.err
+ srv.appendChan <- token
+ return <-token.response
}
-func (self StatsSinkServer) getUpdatesInvoke(filter *StatsFilter) ([]StatisticsData, error) {
- token := getUpdatesToken{filter: filter, response: make(chan getUpdatesResult, 1)}
+func (srv Server) AppendMany(data []DataUpdateFull) error {
+ token := appendManyToken{data: data, response: make(chan error, 1)}
defer close(token.response)
- self.getUpdatesChan <- token
- res := <-token.response
- return res.values, res.err
+ srv.appendManyChan <- token
+ return <-token.response
}
-func (self StatsSinkServer) getStatsInvoke(filter *StatsFilter) (StatsResult, error) {
- token := queryStatsToken{filter: filter, response: make(chan queryStatsResult, 1)}
+func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) {
+ token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)}
defer close(token.response)
- self.getStatsChan <- token
+ srv.getUpdatesAfterChan <- token
res := <-token.response
- return res.stats, res.err
+ return res.values, res.err
}
-func (self StatsSinkServer) getHubIdInvoke() (string, error) {
+func (srv Server) GetHubId() string {
token := getHubIdToken{response: make(chan getHubIdResult, 1)}
defer close(token.response)
- self.getHubIdChan <- token
+ srv.getHubIdChan <- token
res := <-token.response
- return res.id, res.err
+ return res.id
}
-func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) {
+func (srv Server) GetLastUpdateId() (int, error) {
token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)}
defer close(token.response)
- self.getLastUpdateIdChan <- token
+ srv.getLastUpdateIdChan <- token
res := <-token.response
return res.id, res.err
}
-func (self StatsSinkServer) Close() {
- self.quit <- true
- <-self.done
- close(self.quit)
- close(self.done)
- close(self.appendData)
- close(self.appendManyData)
- close(self.getStatsChan)
- close(self.getUpdatesAfterChan)
- close(self.getUpdatesChan)
- close(self.getHubIdChan)
- close(self.getLastUpdateIdChan)
- self.store.Close()
+func (srv Server) Close() {
+ s5l.Printf("server: shutting down\n")
+ srv.quit <- true
+ <-srv.done
+ close(srv.quit)
+ close(srv.done)
+ close(srv.appendChan)
+ close(srv.appendManyChan)
+ close(srv.getUpdatesAfterChan)
+ close(srv.getHubIdChan)
+ close(srv.getLastUpdateIdChan)
+ srv.store.Close()
+ s5l.Printf("server: finished\n")
}
-func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) {
+func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
// TODO read configuration and create instance with correct settings
- server = new(StatsSinkServer)
- server.store, err = NewStore(mysql, dbPath)
+ server = &Server{}
+ server.store, err = NewStore(dbPath, readOnly)
if err != nil {
return
}
server.quit = make(chan bool)
server.done = make(chan bool)
- server.appendData = make(chan StatisticsData, 5)
- server.appendManyData = make(chan appendManyToken, 5)
- server.getStatsChan = make(chan queryStatsToken, 5)
- server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1)
- server.getUpdatesChan = make(chan getUpdatesToken, 3)
- server.getHubIdChan = make(chan getHubIdToken, 1)
- server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1)
+ server.appendChan = make(chan appendToken, 32)
+ server.appendManyChan = make(chan appendManyToken, 32)
+ server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32)
+ server.getHubIdChan = make(chan getHubIdToken, 32)
+ server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32)
go server.appendActor()
+ s5l.Printf("server: started\n")
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index a072b2a..d6874de 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -9,24 +41,19 @@ import (
"time"
)
-func findMaxId(values []StatisticsData) int {
+func findMaxId(values []DataUpdateFull) int {
maxId := -1
for i := range values {
id := values[i].SourceHubDataUpdateId
- if id != nil && *id > maxId {
- maxId = *id
+ if id > maxId {
+ maxId = id
}
}
return maxId
}
-func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
- storeId, err = self.getHubIdInvoke()
-
- if err != nil {
- s5l.Printf("fwd: failed to get own hubid: %v\n", err)
- return
- }
+func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
+ storeId = srv.GetHubId()
var resp *http.Response
resp, err = client.Get(baseurl + "/lastupdate/" + storeId)
@@ -63,11 +90,11 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
-func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
+func (srv Server) forwardRun(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
for {
- lastId, _, err := self.getLastUpdate(baseurl, client)
+ lastId, _, err := srv.forwardGetLastUpdate(baseurl, client)
if err != nil {
s5l.Printf("fwd: lastupdate returned err: %v", err)
@@ -79,7 +106,7 @@ tryResync:
nextBatch:
for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -93,7 +120,7 @@ tryResync:
continue nextBatch
}
- data, err := json.Marshal(StatisticsDataContainer{updates})
+ data, err := json.Marshal(DataUpdateFullContainer{updates})
if err != nil {
s5l.Panicf("fwd: encode failed: %v\n", err)
@@ -116,11 +143,10 @@ tryResync:
s5l.Printf("fwd: post OK")
lastId = findMaxId(updates)
s5l.Printf("fwd: new lastid: %d", lastId)
- time.Sleep(1 * time.Second)
}
}
}
-func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
- self.handleForwarding(forwardBaseUrl, http.DefaultClient)
+func (srv Server) RunForwarding(forwardBaseUrl string) {
+ srv.forwardRun(forwardBaseUrl, http.DefaultClient)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 19abb1e..8880b8a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistics collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -10,23 +42,18 @@ import (
"time"
)
-const lastUpdateJson = `{
+const forwardEsLastUpdateJson = `{
"query": {"match": { "SourceHubUuid": "%s" } },
"aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
}`
-func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
+func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
url := baseurl + "/dataupdate/_search?search_type=count"
- storeId, err = self.getHubIdInvoke()
-
- if err != nil {
- s5l.Printf("fwd-es: failed to get own hubid: %v\n", err)
- return
- }
+ storeId = srv.GetHubId()
- queryJson := fmt.Sprintf(lastUpdateJson, storeId)
- s5tl.Printf("fwd-es: query: %s", queryJson)
+ queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId)
+ s5dl.Printf("fwd-es: query: %s", queryJson)
var resp *http.Response
resp, err = client.Post(url, "application/json", strings.NewReader(queryJson))
@@ -48,7 +75,7 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client)
return
}
- s5tl.Printf("fwd-es: lastupdate response: %s\n", body)
+ s5dl.Printf("fwd-es: lastupdate response: %s\n", body)
if len(body) == 0 {
latestId = -1
@@ -73,11 +100,11 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client)
return
}
-func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
+func (srv Server) forwardEsRun(baseurl string, client *http.Client) {
url := baseurl + "/_bulk"
tryResync:
for {
- lastId, _, err := self.getLastUpdateEs(baseurl, client)
+ lastId, _, err := srv.forwardEsGetLastUpdate(baseurl, client)
if err != nil {
s5l.Printf("fwd-es: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -87,7 +114,7 @@ tryResync:
nextBatch:
for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-es: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -114,8 +141,6 @@ tryResync:
postData.WriteRune('\n')
}
- //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes()))
-
s5l.Printf("fwd-es: marshal OK")
resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
@@ -136,11 +161,10 @@ tryResync:
s5l.Printf("fwd-es: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-es: new lastid: %d", lastId)
- //time.Sleep(1 * time.Second)
}
}
}
-func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
- self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
+func (srv Server) RunForwardingEs(forwardBaseUrl string) {
+ srv.forwardEsRun(forwardBaseUrl, http.DefaultClient)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index 9779960..2c0043b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistics collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -7,8 +39,8 @@ import (
"github.com/equinox0815/graphite-golang"
)
-func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) {
- latestId, err = self.getLastUpdateIdInvoke()
+func (srv Server) forwardGraphiteGetLastUpdate(conn *graphite.Graphite) (latestId int, storeId string, err error) {
+ latestId, err = srv.GetLastUpdateId()
if err != nil {
s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err)
return
@@ -17,7 +49,7 @@ func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (late
return
}
-func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) {
+func (srv Server) forwardGraphiteRun(forwardHost string, basePath string) {
tryResync:
for {
client, err := graphite.NewGraphiteFromAddress(forwardHost)
@@ -27,7 +59,7 @@ tryResync:
continue tryResync
}
- lastId, _, err := self.getLastUpdateGraphite(client)
+ lastId, _, err := srv.forwardGraphiteGetLastUpdate(client)
if err != nil {
s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
client.Disconnect()
@@ -38,7 +70,7 @@ tryResync:
nextBatch:
for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -75,11 +107,10 @@ tryResync:
s5l.Printf("fwd-graphite: all metrics sent")
lastId = findMaxId(updates)
s5l.Printf("fwd-graphite: new lastid: %d", lastId)
- //time.Sleep(1 * time.Second)
}
}
}
-func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) {
- self.handleForwardingToGraphite(forwardHost, basePath)
+func (srv Server) RunForwardingGraphite(forwardHost string, basePath string) {
+ srv.forwardGraphiteRun(forwardHost, basePath)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 5a25622..3d44500 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistics collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -12,15 +44,15 @@ import (
"time"
)
-type PiwikBulkRequest struct {
+type forwardPiwikBulkRequest struct {
Requests []string `json:"requests"`
TokenAuth string `json:"token_auth"`
}
-func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) {
+func (srv Server) forwardPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) {
// TODO: ask piwik what the last update was...
- latestId, err = 0, nil //self.getLastUpdateIdInvoke()
+ latestId, err = 0, nil //srv.getLastUpdateIdInvoke()
if err != nil {
s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err)
return
@@ -29,10 +61,10 @@ func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID
return
}
-func (self StatsSinkServer) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
+func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
tryResync:
for {
- lastId, _, err := self.getLastUpdatePiwik(piwikURL, siteURL, siteID, token, client)
+ lastId, _, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client)
if err != nil {
s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -42,7 +74,7 @@ tryResync:
nextBatch:
for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
+ updates, err := srv.GetUpdatesAfter(lastId, 5000)
if err != nil {
s5l.Printf("fwd-piwik: failed reading updates: %v\n", err)
time.Sleep(500 * time.Millisecond)
@@ -56,7 +88,7 @@ tryResync:
continue nextBatch
}
- req := PiwikBulkRequest{TokenAuth: token}
+ req := forwardPiwikBulkRequest{TokenAuth: token}
for _, update := range updates {
if len(update.Data.Clients) == 0 {
continue
@@ -101,11 +133,10 @@ tryResync:
s5l.Printf("fwd-piwik: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-piwik: new lastid: %d", lastId)
- //time.Sleep(1 * time.Second)
}
}
}
-func (self StatsSinkServer) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) {
- self.handleForwardingToPiwik(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient)
+func (srv Server) RunForwardingPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) {
+ srv.forwardPiwikRun(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 8084461..4b48d99 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -1,15 +1,50 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistics collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
"bufio"
+ "io"
"net"
)
-func (self StatsSinkServer) handleConnection(conn net.Conn) {
+func (srv Server) pipeHandle(conn net.Conn) {
reader := bufio.NewReader(conn)
buffer, err := reader.ReadBytes('\n')
if err != nil {
- s5l.Printf("pipe: failed to read from connection: %v\n", err)
+ if err != io.EOF {
+ s5l.Printf("pipe: failed to read from connection: %v\n", err)
+ }
return
}
marshaller, err := NewStatefulDecoder(buffer)
@@ -21,42 +56,47 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) {
for {
buffer, err := reader.ReadBytes('\n')
if err != nil {
- s5l.Printf("pipe: failed to read from connection: %v\n", err)
+ if err != io.EOF {
+ s5l.Printf("pipe: failed to read from connection: %v\n", err)
+ }
return
}
- // s5l.Printf("msg: %v", string(buffer))
-
value, err := marshaller.Decode(buffer)
if err != nil {
s5l.Printf("pipe: failed to decode message: %v\n", err)
continue
}
- self.appendData <- value
+ if err = srv.Append(value); err != nil {
+ s5l.Printf("pipe: failed to store data: %v\n", err)
+ }
}
}
-func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) {
- decoder := NewPlainDecoder()
+func (srv Server) pipegramHandle(pconn net.PacketConn) {
+ decoder := NewStatelessDecoder()
buffer := make([]byte, 64*1024)
for {
n, _, err := pconn.ReadFrom(buffer)
if err != nil {
- s5l.Printf("p-pipe: failed read: %v", err)
+ s5l.Printf("pipegram: failed read: %v", err)
continue
}
data := buffer[0:n]
value, err := decoder.Decode(data)
- if err == nil {
- self.appendData <- value
- } else {
- s5l.Printf("p-pipe: failed to decode message: %v\n", err)
+ if err != nil {
+ s5l.Printf("pipegram: failed to decode message: %v\n", err)
+ continue
+ }
+
+ if err = srv.Append(value); err != nil {
+ s5l.Printf("pipegram: failed to store data: %v\n", err)
}
}
}
-func (self StatsSinkServer) ServePipe(pipePath string) {
+func (srv Server) ServePipe(pipePath string) {
ln, err := net.Listen("unix", pipePath)
if err != nil {
s5l.Printf("pipe: failed to connect: %v", err)
@@ -71,17 +111,17 @@ func (self StatsSinkServer) ServePipe(pipePath string) {
// ignore
continue
}
- go self.handleConnection(conn)
+ go srv.pipeHandle(conn)
}
}
-func (self StatsSinkServer) ServeGramPipe(pipePath string) {
+func (srv Server) ServePipegram(pipePath string) {
pconn, err := net.ListenPacket("unixgram", pipePath)
if err != nil {
- s5l.Printf("p-pipe: failed to listen: %v", err)
+ s5l.Printf("pipegram: failed to listen: %v", err)
return
}
defer pconn.Close()
- self.handlePacketConn(pconn)
+ srv.pipegramHandle(pconn)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index af9e986..80af8c5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -1,3 +1,35 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
@@ -5,26 +37,25 @@ import (
"fmt"
"io/ioutil"
"net/http"
- "os"
"strconv"
- "time"
"github.com/zenazn/goji"
"github.com/zenazn/goji/web"
)
-func hello(c web.C, w http.ResponseWriter, r *http.Request) {
- fmt.Fprintf(w, "Hello, %s!", c.URLParams["name"])
+func (srv Server) webHealthz(c web.C, w http.ResponseWriter, r *http.Request) {
+ // TODO: do a more sophisticated check
+ fmt.Fprintf(w, "%s\n", srv.GetHubId())
}
-func (self StatsSinkServer) getTagList(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "tags"
- values, err := self.store.GetTags()
+func (srv Server) webGetHubsList(c web.C, w http.ResponseWriter, r *http.Request) {
+ const resourceName = "hubs"
+ values, err := srv.store.GetHubs()
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- jsonString, err := json.Marshal(DataContainer{values})
+ jsonString, err := json.Marshal(GenericDataContainer{values})
if err != nil {
http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
return
@@ -32,14 +63,14 @@ func (self StatsSinkServer) getTagList(c web.C, w http.ResponseWriter, r *http.R
fmt.Fprintf(w, "%s", jsonString)
}
-func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetSourcesList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "sources"
- values, err := self.store.GetSources()
+ values, err := srv.store.GetSources()
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- jsonString, err := json.Marshal(DataContainer{values})
+ jsonString, err := json.Marshal(GenericDataContainer{values})
if err != nil {
http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
return
@@ -47,16 +78,20 @@ func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *ht
fmt.Fprintf(w, "%s", jsonString)
}
-func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetSource(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "source"
id, err := strconv.ParseInt(c.URLParams["id"], 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest)
return
}
- value, err := self.store.GetSource(int(id))
+ value, err := srv.store.GetSource(int(id))
if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ if err == ErrNotFound {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound)
+ } else {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ }
return
}
jsonString, err := json.Marshal(value)
@@ -67,78 +102,14 @@ func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Re
fmt.Fprintf(w, "%s", jsonString)
}
-func getFilter(r *http.Request) (filter StatsFilter) {
- from := r.FormValue("from")
- if from != "" {
- fromT, err := time.Parse(time.RFC3339, from)
- if err == nil {
- filter.start = &fromT
- }
- }
-
- to := r.FormValue("to")
- if to != "" {
- toT, err := time.Parse(time.RFC3339, to)
- if err == nil {
- filter.end = &toT
- }
- }
-
- hostname := r.FormValue("hostname")
- if hostname != "" {
- filter.hostname = &hostname
- }
-
- contentId := r.FormValue("contentId")
- if contentId != "" {
- filter.contentId = &contentId
- }
-
- format := r.FormValue("format")
- if format != "" {
- filter.format = &format
- }
-
- quality := r.FormValue("quality")
- if quality != "" {
- filter.quality = &quality
- }
-
- afterUpdateId := r.FormValue("afterUpdateId")
- if afterUpdateId != "" {
- id, err := strconv.ParseInt(afterUpdateId, 10, 32)
- if err == nil {
- idInt := int(id)
- filter.afterUpdateId = &idInt
- }
- }
-
- limit := r.FormValue("limit")
- if limit != "" {
- limitInt, err := strconv.ParseInt(limit, 10, 32)
- if err == nil {
- limitIntInt := int(limitInt)
- filter.limit = &limitIntInt
- }
- }
-
- sortOrder := r.FormValue("sortOrder")
- if sortOrder != "" {
- filter.sortOrder = &sortOrder
- }
-
- return
-}
-
-func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "updates"
- filter := getFilter(r)
- values, err := self.getUpdatesInvoke(&filter)
+ values, err := srv.GetUpdatesAfter(-1, 3) // TODO: get start and limit from param
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- jsonString, err := json.Marshal(DataContainer{values})
+ jsonString, err := json.Marshal(GenericDataContainer{values})
if err != nil {
http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
return
@@ -146,16 +117,20 @@ func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *htt
fmt.Fprintf(w, "%s", jsonString)
}
-func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
id, err := strconv.ParseInt(c.URLParams["id"], 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("invalid id: %s: %v", resourceName, err), http.StatusBadRequest)
return
}
- value, err := self.store.GetUpdate(int(id))
+ value, err := srv.store.GetUpdate(int(id))
if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ if err == ErrNotFound {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusNotFound)
+ } else {
+ http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
+ }
return
}
jsonString, err := json.Marshal(value)
@@ -166,9 +141,9 @@ func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Re
fmt.Fprintf(w, "%s", jsonString)
}
-func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
- decoder := NewPlainDecoder()
+ decoder := NewStatelessDecoder()
buffer, err := ioutil.ReadAll(r.Body)
if err != nil {
@@ -177,17 +152,14 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R
return
}
- container := StatisticsDataContainer{}
+ // TODO: add different API endpoint called bulk
+ container := DataUpdateFullContainer{}
err = json.Unmarshal(buffer, &container)
if err == nil {
- token := appendManyToken{
- data: container.Data,
- response: make(chan bool, 2)}
- defer close(token.response)
- self.appendManyData <- token
- success := <-token.response
- if !success {
- http.Error(w, "failed to store data", http.StatusInternalServerError)
+ if err = srv.AppendMany(container.Data); err != nil {
+ http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+ } else {
+ fmt.Fprintf(w, "%d update(s) successfully stored.", len(container.Data))
}
return
}
@@ -200,59 +172,43 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R
return
}
- self.appendData <- data
- // TODO send response channel, wait for OK
+ if err = srv.Append(data); err != nil {
+ http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+ } else {
+ fmt.Fprintf(w, "1 update successfully stored.")
+ }
}
-func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
+func (srv Server) webGetLastUpdateId(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "lastupdateid"
- id := c.URLParams["id"]
- value, err := self.store.GetLastUpdateForUuid(id)
+ value, err := srv.store.GetLastUpdateId()
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- if value != nil {
- fmt.Fprintf(w, "%d", *value)
- }
+ fmt.Fprintf(w, "%d", value)
}
-func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "stats"
- filter := getFilter(r)
- values, err := self.getStatsInvoke(&filter)
-
+func (srv Server) webGetLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) {
+ const resourceName = "lastupdateid"
+ id := c.URLParams["id"]
+ value, err := srv.store.GetLastUpdateForUuid(id)
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
}
- jsonString, err := json.Marshal(values)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
+ fmt.Fprintf(w, "%d", value)
}
-func (self StatsSinkServer) ServeWeb(vizAppLocation string) {
- if _, err := os.Stat(vizAppLocation); err != nil {
- if os.IsNotExist(err) {
- s5l.Panicf("web: viz-app at %s does not exist.", vizAppLocation)
- } else {
- s5l.Printf("web: failed to stat %s: %v", vizAppLocation, err)
- }
- }
-
- goji.Get("/hello/:name", hello)
- goji.Get("/tags", self.getTagList)
- goji.Get("/sources", self.getSourcesList)
- goji.Get("/sources/:id", self.getSource)
- goji.Get("/updates", self.getUpdateList)
- goji.Get("/updates/:id", self.getUpdate)
- goji.Post("/updates", self.postUpdate)
- goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid)
- goji.Get("/stats", self.getStats)
- goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation))))
-
+func (srv Server) ServeWeb() {
+ goji.Get("/healthz", srv.webHealthz)
+ goji.Get("/hubs", srv.webGetHubsList)
+ goji.Get("/sources", srv.webGetSourcesList)
+ goji.Get("/sources/:id", srv.webGetSource)
+ goji.Get("/updates", srv.webGetUpdateList)
+ goji.Get("/updates/:id", srv.webGetUpdate)
+ goji.Post("/updates", srv.webPostUpdate)
+ goji.Get("/lastupdate", srv.webGetLastUpdateId)
+ goji.Get("/lastupdate/:id", srv.webGetLastUpdateIdForUuid)
goji.Serve()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 6fb5f8a..a59166f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -1,643 +1,562 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastuctures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
- "database/sql"
+ "encoding/json"
+ "errors"
"fmt"
+ "os"
"time"
- _ "github.com/mattn/go-sqlite3"
+ "github.com/boltdb/bolt"
"github.com/pborman/uuid"
- "gopkg.in/gorp.v2"
)
-type sqliteStore struct {
- db *gorp.DbMap
- hubId string
-}
-
-func tagsFromStatisticsData(value StatisticsData) []tagDb {
- tags := make([]tagDb, len(value.SourceId.Tags))
- for i := range value.SourceId.Tags {
- tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]}
- }
- return tags
-}
-
-func sourceFromStatisticsData(value StatisticsData) sourceDb {
- return sourceDb{
- -1,
- StreamId{
- ContentId: value.SourceId.StreamId.ContentId,
- Format: value.SourceId.StreamId.Format,
- Quality: value.SourceId.StreamId.Quality,
- },
- SourceId{
- Hostname: value.SourceId.Hostname},
- }
-}
-
-func clientsFromStatisticsData(value StatisticsData) []clientDataDb {
- res := make([]clientDataDb, len(value.Data.Clients))
- for i := range value.Data.Clients {
- res[i] = clientDataDb{-1, -1, value.Data.Clients[i]}
- }
- return res
-}
-
-func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
- return dataUpdateDb{
- -1,
- -1,
- value.SourceHubUuid,
- value.SourceHubDataUpdateId,
- value.StartTime.Unix(),
- value.Duration,
- value.Data.ClientCount,
- value.Data.BytesReceived,
- value.Data.BytesSent}
-}
+const (
+ StoreGetUpdatesLimit = 42000
+ StoreVersion = 1
+)
-func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) {
- du := dataUpdateFromStatisticsData(value)
- cd := clientsFromStatisticsData(value)
- src := sourceFromStatisticsData(value)
- tags := tagsFromStatisticsData(value)
+var (
+ storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn,
+ sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn}
+)
- return du, cd, src, tags
+type Store struct {
+ version int
+ hubUuid string
+ db *bolt.DB
+ readOnly bool
}
-func initDb(mysql bool, path string) (res *gorp.DbMap, hubId string, err error) {
- // connect to db using standard Go database/sql API
- var db *sql.DB
- var dialect gorp.Dialect
+//
+// Initialization and Destruction
+//
- if mysql {
- db, err = sql.Open("mysql", path)
- if err != nil {
- return
+func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid string, err error) {
+ if _, err = os.Stat(dbPath); err != nil {
+ if os.IsNotExist(err) {
+ err = nil
}
- dialect = gorp.MySQLDialect{Engine: "InnoDB", Encoding: "UTF8"}
- } else {
- db, err = sql.Open("sqlite3", path)
- if err != nil {
- return
- }
- dialect = gorp.SqliteDialect{}
+ return
}
-
- dbmap := &gorp.DbMap{Db: db, Dialect: dialect}
- // dbmap.TraceOn("[gorp]", log.New(os.Stdout, "myapp:", log.Lmicroseconds))
-
- dbmap.AddTableWithName(tagDb{}, tagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true)
- dbmap.AddTableWithName(sourceTagsDb{}, sourceTagsTn).SetKeys(false, "TagId", "SourceId")
- dbmap.AddTableWithName(sourceDb{}, sourcesTn).SetKeys(true, "Id").SetUniqueTogether("ContentId", "Format", "Quality", "Hostname")
- dbmap.AddTableWithName(clientDataDb{}, clientdataUpdatesTn).SetKeys(true, "Id")
- dbmap.AddTableWithName(dataUpdateDb{}, dataUpdatesTn).SetKeys(true, "Id")
- dbmap.AddTableWithName(hubInfoDb{}, hubInfoTn).SetKeys(false, "Name")
-
- // TODO use some real migration, yadda yadda
- err = dbmap.CreateTablesIfNotExists()
+ opts := &bolt.Options{Timeout: 100 * time.Millisecond, ReadOnly: readOnly}
+ db, err = bolt.Open(dbPath, 0600, opts)
if err != nil {
return
}
- hubId, err = dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'")
- // TODO handle only not-found this way
- if err != nil || hubId == "" {
- hubId = uuid.New()
- _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", hubId)
- if err != nil {
- hubId = ""
- return
+ err = db.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(hubInfoBn))
+ if b == nil {
 			return errors.New("store: failed to open, bucket '" + hubInfoBn + "' does not exist.")
+ }
+ bVersion := b.Get([]byte(storeVersionKey))
+ if bVersion == nil {
+ return errors.New("store: failed to open, no hub version found.")
+ }
+ version = btoi(bVersion)
+ if version != StoreVersion {
+ return fmt.Errorf("store: failed to open, wrong hub version: %d (expected: %d)", version, StoreVersion)
}
- }
-
- res = dbmap
- return
-}
-func isEmptyFilter(filter *StatsFilter) bool {
- if filter == nil {
- return true
- }
- if filter.start == nil &&
- filter.end == nil &&
- filter.hostname == nil &&
- filter.contentId == nil &&
- filter.format == nil &&
- filter.quality == nil &&
- (filter.tagsAny == nil || len(filter.tagsAny) == 0) &&
- filter.afterUpdateId == nil &&
- filter.limit == nil {
- return true
- }
- return false
-}
+ bHubId := b.Get([]byte(hubUuidKey))
+ if bHubId != nil {
+ hubUuid = string(bHubId)
+ }
+ if hubUuid == "" {
+ return errors.New("store: failed to open, UUID does not exist or is empty.")
+ }
-func insertAnd(needsAnd *bool) (res string) {
- if *needsAnd {
- res = " and"
- *needsAnd = false
+ for _, bn := range storeBuckets {
+ if b := tx.Bucket([]byte(bn)); b == nil {
+ return errors.New("store: failed to open, bucket '" + bn + "' does not exist.")
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ db.Close()
}
return
}
-type dataUpdateQueryResult struct {
- dataUpdateDb
- StreamId
- SourceId
-}
-
-func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) {
- const baseQuery = "(select * from " + dataUpdatesTn + "," + sourcesTn + " on " + dataUpdatesTn + ".SourceId = " + sourcesTn + ".Id"
- if isEmptyFilter(filter) {
- return baseQuery + ")", nil
+func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error) {
+ db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond})
+ if err != nil {
+ return
}
- query := baseQuery
- parameters := make(map[string]interface{})
- needsAnd := false
-
- if filter.start != nil || filter.end != nil || filter.afterUpdateId != nil {
- query += " WHERE"
-
- if filter.start != nil {
- query += insertAnd(&needsAnd)
- query += " StartTime >= :filterstart"
- parameters["filterstart"] = filter.start.Unix()
- needsAnd = true
+ err = db.Update(func(tx *bolt.Tx) error {
+ for _, bn := range storeBuckets {
+ if _, err := tx.CreateBucket([]byte(bn)); err != nil {
+ return err
+ }
}
- if filter.end != nil {
- query += insertAnd(&needsAnd)
- query += " StartTime < :filterend"
- parameters["filterend"] = filter.end.Unix()
- needsAnd = true
+
+ b, err := tx.CreateBucket([]byte(hubInfoBn))
+ if err != nil {
+ return err
}
- if filter.afterUpdateId != nil {
- query += insertAnd(&needsAnd)
- query += " " + dataUpdatesTn + ".Id > :afterUpdateId"
- parameters["afterUpdateId"] = *filter.afterUpdateId
- needsAnd = true
+ version = StoreVersion
+ if err := b.Put([]byte(storeVersionKey), itob(version)); err != nil {
+ return err
}
- }
-
- if filter.sortOrder != nil {
- if *filter.sortOrder == "desc" {
- query += " ORDER BY " + dataUpdatesTn + ".Id DESC"
+ hubUuid = uuid.New()
+ if err := b.Put([]byte(hubUuidKey), []byte(hubUuid)); err != nil {
+ return err
}
+ return nil
+ })
+ if err != nil {
+ db.Close()
}
- if filter.limit != nil {
- query += " LIMIT :limit"
- parameters["limit"] = *filter.limit
- }
-
- // TODO other fields
- query += ")"
- return query, parameters
-}
-func (s sqliteStore) findTag(name string) (tag *tagDb, err error) {
- t := tagDb{}
- err = s.db.SelectOne(&t, "select * from "+tagsTn+" where Name = ?", name)
- if err == nil {
- tag = &t
- }
return
}
-func (s sqliteStore) insertNewTags(tags []tagDb) (err error) {
- for i := range tags {
- var t *tagDb
- t, err = s.findTag(tags[i].Name)
- if err != nil {
- _, err = s.db.Exec("insert into "+tagsTn+" VALUES (NULL, ?)", tags[i].Name)
- }
- t, err = s.findTag(tags[i].Name)
+func NewStore(dbPath string, readOnly bool) (Store, error) {
+ db, version, hubid, err := openDb(dbPath, readOnly)
+ if err != nil {
+ return Store{}, err
+ }
- if err == nil {
- tags[i] = *t
+ if db != nil {
+ if readOnly {
+ s5l.Printf("store: opened read-only (UUID: %s)", hubid)
} else {
- break
+ s5l.Printf("store: opened (UUID: %s)", hubid)
}
+ return Store{version, hubid, db, readOnly}, nil
}
- return
-}
-
-func (s sqliteStore) findSource(src sourceDb) (res *sourceDb, err error) {
- t := sourceDb{}
- err = s.db.SelectOne(
- &t,
- "select Id from "+sourcesTn+" where ContentId = ? and Format = ? and Quality = ? and Hostname = ?",
- src.ContentId,
- src.Format,
- src.Quality,
- src.Hostname)
+ if readOnly {
+ return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.")
+ }
- if err == nil {
- res = &t
+ db, version, hubid, err = createDb(dbPath)
+ if err != nil {
+ return Store{}, err
}
+ s5l.Printf("store: initialized (UUID: %s)", hubid)
+ return Store{version, hubid, db, readOnly}, nil
+}
- return
+func (st Store) Close() {
+ s5l.Printf("store: closing")
+ st.db.Close()
}
-func (s sqliteStore) insertNewSource(src *sourceDb) (err error) {
- var t *sourceDb
- t, err = s.findSource(*src)
- if err == nil {
- *src = *t
- } else {
- err = s.db.Insert(src)
+//
+// Append data
+//
+
+// append key-value pairs to buckets
+
+func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) {
+ if hubUuid == "" {
+ return
}
- return
-}
+ bf := tx.Bucket([]byte(hubUuidsFwdBn))
+ bf.FillPercent = 1.0 // we only do appends
+ br := tx.Bucket([]byte(hubUuidsRevBn))
+ br.FillPercent = 1.0 // we only do appends
-func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error) {
- st := make([]sourceTagsDb, len(tags))
- for i := range tags {
- st[i].TagId = tags[i].Id
- st[i].SourceId = src.Id
- }
- for i := range st {
- _, err = s.db.Exec(
- "insert or ignore into "+sourceTagsTn+" values (?,?)",
- st[i].TagId,
- st[i].SourceId)
- // err = s.db.Insert(&st[i])
- if err != nil {
- // TODO
- //fmt.Printf("st\n")
- return
- }
+ bHubId := bf.Get([]byte(hubUuid))
+ if bHubId != nil {
+ return btoi(bHubId), nil
}
- return
-}
-func (s sqliteStore) insertDataUpdateEntry(src sourceDb, du *dataUpdateDb) (err error) {
- du.SourceId = src.Id
- err = s.db.Insert(du)
- if err != nil {
- //fmt.Printf("du\n")
+ next, _ := bf.NextSequence()
+ hubId = int(next)
+ if err = bf.Put([]byte(hubUuid), itob(hubId)); err != nil {
return
}
- return
-}
-
-func (s sqliteStore) insertDataUpdateClientEntries(cd []clientDataDb, du dataUpdateDb) (err error) {
- for i := range cd {
- cd[i].DataUpdatesId = du.Id
- err = s.db.Insert(&cd[i])
- if err != nil {
- // TODO
- return
- }
+ if err = br.Put(itob(hubId), []byte(hubUuid)); err != nil {
+ return
}
- return
+
+ return hubId, err
}
-func (s sqliteStore) appendItem(du dataUpdateDb, cd []clientDataDb, src sourceDb, tags []tagDb) (err error) {
- err = s.insertNewTags(tags)
- if err != nil {
- return
+func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) {
+ bf := tx.Bucket([]byte(sourcesFwdBn))
+	// bf.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id)
+ br := tx.Bucket([]byte(sourcesRevBn))
+ br.FillPercent = 1.0 // we only do appends (with ever incrementing interger ids)
+
+ slug := src.Slug()
+ bSrcId := bf.Get([]byte(slug))
+ if bSrcId != nil {
+ return btoi(bSrcId), nil
}
- err = s.insertNewSource(&src)
- if err != nil {
- //fmt.Printf("src\n")
+ var jsonData []byte
+ if jsonData, err = json.Marshal(src); err != nil {
return
}
- err = s.insertSourceTagLinks(src, tags)
- if err != nil {
+ next, _ := bf.NextSequence()
+ srcId = int(next)
+ if err = bf.Put([]byte(slug), itob(srcId)); err != nil {
return
}
-
- err = s.insertDataUpdateEntry(src, &du)
- if err != nil {
+ if err = br.Put(itob(srcId), jsonData); err != nil {
return
}
- err = s.insertDataUpdateClientEntries(cd, du)
- if err != nil {
+ return srcId, err
+}
+
+func (st Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) {
+ b := tx.Bucket([]byte(dataUpdatesBn))
+ b.FillPercent = 1.0 // we only do appends
+
+ next, _ := b.NextSequence()
+ duId = int(next)
+
+ var jsonData []byte
+ if jsonData, err = json.Marshal(du); err != nil {
return
}
-
+ err = b.Put(itob(duId), jsonData)
return
}
-// this function is the biggest pile of copy/pasted crap while sick that is still compilable.
-func (s sqliteStore) Append(update StatisticsData) (err error) {
- var tx *gorp.Transaction
- tx, err = s.db.Begin()
- if err != nil {
- return
+func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) {
+ bf := tx.Bucket([]byte(userAgentsFwdBn))
+ bf.FillPercent = 1.0 // we only do appends
+ br := tx.Bucket([]byte(userAgentsRevBn))
+ br.FillPercent = 1.0 // we only do appends
+
+ bUaId := bf.Get([]byte(ua))
+ if bUaId != nil {
+ return btoi(bUaId), nil
}
- du, cd, src, tags := updateFromStatisticsData(update)
- //s5l.Printf("blah: %v", du)
- err = s.appendItem(du, cd, src, tags)
- if err != nil {
- tx.Rollback()
+ next, _ := bf.NextSequence()
+ uaId = int(next)
+ if err = bf.Put([]byte(ua), itob(uaId)); err != nil {
+ return
+ }
+ if err = br.Put(itob(uaId), []byte(ua)); err != nil {
return
}
- return tx.Commit()
+ return uaId, err
}
-func (s sqliteStore) AppendMany(updates []StatisticsData) (err error) {
- var tx *gorp.Transaction
- tx, err = s.db.Begin()
- if err != nil {
- return
+func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error {
+ if len(cd) == 0 {
+ return nil
}
- for _, update := range updates {
- du, cd, src, tags := updateFromStatisticsData(update)
- err = s.appendItem(du, cd, src, tags)
+ b := tx.Bucket([]byte(clientDataBn))
+ b.FillPercent = 1.0 // we only do appends
+
+ data := []clientDataDb{}
+ for _, c := range cd {
+ uaId, err := st.insertNewUserAgent(tx, c.UserAgent)
if err != nil {
- tx.Rollback()
- return
+ return err
}
+ data = append(data, clientDataDb{c.Ip, uaId, c.BytesSent})
}
- return tx.Commit()
+ jsonData, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ return b.Put(itob(duId), jsonData)
}
-func castArrayToString(value []interface{}) []string {
- res := make([]string, len(value))
- for i := range value {
- res[i] = value[i].(string)
+func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error {
+ b := tx.Bucket([]byte(latestUpdatesBn))
+ b.FillPercent = 1.0 // we only do appends
+
+ last := b.Get([]byte(uuid))
+ if last != nil && btoi(last) > duId {
+ return nil
}
- return res
+ return b.Put([]byte(uuid), itob(duId))
}
-func (s sqliteStore) GetTags() ([]string, error) {
- res, dbErr := s.db.Select("", "select Name from "+tagsTn)
- if dbErr == nil {
- sRes := castArrayToString(res)
- return sRes, nil
+// Split up the multidimensional dataupdate and append all the key-value pairs
+
+func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) {
+ du := NewDataUpdateDb(update)
+
+ srcUuid := update.SourceHubUuid
+ if du.SourceHubId, err = st.insertNewHub(tx, srcUuid); err != nil {
+ return
+ }
+ if du.SourceId, err = st.insertNewSource(tx, NewSourceDb(update)); err != nil {
+ return
+ }
+ if duId, err = st.insertDataUpdate(tx, du); err != nil {
+ return
+ }
+ if err = st.insertClientData(tx, duId, update.Data.Clients); err != nil {
+ return
}
- return nil, dbErr
-}
-func (s sqliteStore) GetTagsByDataUpdateId(id int) (res []string, err error) {
- var qres []interface{}
- qres, err = s.db.Select(
- tagDb{},
- "select * from "+tagsTn+" where Id in (select TagId from "+sourceTagsTn+" where SourceId = (select SourceId from "+dataUpdatesTn+" where Id = ?))", id)
- if err == nil {
- res = make([]string, len(qres))
- for i := range qres {
- res[i] = qres[i].(*tagDb).Name
- }
+ if srcUuid != "" {
+ err = st.setLastUpdateForUuid(tx, srcUuid, du.SourceHubDataUpdateId)
+ }
+ if fwdUuid := update.ForwardHubUuid; fwdUuid != "" {
+ err = st.setLastUpdateForUuid(tx, fwdUuid, update.ForwardHubDataUpdateId)
}
return
}
-func (s sqliteStore) GetSources() (res []sourceDb, err error) {
- var qres []interface{}
- qres, err = s.db.Select(sourceDb{}, "select * from "+sourcesTn)
- if err == nil {
- res = make([]sourceDb, len(qres))
- for i := range qres {
- res[i] = *qres[i].(*sourceDb)
- }
+// Public Append Interface
+
+func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
+ if st.readOnly {
+ return ErrReadOnly
}
- return
+ return st.db.Update(func(tx *bolt.Tx) error {
+ for _, update := range updates {
+ if _, err := st.appendItem(tx, update); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
}
-func (s sqliteStore) GetSource(id int) (res sourceDb, err error) {
- err = s.db.SelectOne(&res, "select * from "+sourcesTn+" where Id = ?", id)
- return
+func (st Store) Append(update DataUpdateFull) (err error) {
+ return st.AppendMany([]DataUpdateFull{update})
}
-func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
- err = s.db.SelectOne(&res, "select * from "+dataUpdatesTn+" where Id = ?", id)
- return
-}
+//
+// Fetch data
+//
-func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) {
- var qres []interface{}
- qres, err = s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id)
- if err == nil {
- res = make([]clientDataDb, len(qres))
- for i := range qres {
- res[i] = *qres[i].(*clientDataDb)
- }
+// fetch key-value pairs from buckets
+
+func (st Store) getHub(tx *bolt.Tx, id int) string {
+ b := tx.Bucket([]byte(hubUuidsRevBn))
+ uuid := b.Get(itob(id))
+ if uuid == nil {
+ return ""
}
- return
+ return string(uuid)
}
-var (
- updateColumnSelect = `
- Id,
- SourceHubUuid,
- SourceHubDataUpdateId,
- StartTime,
- Duration,
- ClientCount,
- BytesReceived,
- BytesSent,
- Hostname,
- ContentId,
- Format,
- Quality
-`
-)
+func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
+ b := tx.Bucket([]byte(sourcesRevBn))
-func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) {
- var clientsDb []clientDataDb
- clientsDb, err = s.GetClientsByUpdateId(dat.Id)
- if err != nil {
- s5l.Printf("store GetClients failed: %v", err)
+ jsonData := b.Get(itob(id))
+ if jsonData == nil {
+ err = ErrNotFound
return
}
- tagsDb, err := s.GetTagsByDataUpdateId(dat.Id)
- if err != nil {
- s5l.Printf("store GetClients failed: %v", err)
+ if err = json.Unmarshal(jsonData, &res); err != nil {
return
}
- res.CopyFromDataUpdateDb(dat.dataUpdateDb, s.hubId)
- res.Hostname = dat.Hostname
- res.StreamId.ContentId = dat.ContentId
- res.StreamId.Format = dat.Format
- res.StreamId.Quality = dat.Quality
- res.CopyFromClientDataDb(clientsDb)
- res.Tags = tagsDb
return
}
-func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) {
- res = make([]StatisticsData, len(dat))
- for i := range dat {
- t := *dat[i].(*dataUpdateQueryResult)
- res[i], _ = s.CreateStatisticsDataFrom(t)
- }
- return
-}
+func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
+ bc := tx.Bucket([]byte(clientDataBn))
+ bu := tx.Bucket([]byte(userAgentsRevBn))
-func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
- limit := 5000
- sourceSql, parameters := getFilteredDataUpdateSelect(&StatsFilter{afterUpdateId: &id, limit: &limit})
- sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql
- var updates []interface{}
- updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters)
- s5tl.Printf("sql: %s", sql)
- if err == nil {
- res, _ = s.CreateStatisticsDatasFrom(updates)
+ jsonData := bc.Get(itob(id))
+ if jsonData == nil {
+ return
+ }
+ data := []clientDataDb{}
+ if err = json.Unmarshal(jsonData, &data); err != nil {
+ return
+ }
+ for _, c := range data {
+ cd := ClientData{Ip: c.Ip, BytesSent: c.BytesSent}
+ ua := bu.Get(itob(c.UserAgentId))
+ if ua != nil {
+ cd.UserAgent = string(ua)
+ }
+ res = append(res, cd)
}
return
}
-func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) {
- sourceSql, parameters := getFilteredDataUpdateSelect(filter)
- sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql
- s5tl.Printf("store: sql: %s", sql)
- var updates []interface{}
- updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters)
- if err == nil {
- res, _ = s.CreateStatisticsDatasFrom(updates)
+// fetch all the key-value pairs and merge them into the multidimensional dataupdate
+
+func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
+ res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
+ var src sourceDb
+ if src, err = st.getSource(tx, du.SourceId); err != nil {
+ return
+ }
+ res.CopyFromSourceDb(src)
+ if res.Data.Clients, err = st.getClients(tx, duId); err != nil {
+ return
}
return
}
-type lastUpdateQueryResult struct {
- MaxDataUpdateId *int
-}
+// Public Fetch Interface
-func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(
- &result,
- "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
- uuid)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
+func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) {
+ res = []DataUpdateFull{}
+ if id < 0 { // TODO: interpret negative values as last x values
+ id = 0
+ }
+ if limit < 0 || limit > StoreGetUpdatesLimit {
+ s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit)
+ limit = StoreGetUpdatesLimit
}
+ err = st.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(dataUpdatesBn)).Cursor()
+ k, v := c.Seek(itob(id))
+ if k == nil {
+ return nil
+ }
+ if btoi(k) == id {
+ k, v = c.Next()
+ }
+ for ; k != nil; k, v = c.Next() {
+ var d dataUpdateDb
+ if err := json.Unmarshal(v, &d); err != nil {
+ return err
+ }
+
+ duf, err := st.fetchItem(tx, btoi(k), d)
+ if err != nil {
+ return err
+ }
+ res = append(res, duf)
+ if len(res) >= limit {
+ return nil
+ }
+ }
+ return nil
+ })
return
}
-func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) {
- result := lastUpdateQueryResult{}
- err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn)
- if err == nil {
- updateId = result.MaxDataUpdateId
- } else {
- s5l.Printf("db: failed to find max DataUpdateId: %v", err)
- }
+func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
+ err = st.db.View(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(dataUpdatesBn))
+
+ jsonData := b.Get(itob(id))
+ if jsonData == nil {
+ return ErrNotFound
+ }
+
+ var d dataUpdateDb
+ if err := json.Unmarshal(jsonData, &d); err != nil {
+ return err
+ }
+
+ var err error
+ if res, err = st.fetchItem(tx, id, d); err != nil {
+ return err
+ }
+ return nil
+ })
return
}
-type statsResult struct {
- UpdateCount *int
- HubCount *int
- SourcesCount *int
- ClientCount *float32
- BytesSent *uint
- BytesReceived *uint
- StartTime *int64
- LastStartTime *int64
-}
+//
+// Auxiliary Data
+//
-type StatsResult struct {
- UpdateCount int
- HubCount int
- SourcesCount int
- ClientCount float32
- BytesSent uint
- BytesReceived uint
- StartTime time.Time
- LastStartTime time.Time
+func (st Store) GetStoreId() string {
+ return st.hubUuid
}
-func toApiStatsResult(value statsResult) (res StatsResult) {
- if value.UpdateCount != nil {
- res.UpdateCount = *value.UpdateCount
- }
- if value.HubCount != nil {
- res.HubCount = *value.HubCount
- }
- if value.SourcesCount != nil {
- res.SourcesCount = *value.SourcesCount
- }
- if value.ClientCount != nil {
- res.ClientCount = *value.ClientCount
- }
- if value.BytesSent != nil {
- res.BytesSent = *value.BytesSent
- }
- if value.BytesReceived != nil {
- res.BytesReceived = *value.BytesReceived
- }
- if value.StartTime != nil {
- res.StartTime = time.Unix(*value.StartTime, 0)
- }
- if value.LastStartTime != nil {
- res.LastStartTime = time.Unix(*value.LastStartTime, 0)
- }
- return res
+func (st Store) GetLastUpdateId() (updateId int, err error) {
+ err = st.db.View(func(tx *bolt.Tx) error {
+ updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence())
+ return nil
+ })
+ return
}
-var (
- statsGroupSelect = `
-SELECT
- count(*) as UpdateCount,
- SourceHubUuid as SourceHubUuid,
- count(distinct SourceId) as SourcesCount,
- avg(ClientCount) as ClientCount,
- sum(BytesSent) as BytesSent,
- sum(BytesReceived) as BytesReceived,
- min(StartTime) as StartTime,
- max(StartTime) as LastStartTime
-FROM
-`
- statsGroupClause = `
-GROUP BY
- SourceHubUuid
-`
- statsAggregateSelect = `
-SELECT
- sum(UpdateCount) as UpdateCount,
- count(distinct SourceHubUuid) as HubCount,
- sum(SourcesCount) as SourcesCount,
- sum(ClientCount) as ClientCount,
- sum(BytesSent) as BytesSent,
- sum(BytesReceived) as BytesReceived,
- min(StartTime) as StartTime,
- max(LastStartTime) as LastStartTime
-FROM
-`
-)
-
-func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) {
- sourceSql, parameters := getFilteredDataUpdateSelect(filter)
- _ = sourceSql
- sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause)
- s5tl.Printf("store: stats sql: %s", sql)
- res := statsResult{}
- err := s.db.SelectOne(&res, sql, parameters)
- if err == nil {
- return toApiStatsResult(res), nil
- }
- return StatsResult{}, err
+func (st Store) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
+ err = st.db.View(func(tx *bolt.Tx) error {
+ bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid))
+ if bUpdateId == nil {
+ return nil
+ }
+ updateId = btoi(bUpdateId)
+ return nil
+ })
+ return
}
-func (s sqliteStore) GetStoreId() (uuid string, err error) {
- uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid")
+func (st Store) GetHubs() (res []string, err error) {
+ res = []string{st.hubUuid}
+ err = st.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(hubUuidsRevBn)).Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ res = append(res, string(v))
+ }
+ return nil
+ })
return
}
-func NewStore(mysql bool, path string) (sqliteStore, error) {
- db, hubid, err := initDb(mysql, path)
- if err != nil {
- return sqliteStore{}, err
- }
- return sqliteStore{db, hubid}, nil
+func (st Store) GetSource(id int) (res SourceId, err error) {
+ err = st.db.View(func(tx *bolt.Tx) error {
+ src, err := st.getSource(tx, id)
+ if err != nil {
+ return err
+ }
+ res.CopyFromSourceDb(src)
+ return nil
+ })
+ return
}
-func (s sqliteStore) Close() {
- s.db.Db.Close()
+func (st Store) GetSources() (res []SourceId, err error) {
+ res = []SourceId{}
+ err = st.db.View(func(tx *bolt.Tx) error {
+ c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ var s sourceDb
+ if err := json.Unmarshal(v, &s); err != nil {
+ return err
+ }
+ var src SourceId
+ src.CopyFromSourceDb(s)
+ res = append(res, src)
+ }
+ return nil
+ })
+ return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 409bd08..5fcae5e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -1,179 +1,815 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
+ "fmt"
+ "io"
+ "os"
+ "os/user"
+ "reflect"
"testing"
"time"
+
+ "github.com/boltdb/bolt"
)
-func TestAppend(t *testing.T) {
- store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared")
- if err != nil {
- t.Errorf("Failed to initialize: %v", err)
- return
+var (
+ testBoltPath = "/run/s5hub_testing_db.bolt"
+ testBoltPath2 = "/run/s5hub_testing_db2.bolt"
+ testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt"
+ testBoltPathFinal = "/run/s5hub_testing_db_final.bolt"
+
+ streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"}
+ sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData}
+ updateData = DataUpdate{Data: SourceData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000}
+ clientsData = []ClientData{
+ ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
+ ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
+ ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
+ ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
+ ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
+)
+
+func generateTestData(n int) ([]DataUpdateFull, int) {
+ hostnames := []string{"streamer1", "streamer2"}
+ contents := []string{"av", "audio"}
+ formats := []string{"webm", "flash", "hls"}
+ qualities := []string{"high", "medium", "low"}
+ tags := []string{"2017", "elevate", "discourse"}
+ duration := int64(15000)
+
+ starttime := time.Now()
+ numSrcs := len(hostnames) * len(contents) * len(formats) * len(qualities)
+ if n < 0 {
+ n = numSrcs
}
- defer store.Close()
+ var data []DataUpdateFull
+ for i := 0; i < n; i += numSrcs {
+ for _, hostname := range hostnames {
+ for _, content := range contents {
+ for _, format := range formats {
+ for _, quality := range qualities {
+ d := DataUpdateFull{}
+ d.Version = ProtocolVersion
+
+ d.SourceId.Hostname = hostname
+ d.SourceId.StreamId.ContentId = content
+ d.SourceId.StreamId.Format = format
+ d.SourceId.StreamId.Quality = quality
+ d.SourceId.Tags = tags
- startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
- update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
- streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
- source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ d.DataUpdate.StartTime = starttime
+ d.DataUpdate.Duration = duration
+ d.DataUpdate.Data.ClientCount = uint(len(clientsData))
+ d.DataUpdate.Data.BytesSent = 6400 * uint(len(clientsData))
- err = store.Append(dat)
+ d.DataUpdate.Data.Clients = clientsData
+ data = append(data, d)
+ }
+ }
+ }
+ }
+ starttime = starttime.Add(time.Duration(duration) * time.Millisecond)
+ }
+ return data[:n], numSrcs
+}
+
+func TestMain(m *testing.M) {
+ u, err := user.Current()
if err != nil {
- t.Errorf("Failed to append: %v", err)
- return
+ os.Exit(-1)
+ }
+ testBoltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid)
+ testBoltPath2 = fmt.Sprintf("/run/user/%s/s5hub_testing_db2.bolt", u.Uid)
+ testBoltPathFwd = fmt.Sprintf("/run/user/%s/s5hub_testing_db_fwd.bolt", u.Uid)
+ testBoltPathFinal = fmt.Sprintf("/run/user/%s/s5hub_testing_db_final.bolt", u.Uid)
+ os.Exit(m.Run())
+}
+
+//
+// Testing
+//
+
+func TestOpen(t *testing.T) {
+ // non-existing directory
+ if _, err := NewStore("/nonexistend/db.bolt", false); err == nil {
+ t.Fatalf("opening store in nonexisting directory should throw an error")
+ }
+
+ // store path is a directory
+ os.Remove(testBoltPath)
+ if err := os.MkdirAll(testBoltPath, 0700); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store using a directory should throw an error")
}
- stats, err := store.GetStats(nil)
+	// existing but non-database file
+ os.Remove(testBoltPath)
+ if f, err := os.Create(testBoltPath); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ } else {
+ io.WriteString(f, "this is not a bolt db.")
+ f.Close()
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store using a invalid database should throw an error")
+ }
+
+ // bolt db without HubInfo Bucket
+ os.Remove(testBoltPath)
+ db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond})
if err != nil {
- t.Errorf("Failed to get stats: %v", err)
+ t.Fatalf("unexpected error: %v", err)
+ } else {
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store without HubInfo Bucket should throw an error")
+ }
+
+ // bolt db no version key
+ if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil {
+ t.Fatalf("unexpected error: %v", err)
} else {
- clientCount := int(stats.ClientCount)
- updateCount := stats.UpdateCount
- if 3 != clientCount {
- t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount)
+ err = db.Update(func(tx *bolt.Tx) error {
+ if _, err := tx.CreateBucket([]byte(hubInfoBn)); err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
}
- if 1 != updateCount {
- t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount)
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
}
}
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store without a database version should throw an error")
+ }
- queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
- filterStruct := StatsFilter{start: &queryStartTime}
- stats, err = store.GetStats(&filterStruct)
- if err != nil {
- t.Errorf("Failed to get stats: %v", err)
+ // bolt db wrong version
+ if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ } else {
+ err = db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(hubInfoBn))
+ if err := b.Put([]byte(storeVersionKey), itob(0)); err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store with wrong database version should throw an error")
+ }
+
+ // bolt db no UUID key
+ if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil {
+ t.Fatalf("unexpected error: %v", err)
} else {
- updateCount := stats.UpdateCount
- if 0 != updateCount {
- t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount)
+ err = db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(hubInfoBn))
+ if err := b.Put([]byte(storeVersionKey), itob(StoreVersion)); err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store without a database UUID should throw an error")
+ }
+
+ // bolt db empty UUID
+ if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ } else {
+ err = db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(hubInfoBn))
+ if err := b.Put([]byte(hubUuidKey), []byte("")); err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store with empty UUID should throw an error")
+ }
+
+ // bolt db with missing buckets
+ if db, err := bolt.Open(testBoltPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ } else {
+ err = db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket([]byte(hubInfoBn))
+ if err := b.Put([]byte(hubUuidKey), []byte("6d1a2192-7cb6-404d-80fb-add9b40a8f33")); err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err = db.Close(); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ }
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening store with (some) buckets missing should throw an error")
+ }
+
+ // create new bolt-db and reopen it
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("creating new store failed: %v", err)
+ }
+ createdUuid := store.hubUuid
+ store.Close()
+
+ store, err = NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("re-opening existing store failed: %v", err)
+ }
+ if createdUuid != store.hubUuid {
+ t.Fatalf("UUID of opened store differs from the one previously generated: '%s' != '%s'", createdUuid, store.hubUuid)
+ }
+
+ if _, err := NewStore(testBoltPath, false); err == nil {
+ t.Fatalf("opening already opened database should throw an error")
+ }
+ store.Close()
+}
+
+func TestAppendAndFetch(t *testing.T) {
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer store.Close()
+
+ if _, err := store.GetUpdate(17); err != ErrNotFound {
+		t.Fatalf("fetching non-existing update should return not-found, error=%v", err)
+ }
+
+ upd := updateData
+ upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
+ upd.Data.Clients = clientsData
+ in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+
+ if err = store.Append(in); err != nil {
+ t.Fatalf("failed to append update: %v", err)
+ }
+
+ out, err := store.GetUpdate(1)
+ if err != nil {
+ t.Fatalf("failed to fetch update: %v", err)
+ }
+ out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder
+
+ expected := in
+ expected.SourceHubUuid = store.GetStoreId()
+ expected.SourceHubDataUpdateId = 1
+ expected.ForwardHubUuid = ""
+ expected.ForwardHubDataUpdateId = 0
+
+ if !reflect.DeepEqual(expected, out) {
+ t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected)
+ }
+
+ // append many
+ var ins []DataUpdateFull
+ upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
+ ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd})
+ upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
+ upd.Data.Clients = nil
+ ins = append(ins, DataUpdateFull{0, "", -1, "", -1, sourceData, upd})
+ if err = store.AppendMany(ins); err != nil {
+ t.Fatalf("failed to append update: %v", err)
+ }
+
+ for i := 0; i < 2; i = i + 1 {
+ out, err = store.GetUpdate(i + 2)
+ if err != nil {
+ t.Fatalf("failed to fetch update: %v", err)
+
+ }
+ out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder
+ expected = ins[i]
+ expected.SourceHubUuid = store.GetStoreId()
+ expected.SourceHubDataUpdateId = i + 2
+ expected.ForwardHubUuid = ""
+ expected.ForwardHubDataUpdateId = 0
+
+ if !reflect.DeepEqual(expected, out) {
+ t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected)
}
}
}
-func TestCount(t *testing.T) {
- store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared")
+func TestReadOnly(t *testing.T) {
+ // create read-only db from not-existing file must fail
+ os.Remove(testBoltPath)
+ if _, err := NewStore(testBoltPath, true); err == nil {
+ t.Fatalf("creating a read-only database should throw an error")
+ }
+
+ // prepare a store with one data-update
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ upd := updateData
+ upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
+ upd.Data.Clients = clientsData
+ in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ if err = store.Append(in); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ store.Close()
+
+ // read-only db from existing file must succeed
+ store, err = NewStore(testBoltPath, true)
if err != nil {
- t.Errorf("Failed to initialize: %v", err)
+ t.Fatalf("opening existing store in read-only mode failed: %v", err)
}
defer store.Close()
- stats, err := store.GetStats(nil)
- clientCount := int(stats.ClientCount)
- if 0 != clientCount {
- t.Errorf("Failed to count correctly.")
+	// fetching the data-update from read-only store must succeed
+ if _, err := store.GetUpdate(1); err != nil {
+ t.Fatalf("failed to fetch update: %v", err)
+ }
+
+ // appending to read-only store must fail
+ if err = store.Append(in); err == nil {
+ t.Fatalf("appending to read-only store should throw an error")
}
}
func TestGetUpdatesAfter(t *testing.T) {
- store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared")
+ // prepare a store with 3 data-updates
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
if err != nil {
- t.Errorf("Failed to initialize: %v", err)
- return
+ t.Fatalf("unexpected error: %v", err)
}
defer store.Close()
- startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
- update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000}
- streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
- source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
- dat := StatisticsData{nil, nil, source, update}
+ upd := updateData
+ upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
+ upd.Data.Clients = clientsData
- err = store.Append(dat)
+ expected := []DataUpdateFull{}
+ for i := 0; i < 3; i = i + 1 {
+ in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ if err = store.Append(in); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ e := in
+ e.SourceHubUuid = store.hubUuid
+ e.SourceHubDataUpdateId = i + 1
+ e.ForwardHubUuid = ""
+ e.ForwardHubDataUpdateId = 0
+ expected = append(expected, e)
+ upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
+ }
+
+ // check if there are 3 updates in store
+ lastId, err := store.GetLastUpdateId()
if err != nil {
- t.Errorf("Failed to retrieve: %v", err)
- return
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != 3 {
+ t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId)
}
- res, err := store.GetUpdatesAfter(2)
- t.Logf("got updates (err %v):\n%#v", err, res)
-}
+ // all the updates
+ updList, err := store.GetUpdatesAfter(-1, -1)
+ if err != nil {
+ t.Fatalf("failed to fetch updates: %v", err)
+ }
+ for i, _ := range updList {
+ updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder
+ }
-func generateStatisticsData(n int) (data []StatisticsData) {
- hostnames := []string{"streamer1", "streamer2"}
- contents := []string{"av", "audio"}
- formats := []string{"webm", "flash", "hls"}
- qualities := []string{"high", "medium", "low"}
+ if !reflect.DeepEqual(expected, updList) {
+ t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected)
+ }
- numcombis := len(hostnames) * len(contents) * len(formats) * len(qualities)
+ // first 2
+ updList, err = store.GetUpdatesAfter(0, 2)
+ if err != nil {
+ t.Fatalf("failed to fetch updates: %v", err)
+ }
+ for i, _ := range updList {
+ updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder
+ }
+ if len(updList) != 2 {
+		t.Fatalf("failed to fetch updates: got %d updates, expected 2", len(updList))
+ }
+ if !reflect.DeepEqual(expected[:2], updList) {
+ t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[:2])
+ }
- clients := []ClientData{
- ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
- starttime := time.Now()
- duration := int64(15000)
- tags := []string{"2017", "elevate", "discourse"}
+ // all after id 2
+ updList, err = store.GetUpdatesAfter(2, -1)
+ if err != nil {
+ t.Fatalf("failed to fetch updates: %v", err)
+ }
+ for i, _ := range updList {
+ updList[i].StartTime = updList[i].StartTime.UTC() // this would normally be handled by the protocol encoder
+ }
- for i := 0; i < n; i += numcombis {
- for _, hostname := range hostnames {
- for _, content := range contents {
- for _, format := range formats {
- for _, quality := range qualities {
- d := StatisticsData{}
- d.SourceId.Version = 1
- d.SourceId.Hostname = hostname
- d.SourceId.Tags = tags
+ if !reflect.DeepEqual(expected[2:], updList) {
+ t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[2:])
+ }
- d.SourceId.StreamId.ContentId = content
- d.SourceId.StreamId.Format = format
- d.SourceId.StreamId.Quality = quality
+ // all after id 3 -> should return nothing
+ updList, err = store.GetUpdatesAfter(3, -1)
+ if err != nil {
+ t.Fatalf("failed to fetch updates: %v", err)
+ }
+ if len(updList) != 0 {
+ t.Fatalf("failed to fetch updates: got %d updates, expected 0", len(updList))
+ }
+}
- d.DataUpdate.StartTime = starttime
- d.DataUpdate.Duration = duration
- d.DataUpdate.Data.ClientCount = uint(len(clients))
- d.DataUpdate.Data.BytesSent = 6400 * uint(len(clients))
+func TestForwardedDataUpdates(t *testing.T) {
+ // prepare a new store
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer store.Close()
- for _, client := range clients {
- c := ClientData{client.Ip, client.UserAgent, client.BytesSent}
- d.DataUpdate.Data.Clients = append(d.DataUpdate.Data.Clients, c)
- }
- data = append(data, d)
- }
- }
- }
+ // generate/append some local updates
+ data, _ := generateTestData(10)
+ if err := store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ myLastId := 10
+ forwardedHub := "05defdfa-e7d1-4ca8-8b5c-02abb0088d29"
+
+ // check if there are no updates for this hub in store
+ lastId, err := store.GetLastUpdateForUuid(forwardedHub)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != 0 {
+ t.Fatalf("failed to get last update ID: %d, expected 0", lastId)
+ }
+
+ // get list of all hubs
+ hubs, err := store.GetHubs()
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if len(hubs) != 1 {
+ t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 1", len(hubs))
+ }
+ if hubs[0] != store.hubUuid {
+		t.Fatalf("first hub should be the own stores UUID but is: %s", hubs[0])
+ }
+
+ // append 3 forwarded data-updates
+ upd := updateData
+ upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
+ upd.Data.Clients = clientsData
+
+ expected := []DataUpdateFull{}
+ for i := 0; i < 3; i = i + 1 {
+ in := DataUpdateFull{0, "", -1, "", -1, sourceData, upd}
+ in.SourceHubUuid = forwardedHub
+ in.SourceHubDataUpdateId = 3 - i // out of order
+ if err = store.Append(in); err != nil {
+ t.Fatalf("unexpected error: %v", err)
}
- starttime = starttime.Add(time.Duration(duration) * time.Millisecond)
+ myLastId = myLastId + 1
+ in.ForwardHubUuid = store.GetStoreId()
+ in.ForwardHubDataUpdateId = myLastId
+ expected = append(expected, in)
+ upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
+ }
+
+ out, err := store.GetUpdatesAfter(10, 3)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ for i, _ := range out {
+ out[i].StartTime = out[i].StartTime.UTC() // this would normally be handled by the protocol encoder
+ }
+ if !reflect.DeepEqual(out, expected) {
+ t.Fatalf("failed to fetch source\nactual: %v\nexpected: %v\n", out, expected)
+ }
+
+ // check if the last update for this hub is 3
+ lastId, err = store.GetLastUpdateForUuid(forwardedHub)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != 3 {
+ t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId)
+ }
+
+ // get list of all hubs
+ hubs, err = store.GetHubs()
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if len(hubs) != 2 {
+ t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 2", len(hubs))
+ }
+ if hubs[0] != store.hubUuid {
+		t.Fatalf("first hub should be the own stores UUID but is: %s", hubs[0])
+ }
+ if hubs[1] != forwardedHub {
+ t.Fatalf("second hub UUID is wrong: %s, expected: %s", hubs[1], forwardedHub)
+ }
+
+ // check if the last update is now 13
+ lastId, _ = store.GetLastUpdateId()
+ if lastId != 13 {
+		t.Fatalf("failed to get last update ID: got %d updates, expected 13", lastId)
}
- return
}
+func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) {
+ lastId, err := fwdStore.GetLastUpdateForUuid(src1Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != fwdSrc1Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id)
+ }
+ lastId, err = fwdStore.GetLastUpdateForUuid(src2Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != fwdSrc2Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(src1Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalSrc1Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(src2Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalSrc2Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(fwdStore.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalFwdId {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalFwdId)
+ }
+}
+
+func TestForwardedDataUpdates2(t *testing.T) {
+ // prepare 4 new stores
+ os.Remove(testBoltPath)
+ src1Store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer src1Store.Close()
+
+ os.Remove(testBoltPath2)
+ src2Store, err := NewStore(testBoltPath2, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer src2Store.Close()
+
+ os.Remove(testBoltPathFwd)
+ fwdStore, err := NewStore(testBoltPathFwd, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer fwdStore.Close()
+
+ os.Remove(testBoltPathFinal)
+ finalStore, err := NewStore(testBoltPathFinal, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer finalStore.Close()
+
+ // generate/append some updates to src
+ data, _ := generateTestData(10)
+ if err := src1Store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ data, _ = generateTestData(7)
+ if err := src2Store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0)
+
+ // forward 5 updates from src1 to fwd
+ if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0)
+
+ // forward 3 updates from src2 to fwd
+ if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0)
+
+ // forward 7 updates from fwd to final
+ if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := finalStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7)
+
+ // forward remaining updates from src1 and src2 to fwd
+ if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if data, err = src2Store.GetUpdatesAfter(3, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7)
+
+	// forward remaining updates from fwd to final
+ if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := finalStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17)
+}
+
+func TestGetSources(t *testing.T) {
+ // prepare a new store
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer store.Close()
+
+ if _, err := store.GetSource(17); err != ErrNotFound {
+		t.Fatalf("fetching non-existing source should return not-found, error=%v", err)
+ }
+
+ // generate/append some data
+ data, numSrcs := generateTestData(-1)
+ if err := store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // fetch first source
+ src, err := store.GetSource(1)
+ if err != nil {
+ t.Fatalf("fetching source failed: %v", err)
+ }
+ if !reflect.DeepEqual(data[0].SourceId, src) {
+ t.Fatalf("failed to fetch source\nactual: %v\nexpected: %v\n", src, data[0].SourceId)
+ }
+
+ // fetch all the sources
+ srcList, err := store.GetSources()
+ if err != nil {
+ t.Fatalf("fetching sources failed: %v", err)
+ }
+ if len(srcList) != numSrcs {
+ t.Fatalf("wrong number of sources: %d, expected %d", len(srcList), numSrcs)
+ }
+	// the result will be ordered using the slug so doing a DeepEqual doesn't work here
+}
+
+//
+// Benchmarking
+//
+
func BenchmarkAppendMany(b *testing.B) {
- store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared")
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
if err != nil {
- b.Errorf("Failed to initialize: %v", err)
+ b.Fatalf("unexpected error: %v", err)
}
defer store.Close()
- data := generateStatisticsData(b.N)
+ data, _ := generateTestData(b.N)
b.ResetTimer()
if err := store.AppendMany(data); err != nil {
- b.Errorf("Failed to append: %v", err)
+ b.Fatalf("unexpected error: %v", err)
}
}
func BenchmarkGetUpdatesAfter(b *testing.B) {
- store, err := NewStore(false, "file:memdb1?mode=memory&cache=shared")
+ os.Remove(testBoltPath)
+ store, err := NewStore(testBoltPath, false)
if err != nil {
- b.Errorf("Failed to initialize: %v", err)
+ b.Fatalf("unexpected error: %v", err)
}
defer store.Close()
- data := generateStatisticsData(b.N)
+ data, _ := generateTestData(b.N)
if err := store.AppendMany(data); err != nil {
- b.Errorf("Failed to append: %v", err)
+ b.Fatalf("unexpected error: %v", err)
}
b.ResetTimer()
latestId := -1
for {
- updates, err := store.GetUpdatesAfter(latestId)
+ updates, err := store.GetUpdatesAfter(latestId, -1)
if err != nil {
- b.Errorf("Failed to retrieve: %v", err)
+ b.Fatalf("failed to retrieve: %v", err)
}
if len(updates) == 0 {
break
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 5b2b29f..12e92c8 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -1,13 +1,39 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import "time"
-const (
- QualityLow = "low"
- QualityMedium = "medium"
- QualityHigh = "high"
-)
-
type StreamId struct {
ContentId string `json:"content-id"`
Format string `json:"format"`
@@ -15,23 +41,22 @@ type StreamId struct {
}
type SourceId struct {
- Version uint `json:"version" db:"-"`
Hostname string `json:"hostname"`
- StreamId StreamId `json:"streamer-id" db:"-"`
- Tags []string `json:"tags" db:"-"`
+ StreamId StreamId `json:"streamer-id"`
+ Tags []string `json:"tags,omitempty"`
}
type ClientData struct {
Ip string `json:"ip"`
- UserAgent string `json:"user-agent"`
+ UserAgent string `json:"user-agent,omitempty"`
BytesSent uint `json:"bytes-sent"`
}
type SourceData struct {
ClientCount uint `json:"client-count"`
- BytesReceived uint `json:"bytes-received"`
+ BytesReceived uint `json:"bytes-received,omitempty"`
BytesSent uint `json:"bytes-sent"`
- Clients []ClientData `json:"clients"`
+ Clients []ClientData `json:"clients,omitempty"`
}
type DataUpdate struct {
@@ -40,43 +65,32 @@ type DataUpdate struct {
Data SourceData `json:"data"`
}
-type StatisticsData struct {
- SourceHubUuid *string
- SourceHubDataUpdateId *int
+type DataUpdateFull struct {
+ Version uint `json:"version"`
+ SourceHubUuid string `json:"SourceHubUuid,omitempty"`
+ SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
+ ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
+ ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
SourceId
DataUpdate
}
-type DataContainer struct {
- Data interface{} `json:"data"`
-}
-
-type StatisticsDataContainer struct {
- Data []StatisticsData `json:"data"`
+type DataUpdateFullContainer struct {
+ Data []DataUpdateFull `json:"data"`
}
-type StatsFilter struct {
- start *time.Time
- end *time.Time
- hostname *string
- contentId *string
- format *string
- quality *string
- tagsAny []string
- afterUpdateId *int
- limit *int
- sortOrder *string
+func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) {
+ duf.Hostname = src.Hostname
+ duf.StreamId = src.StreamId
+ duf.Tags = src.Tags
}
-func (self *StatisticsData) CopyFromSourceId(id *SourceId) {
- self.Hostname = id.Hostname
- self.StreamId = id.StreamId
- self.Tags = id.Tags
- self.Version = id.Version
+func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) {
+ duf.StartTime = du.StartTime
+ duf.Duration = du.Duration
+ duf.Data = du.Data
}
-func (self *StatisticsData) CopyFromUpdate(id *DataUpdate) {
- self.StartTime = id.StartTime
- self.Duration = id.Duration
- self.Data = id.Data
+type GenericDataContainer struct {
+ Data interface{} `json:"data"`
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index f06e953..bba4433 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -1,120 +1,161 @@
+//
+// sfive
+//
+// sfive - spreadspace streaming statistics suite is a generic
+// statistic collection tool for streaming server infrastructures.
+// The system collects and stores meta data like number of views
+// and throughput from a number of streaming servers and stores
+// it in a global data store.
+// The data acquisition is designed to be generic and extensible in
+// order to support different streaming software.
+// sfive also contains tools and applications to filter and visualize
+// live and recorded data.
+//
+//
+// Copyright (C) 2014-2017 Christian Pointner <equinox@spreadspace.org>
+// Markus Grüneis <gimpf@gimpf.org>
+//
+// This file is part of sfive.
+//
+// sfive is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3
+// as published by the Free Software Foundation.
+//
+// sfive is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with sfive. If not, see <http://www.gnu.org/licenses/>.
+//
+
package sfive
import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "strings"
"time"
)
-// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections
-// this is very much not normalized at all, because I'm too lazy to type
+var (
+ ErrNotFound = errors.New("not found")
+ ErrReadOnly = errors.New("store is in read-only mode")
+)
-// table names
const (
- tagsTn = "Tags"
- sourceTagsTn = "StreamToTagMap"
- sourcesTn = "Sources"
- clientdataUpdatesTn = "ClientDataUpdates"
- dataUpdatesTn = "DataUpdates"
- hubInfoTn = "HubInfo"
+ // bucket names
+ hubInfoBn = "HubInfo"
+ latestUpdatesBn = "LatestUpdates"
+ hubUuidsFwdBn = "HubUUIDsFwd"
+ hubUuidsRevBn = "HubUUIDsRev"
+ dataUpdatesBn = "DataUpdates"
+ sourcesFwdBn = "SourcesFwd"
+ sourcesRevBn = "SourcesRev"
+ clientDataBn = "ClientData"
+ userAgentsFwdBn = "UserAgentsFwd"
+ userAgentsRevBn = "UserAgentsRev"
+
+ // well-known keys
+ hubUuidKey = "HubUUID"
+ storeVersionKey = "Version"
)
-type hubInfoDb struct {
- Name string
- Value string
+// stored in sourcesRevBn
+type streamIdDb struct {
+ ContentId string `json:"c"`
+ Format string `json:"f"`
+ Quality string `json:"q"`
}
-// stored in tagsTn
-type tagDb struct {
- Id int
- Name string
+type sourceDb struct {
+ Hostname string `json:"h"`
+ StreamId streamIdDb `json:"s"`
+ Tags []string `json:"t"`
}
-// stored in sourceTagsTn
-// Stream m:n Tag
-type sourceTagsDb struct {
- TagId int // foreign key to tagsTn
- SourceId int // foreign key to sourcesTn
+func NewSourceDb(value DataUpdateFull) sourceDb {
+ return sourceDb{
+ Hostname: value.SourceId.Hostname,
+ StreamId: streamIdDb{
+ ContentId: value.SourceId.StreamId.ContentId,
+ Format: value.SourceId.StreamId.Format,
+ Quality: value.SourceId.StreamId.Quality,
+ },
+ Tags: value.SourceId.Tags,
+ }
}
-// stored in sourcesTn
-type sourceDb struct {
- Id int
- StreamId
- SourceId
+func (s sourceDb) Slug() string {
+ return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ","))
}
-// stored in clientdataUpdatesTn
-// ClientData n:1 DataUpdate
-type clientDataDb struct {
- Id int
- DataUpdatesId int // foreign key to dataUpdatesTn
- ClientData
+func (s *SourceId) CopyFromSourceDb(v sourceDb) {
+ s.Hostname = v.Hostname
+ s.StreamId.ContentId = v.StreamId.ContentId
+ s.StreamId.Format = v.StreamId.Format
+ s.StreamId.Quality = v.StreamId.Quality
+ s.Tags = v.Tags
}
-// stored in dataUpdatesTn
-// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
-type dataUpdateDb struct {
- Id int
- SourceId int // foreign key to sourcesTn
- SourceHubUuid *string
- SourceHubDataUpdateId *int
- StartTime int64 // time.Time
- Duration int64 // duration in milliseconds
- ClientCount uint
- BytesReceived uint
- BytesSent uint
+// stored in clientDataBn
+type clientDataDb struct {
+ Ip string `json:"ip"`
+ UserAgentId int `json:"ua"`
+ BytesSent uint `json:"bs"`
}
-func (self *SourceId) CopyFromSourceDb(value sourceDb) {
- self.Version = value.Version
- self.Hostname = value.Hostname
- self.StreamId.ContentId = value.ContentId
- self.StreamId.Format = value.Format
- self.StreamId.Quality = value.Quality
+// stored in dataUpdatesBn
+type dataUpdateDb struct {
+ SourceHubId int `json:"h,omitempty"`
+ SourceHubDataUpdateId int `json:"hi,omitempty"`
+ SourceId int `json:"si"`
+ StartTime int64 `json:"st"` // unix timestamp in milliseconds
+ Duration int64 `json:"du"` // duration in milliseconds
+ ClientCount uint `json:"cc,omitempty"`
+ BytesReceived uint `json:"br,omitempty"`
+ BytesSent uint `json:"bs,omitempty"`
}
-func (self *SourceId) CopyFromTagsDb(values []tagDb) {
- tags := make([]string, len(values))
- for i := range values {
- tags[i] = values[i].Name
+func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb {
+ return dataUpdateDb{
+ -1,
+ v.SourceHubDataUpdateId,
+ -1,
+ int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000),
+ v.Duration,
+ v.Data.ClientCount,
+ v.Data.BytesReceived,
+ v.Data.BytesSent,
}
- self.Tags = tags
}
-func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId string) {
- if value.SourceHubUuid == nil {
- self.SourceHubUuid = &hubId
+func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) {
+ if srcHubUuid == "" {
+ s.SourceHubUuid = hubUuid
+ s.SourceHubDataUpdateId = id
} else {
- self.SourceHubUuid = value.SourceHubUuid
- }
- if value.SourceHubDataUpdateId == nil {
- self.SourceHubDataUpdateId = &value.Id
- } else {
- self.SourceHubDataUpdateId = value.SourceHubDataUpdateId
+ s.SourceHubUuid = srcHubUuid
+ s.SourceHubDataUpdateId = v.SourceHubDataUpdateId
+ s.ForwardHubUuid = hubUuid
+ s.ForwardHubDataUpdateId = id
}
- self.StartTime = time.Unix(value.StartTime, 0)
- self.Duration = value.Duration
- self.Data.ClientCount = value.ClientCount
- self.Data.BytesReceived = value.BytesReceived
- self.Data.BytesSent = value.BytesSent
+ s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000)
+ s.Duration = v.Duration
+ s.Data.ClientCount = v.ClientCount
+ s.Data.BytesReceived = v.BytesReceived
+ s.Data.BytesSent = v.BytesSent
}
-func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) {
- clients := make([]ClientData, len(values))
- for i := range values {
- clients[i].Ip = values[i].Ip
- clients[i].UserAgent = values[i].UserAgent
- clients[i].BytesSent = values[i].BytesSent
- }
- self.Data.Clients = clients
+func itob(v int) []byte {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(v))
+ return b
}
-func cvtToApiStatisticsData(
- hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData {
- res := StatisticsData{}
- res.CopyFromSourceDb(source)
- res.CopyFromDataUpdateDb(update, hubId)
- res.CopyFromClientDataDb(clients)
- res.CopyFromTagsDb(tags)
- return res
+func btoi(b []byte) int {
+ return int(binary.BigEndian.Uint64(b))
}
diff --git a/src/hub/test-bolt b/src/hub/test-bolt
new file mode 100755
index 0000000..1787181
--- /dev/null
+++ b/src/hub/test-bolt
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> [ ... args-to-bolt ... ]"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+shift
+
+BIN="$(go env GOPATH)/bin/bolt"
+if [ ! -x "$BIN" ]; then
+ echo "bolt not found. Please run:"
+ echo ""
+ echo " go get -u github.com/boltdb/bolt/..."
+ echo ""
+ exit 1
+fi
+
+exec "$BIN" $@ "$TEST_DB"
diff --git a/src/hub/test-bolter b/src/hub/test-bolter
new file mode 100755
index 0000000..24c8901
--- /dev/null
+++ b/src/hub/test-bolter
@@ -0,0 +1,21 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name> [ ... args-to-bolter ... ]"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+shift
+
+BIN="$(go env GOPATH)/bin/bolter"
+if [ ! -x "$BIN" ]; then
+ echo "bolter not found. Please run:"
+ echo ""
+ echo " go get -u github.com/hasit/bolter"
+ echo ""
+ exit 1
+fi
+
+exec "$BIN" --file "$TEST_DB" $@
diff --git a/src/hub/test-client b/src/hub/test-client
index 8a24c7d..fcb9a98 100755
--- a/src/hub/test-client
+++ b/src/hub/test-client
@@ -1,23 +1,22 @@
#!/bin/sh
+
+TEST_D="./test"
+
echo pipe: import sample.json
echo ------------------------
-socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe
+socat file:../../dat/sample.json,rdonly "unix-client:$TEST_D/pipe"
echo pipe-gram: import sample-gram.json
echo ----------------------------------
-while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json
+while read x; do echo "$x" | socat stdio "unix-sendto:$TEST_D/pipegram"; done < ../../dat/sample-gram.json
+
+echo post update
+echo -----------
+curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates'
echo show query result
echo -----------------
-curl -i 'http://localhost:8000/updates?from=2013-10-21T00:00:00Z&to=2013-10-21T12:31:00Z'
+curl -i 'http://localhost:8000/updates'
-echo '\npost update'
-echo ------------
-curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates'
-
-echo show stats
-echo ----------
-curl -i 'http://localhost:8000/stats'
echo '\n\ndone'
-
diff --git a/src/hub/test-collector b/src/hub/test-collector
deleted file mode 100755
index e6ca367..0000000
--- a/src/hub/test-collector
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-
-./bin/sfive-hub -db "file:memdb1?mode=memory&cache=shared" -start-pipe-server=false -start-pipegram-server=false -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8000"
-
diff --git a/src/hub/test-fwd b/src/hub/test-fwd
new file mode 100755
index 0000000..371aa55
--- /dev/null
+++ b/src/hub/test-fwd
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-url="http://localhost:8000"
diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es
index 3f3eb12..61bbcf7 100755
--- a/src/hub/test-fwd-es
+++ b/src/hub/test-fwd-es
@@ -1,5 +1,11 @@
#!/bin/sh
-rm -f /run/sfive/pipe /run/sfive/pipegram
-./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-es-url="http://stream.elevate.at:9200/e14"
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-es-url="http://stream.elevate.at:9200/testing"
diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik
index 1d45219..8a2ae33 100755
--- a/src/hub/test-fwd-piwik
+++ b/src/hub/test-fwd-piwik
@@ -1,4 +1,11 @@
#!/bin/sh
-rm -f /run/sfive/pipe /run/sfive/pipegram
-./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"
diff --git a/src/hub/test-import b/src/hub/test-import
index d1a5044..a6c2942 100755
--- a/src/hub/test-import
+++ b/src/hub/test-import
@@ -1,10 +1,12 @@
#!/bin/sh
+
+TEST_D="./test"
+
echo pipe: import sample.json
echo ------------------------
[ -f ../../dat/sample-access.json ] || zcat ../../dat/sample-accesslog.json.gz > ../../dat/sample-accesslog.json
-socat file:../../dat/sample-accesslog.json,rdonly unix-client:/run/sfive/pipe
+socat file:../../dat/sample-accesslog.json,rdonly "unix-client:$TEST_D/pipe"
echo '\n\ndone'
-
diff --git a/src/hub/test-srv b/src/hub/test-srv
index 254549a..064fa3a 100755
--- a/src/hub/test-srv
+++ b/src/hub/test-srv
@@ -1,4 +1,13 @@
#!/bin/sh
-rm -f /run/sfive/pipe /run/sfive/pipegram
-./bin/sfive-hub -db /var/lib/sfive/db.sqlite -start-pipe-server -pipe /var/run/sfive/pipe -start-pipegram-server -pipegram /var/run/sfive/pipegram -start-web-server -viz-dir "$(pwd)/../viz" -bind=":8001"
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+mkdir -p "$TEST_D"
+rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
+exec ./bin/sfive-hub -db "$TEST_DB" -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"
diff --git a/src/hub/test-srv-ro b/src/hub/test-srv-ro
new file mode 100755
index 0000000..56fe440
--- /dev/null
+++ b/src/hub/test-srv-ro
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+if [ -z "$1" ]; then
+ echo "Usage: $0 <db-name>"
+ exit 1
+fi
+
+TEST_D="./test"
+TEST_DB="$TEST_D/$1.bolt"
+
+mkdir -p "$TEST_D"
+rm -f "$TEST_D/pipe" "$TEST_D/pipegram"
+exec ./bin/sfive-hub -db "$TEST_DB" -read-only -start-pipe-server -pipe "$TEST_D/pipe" -start-pipegram-server -pipegram "$TEST_D/pipegram" -start-web-server -bind=":8000"