From 694253b66a3130203e406ec25a73cb6950929f57 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 1 Feb 2018 10:54:34 +0100 Subject: workers can now have a subname --- src/examples/elevate2018.yml | 4 ++- src/flufigut.py | 82 ++++++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/src/examples/elevate2018.yml b/src/examples/elevate2018.yml index 27d7343..77bf36a 100644 --- a/src/examples/elevate2018.yml +++ b/src/examples/elevate2018.yml @@ -35,7 +35,9 @@ globals: vp8: { 1080p25: 3500, 720p25: 1800, 480p25: 1000, 360p25: 600, 240p25: 300 } mjpeg: { 1080p25: 95 } workers: - dione: [ "capture-*", "resize-*", "resample-*", "encode-*" ] + dione: + capture: [ "capture-*" ] + default: [ "resize-*", "resample-*", "encode-*" ] helene: [ "encode-sdi-orig-vp8-*", "encode-sdi-orig-h264-360p25", "mux-*", "http-local1*", "record-*" ] emc-00: [ "repeat-public*" ] emc-01: [ "http-public1*" ] diff --git a/src/flufigut.py b/src/flufigut.py index 3e75ea6..ce2a48b 100755 --- a/src/flufigut.py +++ b/src/flufigut.py @@ -45,12 +45,24 @@ def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + # a flufigut stream description ############################### # +class Worker: + + def __init__(self, name, subname=None): + self.name = name + self.subname = subname + + def __str__(self): + if self.subname: + return "%s:%s" % (self.name, self.subname) + return self.name + + class WorkerPattern: - def __init__(self, pattern, is_prefix, worker_name): + def __init__(self, pattern, is_prefix, worker): self.pattern = pattern self.is_prefix = is_prefix - self.worker_name = worker_name + self.worker = worker class Description: @@ -64,23 +76,33 @@ class Description: self.worker_patterns = {} + def __compute_worker_patterns(self, worker_name, worker_subname, patterns): + for pattern in patterns: + is_prefix = False + search_string = pattern + worker = Worker(worker_name, worker_subname) + if '*' in pattern: + if '*' in pattern[:-1]: + raise Exception("invalid worker pattern '%s' in worker '%s' component globs may only have a single '*' at the end)" % (pattern, worker)) + is_prefix = True + search_string = pattern[:-1] + if not search_string: + raise Exception("worker '%s' has wildcard or empty pattern which is not allowed" % (worker)) + if search_string in self.worker_patterns: + raise Exception("worker pattern '%s' is used by at least two workers (%s and %s)" % + (pattern, worker, self.worker_patterns[search_string].worker)) + + self.worker_patterns[search_string] = WorkerPattern(pattern, is_prefix, worker) + def _compute_worker_patterns(self): - for worker_name, worker in self.globals['workers'].items(): - for pattern in worker: - is_prefix = False - search_string = pattern - if '*' in pattern: - if '*' in pattern[:-1]: - raise Exception("invalid worker pattern '%s' (component globs may only have a single '*' at the end)" % pattern) - is_prefix = True - search_string = pattern[:-1] - if not pattern: - raise Exception("worker '%s' has wildcard pattern '%s' which is not allowed" % (worker_name, pattern)) - if search_string in self.worker_patterns: - raise Exception("worker pattern '%s' is used by at least two workers (%s and %s)" % - (pattern, self.worker_patterns[search_string].worker_name, worker_name)) - - self.worker_patterns[search_string] = WorkerPattern(pattern, is_prefix, worker_name) + for name, entry in self.globals['workers'].items(): + if isinstance(entry, list): + self.__compute_worker_patterns(name, None, entry) + elif isinstance(entry, dict): + for subname, subentry in entry.items(): + self.__compute_worker_patterns(name, subname, subentry) + else: + raise Exception("globals.workers entries must contain lists of patterns") def parse(self, config_file): cf = open(config_file, 'r') @@ -128,7 +150,7 @@ class Planet: if comp_name.startswith(search_string): if len(search_string) == matched_prefix_len: raise Exception("component '%s': both patterns '%s' of worker '%s' and '%s' of worker '%s' match the component name and have the same length" % - (comp_name, match.pattern, match.worker_name, pattern.pattern, pattern.worker_name)) + (comp_name, match.pattern, match.worker, pattern.pattern, pattern.worker)) elif len(search_string) > matched_prefix_len: match_prefix_len = len(search_string) match = pattern @@ -141,7 +163,7 @@ class Planet: self._unassigned_components.append(comp_name) return None - return match.worker_name + return match.worker # # inputs @@ -163,7 +185,7 @@ class Planet: self.flow['inputs'][comp_name] = { 'type': input['type'], 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'master': input['master'], 'properties': {}, } @@ -187,7 +209,7 @@ class Planet: self.flow['inputs'][comp_name] = { 'type': 'audio-resample', 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': input, 'properties': { 'samplerate': target_samplerate, @@ -217,7 +239,7 @@ class Planet: self.flow['encoders-audio'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': { 'bitrate': bitrate, @@ -238,7 +260,7 @@ class Planet: self.flow['inputs'][comp_name] = { 'type': 'video-resize', 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': input, 'properties': { 'width': self._desc.globals['resolutions'][target_resolution]['width'], @@ -271,7 +293,7 @@ class Planet: self.flow['encoders-video'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': { 'bitrate': bitrate, @@ -287,7 +309,7 @@ class Planet: self.flow['muxers'][comp_name] = { 'type': '%s-mux' % muxer, 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder_audio': feeder_audio, 'feeder_video': feeder_video, 'properties': {}, @@ -326,7 +348,7 @@ class Planet: self.atmosphere[porter.name] = { 'type': "porter", 'desc': "%s porter %s%i on %s" % (protocol, stream, idx + 1, addr), - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'properties': { 'port': port, 'socket-path': porter.socket_path, @@ -348,7 +370,7 @@ class Planet: self.flow['repeaters'][comp_name] = { 'type': 'repeater', 'desc': "repeater for %s (%s %s-%s)" % (stream, mux, format, profile), - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, } return comp_name @@ -406,7 +428,7 @@ class Planet: self.flow['streamers'][comp_name] = { 'type': "%s-stream" % stream['type'], 'desc': "%s streamer for %s %s-%s (part %i of %i in %s cluster)" % (stream['type'], mux, format, profile, idx + 1, cnt, stream_name), - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': { 'description': self._desc.globals['description'], @@ -460,7 +482,7 @@ class Planet: self.flow['recorders'][comp_name] = { 'type': "recorder", 'desc': comp_desc, - 'worker': self.__get_worker(comp_name), + 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': {} } -- cgit v1.2.3