diff options
-rw-r--r-- | dat/sample-accesslog.json.gz | bin | 224140 -> 0 bytes | |||
-rw-r--r-- | dat/sample-importer.json.gz | bin | 20176 -> 0 bytes | |||
-rw-r--r-- | dat/sample-pipe.json | 2 | ||||
-rw-r--r-- | dat/sample-pipegram.json | 6 | ||||
-rw-r--r-- | dat/sample-web-bulk.json | 4 | ||||
-rw-r--r-- | dat/sample-web.json | 4 | ||||
-rw-r--r-- | doc/protocol.md | 18 | ||||
-rwxr-xr-x | src/daq/accesslog/s5-accesslog | 44 | ||||
-rw-r--r-- | src/daq/flumotion-plug/s5.py | 12 | ||||
-rwxr-xr-x | src/daq/flumotion-rrd/s5-flumotion-rrd | 16 | ||||
-rwxr-xr-x | src/daq/nginx-lua/s5-nginx-lua-fetch | 44 | ||||
-rw-r--r-- | src/daq/s5proxy/sample.json | 4 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/config.go | 2 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/stats.go | 52 | ||||
-rw-r--r-- | src/es5/es-query.json | 2 | ||||
-rw-r--r-- | src/es5/sfive-init.json | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 8 | ||||
-rwxr-xr-x | src/hub/test-import | 12 |
21 files changed, 116 insertions, 124 deletions
diff --git a/dat/sample-accesslog.json.gz b/dat/sample-accesslog.json.gz Binary files differdeleted file mode 100644 index a68d35d..0000000 --- a/dat/sample-accesslog.json.gz +++ /dev/null diff --git a/dat/sample-importer.json.gz b/dat/sample-importer.json.gz Binary files differdeleted file mode 100644 index 7baf9a4..0000000 --- a/dat/sample-importer.json.gz +++ /dev/null diff --git a/dat/sample-pipe.json b/dat/sample-pipe.json index ed327fc..f099ba1 100644 --- a/dat/sample-pipe.json +++ b/dat/sample-pipe.json @@ -1,4 +1,4 @@ -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], "version": 1} +{"version": 2, "stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"]} {"data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000} {"data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000} {"data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000} diff --git a/dat/sample-pipegram.json b/dat/sample-pipegram.json index a9c919a..7dc6b6a 100644 --- a/dat/sample-pipegram.json +++ b/dat/sample-pipegram.json @@ -1,3 +1,3 @@ -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000} -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000} -{"streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 1, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000} +{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000} +{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 22849, "client-count": 1}, "start-time": "2013-10-21T12:35:00Z", "duration-ms": 300000} +{"stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["other", "toll"], "version": 2, "data": {"bytes-sent": 33100, "client-count": 1}, "start-time": "2013-10-21T12:40:00Z", "duration-ms": 300000} diff --git a/dat/sample-web-bulk.json b/dat/sample-web-bulk.json index 31f1f2b..3809500 100644 --- a/dat/sample-web-bulk.json +++ b/dat/sample-web-bulk.json @@ -1,6 +1,6 @@ { - "version": 1, - "streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, + "version": 2, + "stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"] } diff --git a/dat/sample-web.json b/dat/sample-web.json index 35c76e0..c6cc573 100644 --- a/dat/sample-web.json +++ b/dat/sample-web.json @@ -1,8 +1,8 @@ { - "streamer-id": {"quality": "high", "content-id": "av", "format": "webm"}, + "stream": {"quality": "high", "content": "av", "format": "webm"}, "hostname": "localhost", "tags": ["suppa", "toll"], - "version": 1, + "version": 2, "data": {"bytes-sent": 1519, "client-count": 0}, "start-time": "2013-10-21T12:30:00Z", "duration-ms": 300000 diff --git a/doc/protocol.md b/doc/protocol.md index 29391c6..e960506 100644 --- a/doc/protocol.md +++ b/doc/protocol.md @@ -9,13 +9,13 @@ defaults which will be used if the corresponding value is missing in subsequent update messages. { - "version": 1, + "version": 2, "SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5", - "SourceHubDataUpdateId": 23, + "SourceHubUpdateId": 23, "ForwardHubUuid": "b041315e-5039-4c75-81e8-9fd42250b011", - "ForwardHubDataUpdateId": 42, + "ForwardHubUpdateId": 42, "hostname": "myhostname", - "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" }, + "stream": { "content": "av-orig", "format": "flash", "quality": "medium" }, "tags": [ "elevate", "2014", "discourse" ] } @@ -26,19 +26,19 @@ data-update All values which have been defined by the init message are optional. In any case the values from data updates override values from init. Stateless interfaces will not use init messages and therefore all values must be defined here. -"SourceHubUuid", "SourceHubDataUpdateId", "ForwardHubUuid", "ForwardHubDataUpdateId", +"SourceHubUuid", "SourceHubUpdateId", "ForwardHubUuid", "ForwardHubUpdateId", "user-agent", "bytes-received", "tags" and "clients" might be omitted and are treated as an empty string, 0 or empty array respectively. The start-time will be processesd and stored with millisecond precision. { - "version": 1, + "version": 2, "SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5", - "SourceHubDataUpdateId": 23, + "SourceHubUpdateId": 23, "ForwardHubUuid": "b041315e-5039-4c75-81e8-9fd42250b011", - "ForwardHubDataUpdateId": 42, + "ForwardHubUpdateId": 42, "hostname": "myhostname", - "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" }, + "stream": { "content": "av-orig", "format": "flash", "quality": "medium" }, "tags": [ "elevate", "2014", "discourse" ] "start-time": "2014-08-03T12:34:56.123Z", "duration-ms": 5000, diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog index e91c523..29ec9f3 100755 --- a/src/daq/accesslog/s5-accesslog +++ b/src/daq/accesslog/s5-accesslog @@ -138,7 +138,7 @@ class AccessLog(): self._duration = properties['duration'] self._tags = properties['tags'] self._logfile = properties['logfile'] - self._initStreamerIds(properties['streamer-ids']) + self._initStreamerIds(properties['streams']) self._proto = None self._conn = None @@ -175,7 +175,7 @@ class AccessLog(): if len(cs.keys()) == 1: self._content_id = cs.keys()[0] - print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id + print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id if len(fs.keys()) == 1: self._format = fs.keys()[0] @@ -345,9 +345,9 @@ class AccessLog(): def _sendDatasetFull(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] - data = { "version": 1, "hostname": self._hostname, "tags": self._tags, - "streamer-id": { - "content-id": content_id, + data = { "version": 2, "hostname": self._hostname, "tags": self._tags, + "stream": { + "content": content_id, "format": format, "quality": quality }, @@ -362,17 +362,17 @@ class AccessLog(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id" : { }, + initdata = { "version": 2, "hostname": self._hostname, + "stream" : { }, "tags": self._tags } if self._content_id: - initdata["streamer-id"]["content-id"] = self._content_id + initdata["stream"]["content"] = self._content_id if self._format: - initdata["streamer-id"]["format"] = self._format + initdata["stream"]["format"] = self._format if self._quality: - initdata["streamer-id"]["quality"] = self._quality - if len(initdata["streamer-id"].keys()) == 0: - del initdata["streamer-id"] + initdata["stream"]["quality"] = self._quality + if len(initdata["stream"].keys()) == 0: + del initdata["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -380,7 +380,7 @@ class AccessLog(): clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, - "streamer-id": { }, + "stream": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), @@ -388,13 +388,13 @@ class AccessLog(): } } if not self._content_id: - data["streamer-id"]["content-id"] = content_id + data["stream"]["content"] = content_id if not self._format: - data["streamer-id"]["format"] = format + data["stream"]["format"] = format if not self._quality: - data["streamer-id"]["quality"] = quality - if len(data["streamer-id"].keys()) == 0: - del data["streamer-id"] + data["stream"]["quality"] = quality + if len(data["stream"].keys()) == 0: + del data["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(data))) @@ -411,8 +411,8 @@ if __name__ == '__main__': help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', - help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') + parser.add_argument('--stream', '-S', dest='streams', action='append', + help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') parser.add_argument('--logfile', '-l', dest='logfile', required=True, help='path to the logfile or \'-\' for standard input') parser.add_argument('--nameformat', '-F', dest='nameformat', required=False, @@ -426,8 +426,8 @@ if __name__ == '__main__': args['duration'] = int(args['duration']) if not args['nameformat']: args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*' - if not args['streamer-ids']: - print 'SFive: you have to specify at least one streamer-id!' + if not args['streams']: + print 'SFive: you have to specify at least one stream!' exit(-1) importer = AccessLog(args) importer.run() diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py index 0db01e4..58dc71a 100644 --- a/src/daq/flumotion-plug/s5.py +++ b/src/daq/flumotion-plug/s5.py @@ -126,7 +126,7 @@ class ComponentSFivePlug(base.ComponentPlug): properties = self.args['properties'] self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties.get('content-id') + self._content_id = properties.get('content') self._format = properties.get('format') self._quality = properties.get('quality') tagstring = properties.get('tags', '') @@ -208,9 +208,9 @@ class ComponentSFivePlug(base.ComponentPlug): def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received): - data = { "version": 1, "hostname": self._hostname, - "streamer-id": { - "content-id": self._content_id, + data = { "version": 2, "hostname": self._hostname, + "stream": { + "content": self._content_id, "format": self._format, "quality": self._quality }, @@ -226,8 +226,8 @@ class ComponentSFivePlug(base.ComponentPlug): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + initdata = { "version": 2, "hostname": self._hostname, + "stream": { "content": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd index 197cb14..3170030 100755 --- a/src/daq/flumotion-rrd/s5-flumotion-rrd +++ b/src/daq/flumotion-rrd/s5-flumotion-rrd @@ -100,7 +100,7 @@ class FlumotionRRD(): self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties['content-id'] + self._content_id = properties['content'] self._format = properties['format'] self._quality = properties['quality'] self._tags = properties['tags'] @@ -191,9 +191,9 @@ class FlumotionRRD(): def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent): client_count = int(round(client_count)) if client_count else 0 bytes_sent = int(round(bytes_sent)) if bytes_sent else 0 - data = { "version": 1, "hostname": self._hostname, - "streamer-id": { - "content-id": self._content_id, + data = { "version": 2, "hostname": self._hostname, + "stream": { + "content": self._content_id, "format": self._format, "quality": self._quality }, @@ -208,8 +208,8 @@ class FlumotionRRD(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + initdata = { "version": 2, "hostname": self._hostname, + "stream": { "content": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -234,8 +234,8 @@ if __name__ == '__main__': help='the path to the data socket of the local SFive hub') parser.add_argument('--hostname', '-n', dest='hostname', required=True, help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') + parser.add_argument('--content', '-c', dest='content', required=True, + help='the content (i.e. av)') parser.add_argument('--format', '-f', dest='format', required=True, help='the format (i.e. webm)') parser.add_argument('--quality', '-q', dest='quality', required=True, diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch index 65e2c83..565a24f 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch @@ -169,7 +169,7 @@ class NGXLuaFetcher(): self._duration = properties['duration'] self._tags = properties['tags'] self._url = properties['url'] - self._initStreamerIds(properties['streamer-ids']) + self._initStreamerIds(properties['streams']) self._proto = None self._conn = None @@ -207,7 +207,7 @@ class NGXLuaFetcher(): if len(cs.keys()) == 1: self._content_id = cs.keys()[0] -# print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id +# print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id if len(fs.keys()) == 1: self._format = fs.keys()[0] @@ -310,9 +310,9 @@ class NGXLuaFetcher(): def _sendDatasetFull(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] - data = { "version": 1, "hostname": self._hostname, "tags": self._tags, - "streamer-id": { - "content-id": content_id, + data = { "version": 2, "hostname": self._hostname, "tags": self._tags, + "stream": { + "content": content_id, "format": format, "quality": quality }, @@ -327,17 +327,17 @@ class NGXLuaFetcher(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id" : { }, + initdata = { "version": 2, "hostname": self._hostname, + "stream" : { }, "tags": self._tags } if self._content_id: - initdata["streamer-id"]["content-id"] = self._content_id + initdata["stream"]["content"] = self._content_id if self._format: - initdata["streamer-id"]["format"] = self._format + initdata["stream"]["format"] = self._format if self._quality: - initdata["streamer-id"]["quality"] = self._quality - if len(initdata["streamer-id"].keys()) == 0: - del initdata["streamer-id"] + initdata["stream"]["quality"] = self._quality + if len(initdata["stream"].keys()) == 0: + del initdata["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -345,7 +345,7 @@ class NGXLuaFetcher(): clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, - "streamer-id": { }, + "stream": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), @@ -353,13 +353,13 @@ class NGXLuaFetcher(): } } if not self._content_id: - data["streamer-id"]["content-id"] = content_id + data["stream"]["content"] = content_id if not self._format: - data["streamer-id"]["format"] = format + data["stream"]["format"] = format if not self._quality: - data["streamer-id"]["quality"] = quality - if len(data["streamer-id"].keys()) == 0: - del data["streamer-id"] + data["stream"]["quality"] = quality + if len(data["stream"].keys()) == 0: + del data["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(data))) @@ -376,8 +376,8 @@ if __name__ == '__main__': help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', - help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') + parser.add_argument('--stream', '-S', dest='streams', action='append', + help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') parser.add_argument('--url', '-u', dest='url', required=False, help='the url of the nginx sfive exporert, default http://localhost/sfive') parser.add_argument('--nameformat', '-F', dest='nameformat', required=False, @@ -393,8 +393,8 @@ if __name__ == '__main__': args['url'] = 'http://localhost/sfive' if not args['nameformat']: args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*' - if not args['streamer-ids']: - print 'SFive: you have to specify at least one streamer-id!' + if not args['streams']: + print 'SFive: you have to specify at least one stream!' exit(-1) fetcher = NGXLuaFetcher(args) diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b13ec7d..b1abcdc 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -18,8 +18,8 @@ "hostname": "public1", "tags": [ "hello", "world" ], "duration": "15s", - "format" : "/${content-id}-${format}-${quality}", - "streamer": [ + "format" : "/${content}-${format}-${quality}", + "stream": [ "av-orig/flash,webm/high,medium,low,mini" ] } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index 8a94c48..280b9fd 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -113,7 +113,7 @@ type SFiveConf struct { Tags []string `json:"tags"` Duration Duration `json:"duration"` Format string `json:"format"` - Streamer []string `json:"streamer"` + Stream []string `json:"stream"` } type Config struct { diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 96ccd74..803dfa3 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -44,8 +44,12 @@ import ( // TODO: this is basically a copy from src/hub.... -type StreamID struct { - ContentID string `json:"content-id"` +const ( + ProtocolVersion = 2 +) + +type Stream struct { + ContentID string `json:"content"` Format string `json:"format"` Quality string `json:"quality"` } @@ -56,32 +60,32 @@ type ClientData struct { BytesSent uint `json:"bytes-sent"` } -type SourceData struct { +type UpdateData struct { ClientCount uint `json:"client-count"` BytesReceived uint `json:"bytes-received"` BytesSent uint `json:"bytes-sent"` Clients []ClientData `json:"clients,omitempty"` } -type DataUpdate struct { +type Update struct { Version uint `json:"version"` Hostname string `json:"hostname"` - StreamID StreamID `json:"streamer-id"` + Stream Stream `json:"stream"` Tags []string `json:"tags,omitempty"` StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` - Data SourceData `json:"data"` + Data UpdateData `json:"data"` } type StatsWorker struct { - stream StreamID + stream Stream current map[string]*ClientData trigger chan time.Time - output chan<- *DataUpdate + output chan<- *Update input chan *ClientData } -func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) { +func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) { sw.stream = stream sw.current = make(map[string]*ClientData) sw.trigger = make(chan time.Time) @@ -95,7 +99,7 @@ func (sw StatsWorker) Run() { select { case t := <-sw.trigger: if t.UnixNano() != 0 { - upd := &DataUpdate{StreamID: sw.stream, StartTime: t} + upd := &Update{Stream: sw.stream, StartTime: t} upd.Data.ClientCount = uint(len(sw.current)) for _, c := range sw.current { upd.Data.Clients = append(upd.Data.Clients, *c) @@ -122,7 +126,7 @@ type Stats struct { conf *Config sock net.Conn dataEncoder *json.Encoder - updates chan *DataUpdate + updates chan *Update workers map[string]StatsWorker } @@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData { return nil } -func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) { +func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) { parts := strings.Split(desc, ",") for _, p := range parts { if p = strings.TrimSpace(p); p == "" { @@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err return } -func parseStreamerDescription(desc string) (c, f, q []string, err error) { +func parseStreamDescription(desc string) (c, f, q []string, err error) { parts := strings.Split(desc, "/") if len(parts) != 3 { err = fmt.Errorf("invalid streamer description: '%s'", desc) return } - if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil { + if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil { return } - if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil { + if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil { return } - if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil { + if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil { return } return } -func generateStreamerName(format, c, f, q string) string { +func generateStreamName(format, c, f, q string) string { return os.Expand(format, func(k string) string { switch k { - case "content-id": + case "content": return c case "format": return f @@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) { return } - s.updates = make(chan *DataUpdate, 1000) + s.updates = make(chan *Update, 1000) s.workers = make(map[string]StatsWorker) var content, format, quality []string - for _, desc := range conf.SFive.Streamer { - if content, format, quality, err = parseStreamerDescription(desc); err != nil { + for _, desc := range conf.SFive.Stream { + if content, format, quality, err = parseStreamDescription(desc); err != nil { return } for _, c := range content { for _, f := range format { for _, q := range quality { - name := generateStreamerName(conf.SFive.Format, c, f, q) - s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates) + name := generateStreamName(conf.SFive.Format, c, f, q) + s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates) s5l.Printf("STATS: adding streamer '%s'", name) } } @@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) { func (s *Stats) sendUpdates() { for { upd := <-s.updates - upd.Version = 1 + upd.Version = ProtocolVersion upd.Hostname = s.conf.SFive.Hostname upd.Tags = s.conf.SFive.Tags upd.Duration = int64(s.conf.SFive.Duration) diff --git a/src/es5/es-query.json b/src/es5/es-query.json index b51f3f7..226a379 100644 --- a/src/es5/es-query.json +++ b/src/es5/es-query.json @@ -17,7 +17,7 @@ "aggregations": { "by_source": { "terms": { - "script": "[ doc['hostname'].value, doc['streamer-id.content-id'].value, doc['streamer-id.format'].value, doc['streamer-id.quality'].value ].join('/') ", + "script": "[ doc['hostname'].value, doc['stream.content'].value, doc['stream.format'].value, doc['stream.quality'].value ].join('/') ", "size": 0 }, "aggregations": { diff --git a/src/es5/sfive-init.json b/src/es5/sfive-init.json index bc89227..cf2a980 100644 --- a/src/es5/sfive-init.json +++ b/src/es5/sfive-init.json @@ -50,9 +50,9 @@ "type" : "date", "format" : "dateOptionalTime" }, - "streamer-id" : { + "stream" : { "properties" : { - "content-id" : { + "content" : { "type" : "string", "index" : "not_analyzed" }, diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 2741a5a..0abbdef 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -39,7 +39,7 @@ import ( ) const ( - ProtocolVersion = 1 + ProtocolVersion = 2 ) // diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 46b5705..55d8415 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -42,7 +42,7 @@ import ( ) var ( - initEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` + initEncoded = `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": "webm"}, "tags": ["elevate", "2014"]` initStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} updateEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 005e03d..2794a24 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -44,7 +44,7 @@ import ( const forwardEsLastUpdateIdJson = `{ "query": {"match": { "SourceHubUuid": "%s" } }, - "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } + "aggregations": { "last-id" : { "max" : { "field": "SourceHubUpdateId" } } } }` func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 5d36e0d..0a30162 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -35,14 +35,14 @@ package sfive import "time" type Stream struct { - ContentId string `json:"content-id"` + ContentId string `json:"content"` Format string `json:"format"` Quality string `json:"quality"` } type Source struct { Hostname string `json:"hostname"` - Stream Stream `json:"streamer-id"` + Stream Stream `json:"stream"` Tags []string `json:"tags,omitempty"` } @@ -68,9 +68,9 @@ type Update struct { type Header struct { Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API SourceHubUuid string `json:"SourceHubUuid,omitempty"` - SourceHubUpdateId int `json:"SourceHubDataUpdateId,omitempty"` + SourceHubUpdateId int `json:"SourceHubUpdateId,omitempty"` ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` - ForwardHubUpdateId int `json:"ForwardHubDataUpdateId,omitempty"` + ForwardHubUpdateId int `json:"ForwardHubUpdateId,omitempty"` } type UpdateFull struct { diff --git a/src/hub/test-import b/src/hub/test-import deleted file mode 100755 index a6c2942..0000000 --- a/src/hub/test-import +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/sh - -TEST_D="./test" - -echo pipe: import sample.json -echo ------------------------ - -[ -f ../../dat/sample-access.json ] || zcat ../../dat/sample-accesslog.json.gz > ../../dat/sample-accesslog.json - -socat file:../../dat/sample-accesslog.json,rdonly "unix-client:$TEST_D/pipe" - -echo '\n\ndone' |