diff options
Diffstat (limited to 'src/daq')
-rwxr-xr-x | src/daq/accesslog/s5-accesslog | 44 | ||||
-rw-r--r-- | src/daq/flumotion-plug/s5.py | 12 | ||||
-rwxr-xr-x | src/daq/flumotion-rrd/s5-flumotion-rrd | 16 | ||||
-rwxr-xr-x | src/daq/nginx-lua/s5-nginx-lua-fetch | 44 | ||||
-rw-r--r-- | src/daq/s5proxy/sample.json | 4 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/config.go | 2 | ||||
-rw-r--r-- | src/daq/s5proxy/src/s5proxy/stats.go | 52 |
7 files changed, 89 insertions, 85 deletions
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog index e91c523..29ec9f3 100755 --- a/src/daq/accesslog/s5-accesslog +++ b/src/daq/accesslog/s5-accesslog @@ -138,7 +138,7 @@ class AccessLog(): self._duration = properties['duration'] self._tags = properties['tags'] self._logfile = properties['logfile'] - self._initStreamerIds(properties['streamer-ids']) + self._initStreamerIds(properties['streams']) self._proto = None self._conn = None @@ -175,7 +175,7 @@ class AccessLog(): if len(cs.keys()) == 1: self._content_id = cs.keys()[0] - print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id + print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id if len(fs.keys()) == 1: self._format = fs.keys()[0] @@ -345,9 +345,9 @@ class AccessLog(): def _sendDatasetFull(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] - data = { "version": 1, "hostname": self._hostname, "tags": self._tags, - "streamer-id": { - "content-id": content_id, + data = { "version": 2, "hostname": self._hostname, "tags": self._tags, + "stream": { + "content": content_id, "format": format, "quality": quality }, @@ -362,17 +362,17 @@ class AccessLog(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id" : { }, + initdata = { "version": 2, "hostname": self._hostname, + "stream" : { }, "tags": self._tags } if self._content_id: - initdata["streamer-id"]["content-id"] = self._content_id + initdata["stream"]["content"] = self._content_id if self._format: - initdata["streamer-id"]["format"] = self._format + initdata["stream"]["format"] = self._format if self._quality: - initdata["streamer-id"]["quality"] = self._quality - if len(initdata["streamer-id"].keys()) == 0: - del initdata["streamer-id"] + initdata["stream"]["quality"] = self._quality + if len(initdata["stream"].keys()) == 0: + del initdata["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -380,7 +380,7 @@ class AccessLog(): clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, - "streamer-id": { }, + "stream": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), @@ -388,13 +388,13 @@ class AccessLog(): } } if not self._content_id: - data["streamer-id"]["content-id"] = content_id + data["stream"]["content"] = content_id if not self._format: - data["streamer-id"]["format"] = format + data["stream"]["format"] = format if not self._quality: - data["streamer-id"]["quality"] = quality - if len(data["streamer-id"].keys()) == 0: - del data["streamer-id"] + data["stream"]["quality"] = quality + if len(data["stream"].keys()) == 0: + del data["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(data))) @@ -411,8 +411,8 @@ if __name__ == '__main__': help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', - help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') + parser.add_argument('--stream', '-S', dest='streams', action='append', + help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') parser.add_argument('--logfile', '-l', dest='logfile', required=True, help='path to the logfile or \'-\' for standard input') parser.add_argument('--nameformat', '-F', dest='nameformat', required=False, @@ -426,8 +426,8 @@ if __name__ == '__main__': args['duration'] = int(args['duration']) if not args['nameformat']: args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*' - if not args['streamer-ids']: - print 'SFive: you have to specify at least one streamer-id!' + if not args['streams']: + print 'SFive: you have to specify at least one stream!' exit(-1) importer = AccessLog(args) importer.run() diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py index 0db01e4..58dc71a 100644 --- a/src/daq/flumotion-plug/s5.py +++ b/src/daq/flumotion-plug/s5.py @@ -126,7 +126,7 @@ class ComponentSFivePlug(base.ComponentPlug): properties = self.args['properties'] self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties.get('content-id') + self._content_id = properties.get('content') self._format = properties.get('format') self._quality = properties.get('quality') tagstring = properties.get('tags', '') @@ -208,9 +208,9 @@ class ComponentSFivePlug(base.ComponentPlug): def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received): - data = { "version": 1, "hostname": self._hostname, - "streamer-id": { - "content-id": self._content_id, + data = { "version": 2, "hostname": self._hostname, + "stream": { + "content": self._content_id, "format": self._format, "quality": self._quality }, @@ -226,8 +226,8 @@ class ComponentSFivePlug(base.ComponentPlug): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + initdata = { "version": 2, "hostname": self._hostname, + "stream": { "content": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd index 197cb14..3170030 100755 --- a/src/daq/flumotion-rrd/s5-flumotion-rrd +++ b/src/daq/flumotion-rrd/s5-flumotion-rrd @@ -100,7 +100,7 @@ class FlumotionRRD(): self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties['content-id'] + self._content_id = properties['content'] self._format = properties['format'] self._quality = properties['quality'] self._tags = properties['tags'] @@ -191,9 +191,9 @@ class FlumotionRRD(): def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent): client_count = int(round(client_count)) if client_count else 0 bytes_sent = int(round(bytes_sent)) if bytes_sent else 0 - data = { "version": 1, "hostname": self._hostname, - "streamer-id": { - "content-id": self._content_id, + data = { "version": 2, "hostname": self._hostname, + "stream": { + "content": self._content_id, "format": self._format, "quality": self._quality }, @@ -208,8 +208,8 @@ class FlumotionRRD(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + initdata = { "version": 2, "hostname": self._hostname, + "stream": { "content": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -234,8 +234,8 @@ if __name__ == '__main__': help='the path to the data socket of the local SFive hub') parser.add_argument('--hostname', '-n', dest='hostname', required=True, help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') + parser.add_argument('--content', '-c', dest='content', required=True, + help='the content (i.e. av)') parser.add_argument('--format', '-f', dest='format', required=True, help='the format (i.e. webm)') parser.add_argument('--quality', '-q', dest='quality', required=True, diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch index 65e2c83..565a24f 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch @@ -169,7 +169,7 @@ class NGXLuaFetcher(): self._duration = properties['duration'] self._tags = properties['tags'] self._url = properties['url'] - self._initStreamerIds(properties['streamer-ids']) + self._initStreamerIds(properties['streams']) self._proto = None self._conn = None @@ -207,7 +207,7 @@ class NGXLuaFetcher(): if len(cs.keys()) == 1: self._content_id = cs.keys()[0] -# print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id +# print 'SFive: only one content detected "%s" - will include it in init messages only' % self._content_id if len(fs.keys()) == 1: self._format = fs.keys()[0] @@ -310,9 +310,9 @@ class NGXLuaFetcher(): def _sendDatasetFull(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] - data = { "version": 1, "hostname": self._hostname, "tags": self._tags, - "streamer-id": { - "content-id": content_id, + data = { "version": 2, "hostname": self._hostname, "tags": self._tags, + "stream": { + "content": content_id, "format": format, "quality": quality }, @@ -327,17 +327,17 @@ class NGXLuaFetcher(): self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id" : { }, + initdata = { "version": 2, "hostname": self._hostname, + "stream" : { }, "tags": self._tags } if self._content_id: - initdata["streamer-id"]["content-id"] = self._content_id + initdata["stream"]["content"] = self._content_id if self._format: - initdata["streamer-id"]["format"] = self._format + initdata["stream"]["format"] = self._format if self._quality: - initdata["streamer-id"]["quality"] = self._quality - if len(initdata["streamer-id"].keys()) == 0: - del initdata["streamer-id"] + initdata["stream"]["quality"] = self._quality + if len(initdata["stream"].keys()) == 0: + del initdata["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) @@ -345,7 +345,7 @@ class NGXLuaFetcher(): clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, - "streamer-id": { }, + "stream": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), @@ -353,13 +353,13 @@ class NGXLuaFetcher(): } } if not self._content_id: - data["streamer-id"]["content-id"] = content_id + data["stream"]["content"] = content_id if not self._format: - data["streamer-id"]["format"] = format + data["stream"]["format"] = format if not self._quality: - data["streamer-id"]["quality"] = quality - if len(data["streamer-id"].keys()) == 0: - del data["streamer-id"] + data["stream"]["quality"] = quality + if len(data["stream"].keys()) == 0: + del data["stream"] self._proto.sendDatagram('%s\n' % (json.dumps(data))) @@ -376,8 +376,8 @@ if __name__ == '__main__': help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', - help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') + parser.add_argument('--stream', '-S', dest='streams', action='append', + help='a streamer description like <content1>[,<content2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times') parser.add_argument('--url', '-u', dest='url', required=False, help='the url of the nginx sfive exporert, default http://localhost/sfive') parser.add_argument('--nameformat', '-F', dest='nameformat', required=False, @@ -393,8 +393,8 @@ if __name__ == '__main__': args['url'] = 'http://localhost/sfive' if not args['nameformat']: args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*' - if not args['streamer-ids']: - print 'SFive: you have to specify at least one streamer-id!' + if not args['streams']: + print 'SFive: you have to specify at least one stream!' exit(-1) fetcher = NGXLuaFetcher(args) diff --git a/src/daq/s5proxy/sample.json b/src/daq/s5proxy/sample.json index b13ec7d..b1abcdc 100644 --- a/src/daq/s5proxy/sample.json +++ b/src/daq/s5proxy/sample.json @@ -18,8 +18,8 @@ "hostname": "public1", "tags": [ "hello", "world" ], "duration": "15s", - "format" : "/${content-id}-${format}-${quality}", - "streamer": [ + "format" : "/${content}-${format}-${quality}", + "stream": [ "av-orig/flash,webm/high,medium,low,mini" ] } diff --git a/src/daq/s5proxy/src/s5proxy/config.go b/src/daq/s5proxy/src/s5proxy/config.go index 8a94c48..280b9fd 100644 --- a/src/daq/s5proxy/src/s5proxy/config.go +++ b/src/daq/s5proxy/src/s5proxy/config.go @@ -113,7 +113,7 @@ type SFiveConf struct { Tags []string `json:"tags"` Duration Duration `json:"duration"` Format string `json:"format"` - Streamer []string `json:"streamer"` + Stream []string `json:"stream"` } type Config struct { diff --git a/src/daq/s5proxy/src/s5proxy/stats.go b/src/daq/s5proxy/src/s5proxy/stats.go index 96ccd74..803dfa3 100644 --- a/src/daq/s5proxy/src/s5proxy/stats.go +++ b/src/daq/s5proxy/src/s5proxy/stats.go @@ -44,8 +44,12 @@ import ( // TODO: this is basically a copy from src/hub.... -type StreamID struct { - ContentID string `json:"content-id"` +const ( + ProtocolVersion = 2 +) + +type Stream struct { + ContentID string `json:"content"` Format string `json:"format"` Quality string `json:"quality"` } @@ -56,32 +60,32 @@ type ClientData struct { BytesSent uint `json:"bytes-sent"` } -type SourceData struct { +type UpdateData struct { ClientCount uint `json:"client-count"` BytesReceived uint `json:"bytes-received"` BytesSent uint `json:"bytes-sent"` Clients []ClientData `json:"clients,omitempty"` } -type DataUpdate struct { +type Update struct { Version uint `json:"version"` Hostname string `json:"hostname"` - StreamID StreamID `json:"streamer-id"` + Stream Stream `json:"stream"` Tags []string `json:"tags,omitempty"` StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` - Data SourceData `json:"data"` + Data UpdateData `json:"data"` } type StatsWorker struct { - stream StreamID + stream Stream current map[string]*ClientData trigger chan time.Time - output chan<- *DataUpdate + output chan<- *Update input chan *ClientData } -func NewStatsWorker(stream StreamID, updates chan<- *DataUpdate) (sw StatsWorker) { +func NewStatsWorker(stream Stream, updates chan<- *Update) (sw StatsWorker) { sw.stream = stream sw.current = make(map[string]*ClientData) sw.trigger = make(chan time.Time) @@ -95,7 +99,7 @@ func (sw StatsWorker) Run() { select { case t := <-sw.trigger: if t.UnixNano() != 0 { - upd := &DataUpdate{StreamID: sw.stream, StartTime: t} + upd := &Update{Stream: sw.stream, StartTime: t} upd.Data.ClientCount = uint(len(sw.current)) for _, c := range sw.current { upd.Data.Clients = append(upd.Data.Clients, *c) @@ -122,7 +126,7 @@ type Stats struct { conf *Config sock net.Conn dataEncoder *json.Encoder - updates chan *DataUpdate + updates chan *Update workers map[string]StatsWorker } @@ -136,7 +140,7 @@ func (s *Stats) GetUpdateChannel(url string) chan<- *ClientData { return nil } -func parseStreamerDescriptionElement(desc, desctype string) (e []string, err error) { +func parseStreamDescriptionElement(desc, desctype string) (e []string, err error) { parts := strings.Split(desc, ",") for _, p := range parts { if p = strings.TrimSpace(p); p == "" { @@ -148,28 +152,28 @@ func parseStreamerDescriptionElement(desc, desctype string) (e []string, err err return } -func parseStreamerDescription(desc string) (c, f, q []string, err error) { +func parseStreamDescription(desc string) (c, f, q []string, err error) { parts := strings.Split(desc, "/") if len(parts) != 3 { err = fmt.Errorf("invalid streamer description: '%s'", desc) return } - if c, err = parseStreamerDescriptionElement(parts[0], "content-id"); err != nil { + if c, err = parseStreamDescriptionElement(parts[0], "content"); err != nil { return } - if f, err = parseStreamerDescriptionElement(parts[1], "format"); err != nil { + if f, err = parseStreamDescriptionElement(parts[1], "format"); err != nil { return } - if q, err = parseStreamerDescriptionElement(parts[2], "quality"); err != nil { + if q, err = parseStreamDescriptionElement(parts[2], "quality"); err != nil { return } return } -func generateStreamerName(format, c, f, q string) string { +func generateStreamName(format, c, f, q string) string { return os.Expand(format, func(k string) string { switch k { - case "content-id": + case "content": return c case "format": return f @@ -187,18 +191,18 @@ func NewStats(conf *Config) (s *Stats, err error) { return } - s.updates = make(chan *DataUpdate, 1000) + s.updates = make(chan *Update, 1000) s.workers = make(map[string]StatsWorker) var content, format, quality []string - for _, desc := range conf.SFive.Streamer { - if content, format, quality, err = parseStreamerDescription(desc); err != nil { + for _, desc := range conf.SFive.Stream { + if content, format, quality, err = parseStreamDescription(desc); err != nil { return } for _, c := range content { for _, f := range format { for _, q := range quality { - name := generateStreamerName(conf.SFive.Format, c, f, q) - s.workers[name] = NewStatsWorker(StreamID{c, f, q}, s.updates) + name := generateStreamName(conf.SFive.Format, c, f, q) + s.workers[name] = NewStatsWorker(Stream{c, f, q}, s.updates) s5l.Printf("STATS: adding streamer '%s'", name) } } @@ -223,7 +227,7 @@ func (s *Stats) connectToHub() (err error) { func (s *Stats) sendUpdates() { for { upd := <-s.updates - upd.Version = 1 + upd.Version = ProtocolVersion upd.Hostname = s.conf.SFive.Hostname upd.Tags = s.conf.SFive.Tags upd.Duration = int64(s.conf.SFive.Duration) |