summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-10 23:13:11 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-10 23:15:47 +0200
commitb53f49f8cbd3b7666c9267f0e2d88fce28ae6c1a (patch)
tree5da51d292af7c5ad16a7198552f785f8cdf872f6
parentand one more variable refactoring (diff)
new protocol version with streamlined names for variables
-rw-r--r--dat/sample-accesslog.json.gzbin224140 -> 0 bytes
-rw-r--r--dat/sample-importer.json.gzbin20176 -> 0 bytes
-rw-r--r--dat/sample-pipe.json2
-rw-r--r--dat/sample-pipegram.json6
-rw-r--r--dat/sample-web-bulk.json4
-rw-r--r--dat/sample-web.json4
-rw-r--r--doc/protocol.md18
-rwxr-xr-xsrc/daq/accesslog/s5-accesslog44
-rw-r--r--src/daq/flumotion-plug/s5.py12
-rwxr-xr-xsrc/daq/flumotion-rrd/s5-flumotion-rrd16
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch44
-rw-r--r--src/daq/s5proxy/sample.json4
-rw-r--r--src/daq/s5proxy/src/s5proxy/config.go2
-rw-r--r--src/daq/s5proxy/src/s5proxy/stats.go52
-rw-r--r--src/es5/es-query.json2
-rw-r--r--src/es5/sfive-init.json4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go8
-rwxr-xr-xsrc/hub/test-import12
21 files changed, 116 insertions, 124 deletions
diff --git a/dat/sample-accesslog.json.gz b/dat/sample-accesslog.json.gz
deleted file mode 100644
index a68d35d..0000000
--- a/dat/sample-accesslog.json.gz
+++ /dev/null
Binary files differ
diff --git a/dat/sample-importer.json.gz b/dat/sample-importer.json.gz
deleted file mode 100644
index 7baf9a4..0000000
--- a/dat/sample-importer.json.gz
+++ /dev/null
Binary files differ
diff --git a/dat/sample-pipe.json b/dat/sample-pipe.json
index ed327fc..f099ba1 100644
--- a/dat/sample-pipe.json
+++ b/dat/sample-pipe.json
@@ -1,4 +1,4 @@
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1}
+{"version": 2, "stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"]}
{"data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000}
{"data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000}
{"data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000}
diff --git a/dat/sample-pipegram.json b/dat/sample-pipegram.json
index a9c919a..7dc6b6a 100644
--- a/dat/sample-pipegram.json
+++ b/dat/sample-pipegram.json
@@ -1,3 +1,3 @@
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000}
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000}
-{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000}
+{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000}
+{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000}
+{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000}
diff --git a/dat/sample-web-bulk.json b/dat/sample-web-bulk.json
index 31f1f2b..3809500 100644
--- a/dat/sample-web-bulk.json
+++ b/dat/sample-web-bulk.json
@@ -1,6 +1,6 @@
{
- "version": 1,
- "streamer-id": {"quality": "high", "content-id": "av", "format": "webm"},
+ "version": 2,
+ "stream": {"quality": "high", "content": "av", "format": "webm"},
"hostname": "localhost",
"tags": ["suppa", "toll"]
}
diff --git a/dat/sample-web.json b/dat/sample-web.json
index 35c76e0..c6cc573 100644
--- a/dat/sample-web.json
+++ b/dat/sample-web.json
@@ -1,8 +1,8 @@
{
- "streamer-id": {"quality": "high", "content-id": "av", "format": "webm"},
+ "stream": {"quality": "high", "content": "av", "format": "webm"},
"hostname": "localhost",
"tags": ["suppa", "toll"],
- "version": 1,
+ "version": 2,
"data": {"bytes-sent": 1519, "client-count": 0},
"start-time": "2013-10-21T12:30:00Z",
"duration-ms": 300000
diff --git a/doc/protocol.md b/doc/protocol.md
index 29391c6..e960506 100644
--- a/doc/protocol.md
+++ b/doc/protocol.md
@@ -9,13 +9,13 @@ defaults which will be used if the corresponding value is missing in subsequent
update messages.
{
- "version": 1,
+ "version": 2,
"SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5",
- "SourceHubDataUpdateId": 23,
+ "SourceHubUpdateId": 23,
"ForwardHubUuid": "b041315e-5039-4c75-81e8-9fd42250b011",
- "ForwardHubDataUpdateId": 42,
+ "ForwardHubUpdateId": 42,
"hostname": "myhostname",
- "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" },
+ "stream": { "content": "av-orig", "format": "flash", "quality": "medium" },
"tags": [ "elevate", "2014", "discourse" ]
}
@@ -26,19 +26,19 @@ data-update
All values which have been defined by the init message are optional. In any case the
values from data updates override values from init. Stateless interfaces will not use
init messages and therefore all values must be defined here.
-"SourceHubUuid", "SourceHubDataUpdateId", "ForwardHubUuid", "ForwardHubDataUpdateId",
+"SourceHubUuid", "SourceHubUpdateId", "ForwardHubUuid", "ForwardHubUpdateId",
"user-agent", "bytes-received", "tags" and "clients" might be omitted and are treated
as an empty string, 0 or empty array respectively.
The start-time will be processesd and stored with millisecond precision.
{
- "version": 1,
+ "version": 2,
"SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5",
- "SourceHubDataUpdateId": 23,
+ "SourceHubUpdateId": 23,
"ForwardHubUuid": "b041315e-5039-4c75-81e8-9fd42250b011",
- "ForwardHubDataUpdateId": 42,
+ "ForwardHubUpdateId": 42,
"hostname": "myhostname",
- "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" },
+ "stream": { "content": "av-orig", "format": "flash", "quality": "medium" },
"tags": [ "elevate", "2014", "discourse" ]
"start-time": "2014-08-03T12:34:56.123Z",
"duration-ms": 5000,
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog
index e91c523..29ec9f3 100755
--- a/src/daq/accesslog/s5-accesslog
+++ b/src/daq/accesslog/s5-accesslog
@@ -138,7 +138,7 @@ class AccessLog():
self._duration = properties['duration']
self._tags = properties['tags']
self._logfile = properties['logfile']
- self._initStreamerIds(properties['streamer-ids'])
+ self._initStreamerIds(properties['streams'])
self._proto = None
self._conn = None
@@ -175,7 +175,7 @@ class AccessLog():
if len(cs.keys()) == 1:
self._content_id = cs.keys()[0]
- print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+ print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id
if len(fs.keys()) == 1:
self._format = fs.keys()[0]
@@ -345,9 +345,9 @@ class AccessLog():
def _sendDatasetFull(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
- data = { "version": 1, "hostname": self._hostname, "tags": self._tags,
- "streamer-id": {
- "content-id": content_id,
+ data = { "version": 2, "hostname": self._hostname, "tags": self._tags,
+ "stream": {
+ "content": content_id,
"format": format,
"quality": quality
},
@@ -362,17 +362,17 @@ class AccessLog():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id" : { },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream" : { },
"tags": self._tags }
if self._content_id:
- initdata["streamer-id"]["content-id"] = self._content_id
+ initdata["stream"]["content"] = self._content_id
if self._format:
- initdata["streamer-id"]["format"] = self._format
+ initdata["stream"]["format"] = self._format
if self._quality:
- initdata["streamer-id"]["quality"] = self._quality
- if len(initdata["streamer-id"].keys()) == 0:
- del initdata["streamer-id"]
+ initdata["stream"]["quality"] = self._quality
+ if len(initdata["stream"].keys()) == 0:
+ del initdata["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -380,7 +380,7 @@ class AccessLog():
clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
- "streamer-id": { },
+ "stream": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
@@ -388,13 +388,13 @@ class AccessLog():
}
}
if not self._content_id:
- data["streamer-id"]["content-id"] = content_id
+ data["stream"]["content"] = content_id
if not self._format:
- data["streamer-id"]["format"] = format
+ data["stream"]["format"] = format
if not self._quality:
- data["streamer-id"]["quality"] = quality
- if len(data["streamer-id"].keys()) == 0:
- del data["streamer-id"]
+ data["stream"]["quality"] = quality
+ if len(data["stream"].keys()) == 0:
+ del data["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
@@ -411,8 +411,8 @@ if __name__ == '__main__':
help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
- help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
+ parser.add_argument('--stream', '-S', dest='streams', action='append',
+ help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
parser.add_argument('--logfile', '-l', dest='logfile', required=True,
help='path to the logfile or \'-\' for standard input')
parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
@@ -426,8 +426,8 @@ if __name__ == '__main__':
args['duration'] = int(args['duration'])
if not args['nameformat']:
args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
- if not args['streamer-ids']:
- print 'SFive: you have to specify at least one streamer-id!'
+ if not args['streams']:
+ print 'SFive: you have to specify at least one stream!'
exit(-1)
importer = AccessLog(args)
importer.run()
diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py
index 0db01e4..58dc71a 100644
--- a/src/daq/flumotion-plug/s5.py
+++ b/src/daq/flumotion-plug/s5.py
@@ -126,7 +126,7 @@ class ComponentSFivePlug(base.ComponentPlug):
properties = self.args['properties']
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties.get('content-id')
+ self._content_id = properties.get('content')
self._format = properties.get('format')
self._quality = properties.get('quality')
tagstring = properties.get('tags', '')
@@ -208,9 +208,9 @@ class ComponentSFivePlug(base.ComponentPlug):
def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received):
- data = { "version": 1, "hostname": self._hostname,
- "streamer-id": {
- "content-id": self._content_id,
+ data = { "version": 2, "hostname": self._hostname,
+ "stream": {
+ "content": self._content_id,
"format": self._format,
"quality": self._quality
},
@@ -226,8 +226,8 @@ class ComponentSFivePlug(base.ComponentPlug):
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream": { "content": self._content_id, "format": self._format, "quality": self._quality },
"tags": self._tags }
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd
index 197cb14..3170030 100755
--- a/src/daq/flumotion-rrd/s5-flumotion-rrd
+++ b/src/daq/flumotion-rrd/s5-flumotion-rrd
@@ -100,7 +100,7 @@ class FlumotionRRD():
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties['content-id']
+ self._content_id = properties['content']
self._format = properties['format']
self._quality = properties['quality']
self._tags = properties['tags']
@@ -191,9 +191,9 @@ class FlumotionRRD():
def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent):
client_count = int(round(client_count)) if client_count else 0
bytes_sent = int(round(bytes_sent)) if bytes_sent else 0
- data = { "version": 1, "hostname": self._hostname,
- "streamer-id": {
- "content-id": self._content_id,
+ data = { "version": 2, "hostname": self._hostname,
+ "stream": {
+ "content": self._content_id,
"format": self._format,
"quality": self._quality
},
@@ -208,8 +208,8 @@ class FlumotionRRD():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream": { "content": self._content_id, "format": self._format, "quality": self._quality },
"tags": self._tags }
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -234,8 +234,8 @@ if __name__ == '__main__':
help='the path to the data socket of the local SFive hub')
parser.add_argument('--hostname', '-n', dest='hostname', required=True,
help='the hostname of the machine')
- parser.add_argument('--content-id', '-c', dest='content-id', required=True,
- help='the content-id (i.e. av)')
+ parser.add_argument('--content', '-c', dest='content', required=True,
+ help='the content (i.e. av)')
parser.add_argument('--format', '-f', dest='format', required=True,
help='the format (i.e. webm)')
parser.add_argument('--quality', '-q', dest='quality', required=True,
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch
index 65e2c83..565a24f 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch
@@ -169,7 +169,7 @@ class NGXLuaFetcher():
self._duration = properties['duration']
self._tags = properties['tags']
self._url = properties['url']
- self._initStreamerIds(properties['streamer-ids'])
+ self._initStreamerIds(properties['streams'])
self._proto = None
self._conn = None
@@ -207,7 +207,7 @@ class NGXLuaFetcher():
if len(cs.keys()) == 1:
self._content_id = cs.keys()[0]
-# print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+# print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id
if len(fs.keys()) == 1:
self._format = fs.keys()[0]
@@ -310,9 +310,9 @@ class NGXLuaFetcher():
def _sendDatasetFull(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
- data = { "version": 1, "hostname": self._hostname, "tags": self._tags,
- "streamer-id": {
- "content-id": content_id,
+ data = { "version": 2, "hostname": self._hostname, "tags": self._tags,
+ "stream": {
+ "content": content_id,
"format": format,
"quality": quality
},
@@ -327,17 +327,17 @@ class NGXLuaFetcher():
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id" : { },
+ initdata = { "version": 2, "hostname": self._hostname,
+ "stream" : { },
"tags": self._tags }
if self._content_id:
- initdata["streamer-id"]["content-id"] = self._content_id
+ initdata["stream"]["content"] = self._content_id
if self._format:
- initdata["streamer-id"]["format"] = self._format
+ initdata["stream"]["format"] = self._format
if self._quality:
- initdata["streamer-id"]["quality"] = self._quality
- if len(initdata["streamer-id"].keys()) == 0:
- del initdata["streamer-id"]
+ initdata["stream"]["quality"] = self._quality
+ if len(initdata["stream"].keys()) == 0:
+ del initdata["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
@@ -345,7 +345,7 @@ class NGXLuaFetcher():
clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
- "streamer-id": { },
+ "stream": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
@@ -353,13 +353,13 @@ class NGXLuaFetcher():
}
}
if not self._content_id:
- data["streamer-id"]["content-id"] = content_id
+ data["stream"]["content"] = content_id
if not self._format:
- data["streamer-id"]["format"] = format
+ data["stream"]["format"] = format
if not self._quality:
- data["streamer-id"]["quality"] = quality
- if len(data["streamer-id"].keys()) == 0:
- del data["streamer-id"]
+ data["stream"]["quality"] = quality
+ if len(data["stream"].keys()) == 0:
+ del data["stream"]
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
@@ -376,8 +376,8 @@ if __name__ == '__main__':
help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
- help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
+ parser.add_argument('--stream', '-S', dest='streams', action='append',
+ help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
parser.add_argument('--url', '-u', dest='url', required=False,
help='the url of the nginx sfive exporert, default http://localhost/sfive')
parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
@@ -393,8 +393,8 @@ if __name__ == '__main__':
args['url'] = 'http://localhost/sfive'
if not args['nameformat']:
args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
- if not args['streamer-ids']:
- print 'SFive: you have to specify at least one streamer-id!'
+ if not args['streams']:
+ print 'SFive: you have to specify at least one stream!'
exit(-1)
fetcher = NGXLuaFetcher(args)
diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json
index b13ec7d..b1abcdc 100644
--- a/src/daq/s5proxy/sample.json
+++ b/src/daq/s5proxy/sample.json
@@ -18,8 +18,8 @@
"hostname": "public1",
"tags": [ "hello", "world" ],
"duration": "15s",
- "format" : "/${content-id}-${format}-${quality}",
- "streamer": [
+ "format" : "/${content}-${format}-${quality}",
+ "stream": [
"av-orig/flash,webm/high,medium,low,mini"
]
}
diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go
index 8a94c48..280b9fd 100644
--- a/src/daq/s5proxy/src/s5proxy/config.go
+++ b/src/daq/s5proxy/src/s5proxy/config.go
@@ -113,7 +113,7 @@ type SFiveConf struct {
Tags []string `json:"tags"`
Duration Duration `json:"duration"`
Format string `json:"format"`
- Streamer []string `json:"streamer"`
+ Stream []string `json:"stream"`
}
type Config struct {
diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go
index 96ccd74..803dfa3 100644
--- a/src/daq/s5proxy/src/s5proxy/stats.go
+++ b/src/daq/s5proxy/src/s5proxy/stats.go
@@ -44,8 +44,12 @@ import (
// TODO: this is basically a copy from src/hub....
-type StreamID struct {
- ContentID string `json:"content-id"`
+const (
+ ProtocolVersion = 2
+)
+
+type Stream struct {
+ ContentID string `json:"content"`
Format string `json:"format"`
Quality string `json:"quality"`
}
@@ -56,32 +60,32 @@ type ClientData struct {
BytesSent uint `json:"bytes-sent"`
}
-type SourceData struct {
+type UpdateData struct {
ClientCount uint `json:"client-count"`
BytesReceived uint `json:"bytes-received"`
BytesSent uint `json:"bytes-sent"`
Clients []ClientData `json:"clients,omitempty"`
}
-type DataUpdate struct {
+type Update struct {
Version uint `json:"version"`
Hostname string `json:"hostname"`
- StreamID StreamID `json:"streamer-id"`
+ Stream Stream `json:"stream"`
Tags []string `json:"tags,omitempty"`
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
- Data SourceData `json:"data"`
+ Data UpdateData `json:"data"`
}
type StatsWorker struct {
- stream StreamID
+ stream Stream
current map[string]*ClientData
trigger chan time.Time
- output chan<- *DataUpdate
+ output chan<- *Update
input chan *ClientData
}
-func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) {
+func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) {
sw.stream = stream
sw.current = make(map[string]*ClientData)
sw.trigger = make(chan time.Time)
@@ -95,7 +99,7 @@ func (sw StatsWorker) Run() {
select {
case t := <-sw.trigger:
if t.UnixNano() != 0 {
- upd := &DataUpdate{StreamID: sw.stream, StartTime: t}
+ upd := &Update{Stream: sw.stream, StartTime: t}
upd.Data.ClientCount = uint(len(sw.current))
for _, c := range sw.current {
upd.Data.Clients = append(upd.Data.Clients, *c)
@@ -122,7 +126,7 @@ type Stats struct {
conf *Config
sock net.Conn
dataEncoder *json.Encoder
- updates chan *DataUpdate
+ updates chan *Update
workers map[string]StatsWorker
}
@@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData {
return nil
}
-func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) {
+func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) {
parts := strings.Split(desc, ",")
for _, p := range parts {
if p = strings.TrimSpace(p); p == "" {
@@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err
return
}
-func parseStreamerDescription(desc string) (c, f, q []string, err error) {
+func parseStreamDescription(desc string) (c, f, q []string, err error) {
parts := strings.Split(desc, "/")
if len(parts) != 3 {
err = fmt.Errorf("invalid streamer description: '%s'", desc)
return
}
- if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil {
+ if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil {
return
}
- if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil {
+ if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil {
return
}
- if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil {
+ if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil {
return
}
return
}
-func generateStreamerName(format, c, f, q string) string {
+func generateStreamName(format, c, f, q string) string {
return os.Expand(format, func(k string) string {
switch k {
- case "content-id":
+ case "content":
return c
case "format":
return f
@@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) {
return
}
- s.updates = make(chan *DataUpdate, 1000)
+ s.updates = make(chan *Update, 1000)
s.workers = make(map[string]StatsWorker)
var content, format, quality []string
- for _, desc := range conf.SFive.Streamer {
- if content, format, quality, err = parseStreamerDescription(desc); err != nil {
+ for _, desc := range conf.SFive.Stream {
+ if content, format, quality, err = parseStreamDescription(desc); err != nil {
return
}
for _, c := range content {
for _, f := range format {
for _, q := range quality {
- name := generateStreamerName(conf.SFive.Format, c, f, q)
- s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates)
+ name := generateStreamName(conf.SFive.Format, c, f, q)
+ s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates)
s5l.Printf("STATS: adding streamer '%s'", name)
}
}
@@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) {
func (s *Stats) sendUpdates() {
for {
upd := <-s.updates
- upd.Version = 1
+ upd.Version = ProtocolVersion
upd.Hostname = s.conf.SFive.Hostname
upd.Tags = s.conf.SFive.Tags
upd.Duration = int64(s.conf.SFive.Duration)
diff --git a/src/es5/es-query.json b/src/es5/es-query.json
index b51f3f7..226a379 100644
--- a/src/es5/es-query.json
+++ b/src/es5/es-query.json
@@ -17,7 +17,7 @@
"aggregations": {
"by_source": {
"terms": {
- "script": "[ doc['hostname'].value, doc['streamer-id.content-id'].value, doc['streamer-id.format'].value, doc['streamer-id.quality'].value ].join('/') ",
+ "script": "[ doc['hostname'].value, doc['stream.content'].value, doc['stream.format'].value, doc['stream.quality'].value ].join('/') ",
"size": 0
},
"aggregations": {
diff --git a/src/es5/sfive-init.json b/src/es5/sfive-init.json
index bc89227..cf2a980 100644
--- a/src/es5/sfive-init.json
+++ b/src/es5/sfive-init.json
@@ -50,9 +50,9 @@
"type" : "date",
"format" : "dateOptionalTime"
},
- "streamer-id" : {
+ "stream" : {
"properties" : {
- "content-id" : {
+ "content" : {
"type" : "string",
"index" : "not_analyzed"
},
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 2741a5a..0abbdef 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -39,7 +39,7 @@ import (
)
const (
- ProtocolVersion = 1
+ ProtocolVersion = 2
)
//
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 46b5705..55d8415 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -42,7 +42,7 @@ import (
)
var (
- initEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ initEncoded = `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
initStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
updateEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 005e03d..2794a24 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -44,7 +44,7 @@ import (
const forwardEsLastUpdateIdJson = `{
"query": {"match": { "SourceHubUuid": "%s" } },
- "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
+ "aggregations": { "last-id" : { "max" : { "field": "SourceHubUpdateId" } } }
}`
func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 5d36e0d..0a30162 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -35,14 +35,14 @@ package sfive
import "time"
type Stream struct {
- ContentId string `json:"content-id"`
+ ContentId string `json:"content"`
Format string `json:"format"`
Quality string `json:"quality"`
}
type Source struct {
Hostname string `json:"hostname"`
- Stream Stream `json:"streamer-id"`
+ Stream Stream `json:"stream"`
Tags []string `json:"tags,omitempty"`
}
@@ -68,9 +68,9 @@ type Update struct {
type Header struct {
Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
SourceHubUuid string `json:"SourceHubUuid,omitempty"`
- SourceHubUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
+ SourceHubUpdateId int `json:"SourceHubUpdateId,omitempty"`
ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
- ForwardHubUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
+ ForwardHubUpdateId int `json:"ForwardHubUpdateId,omitempty"`
}
type UpdateFull struct {
diff --git a/src/hub/test-import b/src/hub/test-import
deleted file mode 100755
index a6c2942..0000000
--- a/src/hub/test-import
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/sh
-
-TEST_D="./test"
-
-echo pipe: import sample.json
-echo ------------------------
-
-[ -f ../../dat/sample-access.json ] || zcat ../../dat/sample-accesslog.json.gz > ../../dat/sample-accesslog.json
-
-socat file:../../dat/sample-accesslog.json,rdonly "unix-client:$TEST_D/pipe"
-
-echo '\n\ndone'