From adb7cb7a7229200cc8fd18ad8498a805c93c0f65 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 30 Jan 2018 23:32:05 +0100 Subject: added worker templates (not done yet) --- src/flufigut.py | 163 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 88 insertions(+), 75 deletions(-) (limited to 'src/flufigut.py') diff --git a/src/flufigut.py b/src/flufigut.py index f9f63f2..1acdf44 100755 --- a/src/flufigut.py +++ b/src/flufigut.py @@ -53,16 +53,27 @@ class Description: self.streams = {} self.records = {} - def _sanity_check(self): - # TODO: add more sanity checks - # - check if stream['localdup']['port'] and stream['port'] are different - components = {} - for _, worker in self.globals['workers'].items(): - for c in worker: - if c in components: - raise Exception("ERROR: component '%s' is assigned to more than one worker!" % c) - else: - components[c] = 1 + self.worker_patterns = {} + + def _compute_worker_patterns(self): + for worker_name, worker in self.globals['workers'].items(): + for pattern in worker: + is_prefix = False + orig_pattern = pattern + if '*' in pattern: + if '*' in pattern[:-1]: + raise Exception("invalid worker pattern '%s' (component globs may only have a single '*' at the end)" % pattern) + is_prefix = True + pattern = pattern[:-1] + if not pattern: + raise Exception("worker '%s' has wildcard pattern '%s' which is not allowed" % (worker_name, orig_pattern)) + if pattern in self.worker_patterns: + raise Exception("the worker pattern '%s' is used by at least two workers (%s and %s)" % + (orig_pattern, self.worker_patterns[pattern]['worker'], worker_name)) + self.worker_patterns[pattern] = { + 'worker': worker_name, + 'is_prefix': is_prefix + } def parse(self, config_file): cf = open(config_file, 'r') @@ -76,7 +87,7 @@ class Description: if 'records' in config: self.records = config['records'] - return self._sanity_check() + return self._compute_worker_patterns() # a flumtion planet configuration ############################# @@ -95,25 +106,26 @@ class Porter: class Planet: - def __init__(self): + def __init__(self, desc): + self._desc = desc self.atmosphere = {} self.flow = {} # # inputs - def __set_input_properties(self, comp_name, props, globals): + def __set_input_properties(self, comp_name, props): for prop in props: if prop == 'resolution': - self.flow['inputs'][comp_name]['properties']['width'] = globals['resolutions'][props[prop]]['width'] - self.flow['inputs'][comp_name]['properties']['height'] = globals['resolutions'][props[prop]]['height'] - self.flow['inputs'][comp_name]['properties']['framerate'] = globals['resolutions'][props[prop]]['rate'] + self.flow['inputs'][comp_name]['properties']['width'] = self._desc.globals['resolutions'][props[prop]]['width'] + self.flow['inputs'][comp_name]['properties']['height'] = self._desc.globals['resolutions'][props[prop]]['height'] + self.flow['inputs'][comp_name]['properties']['framerate'] = self._desc.globals['resolutions'][props[prop]]['rate'] else: self.flow['inputs'][comp_name]['properties'][prop] = props[prop] - def _generate_inputs(self, inputs, globals): + def _generate_inputs(self): self.flow['inputs'] = {} master_cnt = 0 - for source, input in inputs.items(): + for source, input in self._desc.inputs.items(): comp_name = 'input-%s' % source comp_desc = 'capture raw data from %s' % (source) self.flow['inputs'][comp_name] = { @@ -125,7 +137,7 @@ class Planet: } if input['master']: master_cnt += 1 - self.__set_input_properties(comp_name, input['properties'], globals) + self.__set_input_properties(comp_name, input['properties']) if master_cnt == 0: raise Exception("You have not configured any master clock device!") @@ -151,14 +163,14 @@ class Planet: } return comp_name - def __generate_audio_encoder(self, mux, format, profile, inputs, globals): + def __generate_audio_encoder(self, mux, format, profile): source = mux['audio'].split(':')[0] - encoder = globals['formats'][format]['audio'] - bitrate = globals['profiles'][profile]['audio'] - input_samplerate = inputs[source]['properties']['samplerate'] + encoder = self._desc.globals['formats'][format]['audio'] + bitrate = self._desc.globals['profiles'][profile]['audio'] + input_samplerate = self._desc.inputs[source]['properties']['samplerate'] samplerate = input_samplerate - if 'samplerate' in globals['formats'][format]: - samplerate = globals['formats'][format]['samplerate'] + if 'samplerate' in self._desc.globals['formats'][format]: + samplerate = self._desc.globals['formats'][format]['samplerate'] feeder = self.__generate_audio_feeder('input-%s' % (mux['audio']), source, input_samplerate, samplerate) comp_name = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate) @@ -181,45 +193,45 @@ class Planet: } return comp_name - def __generate_video_feeder(self, input, source, input_resolution, target_resolution, globals): - if globals['resolutions'][target_resolution]['rate'] != globals['resolutions'][input_resolution]['rate']: + def __generate_video_feeder(self, input, source, input_resolution, target_resolution): + if self._desc.globals['resolutions'][target_resolution]['rate'] != self._desc.globals['resolutions'][input_resolution]['rate']: raise Exception("ERROR: video rate conversion is not supported!!!") if target_resolution == input_resolution: return input comp_name = 'resize-%s-%s' % (source, target_resolution) - comp_desc = 'resize video from %s to %sx%s' % (source, globals['resolutions'][target_resolution]['width'], - globals['resolutions'][target_resolution]['height']), + comp_desc = 'resize video from %s to %sx%s' % (source, self._desc.globals['resolutions'][target_resolution]['width'], + self._desc.globals['resolutions'][target_resolution]['height']), self.flow['inputs'][comp_name] = { 'type': 'video-resize', 'desc': comp_desc, 'worker': None, 'feeder': input, 'properties': { - 'width': globals['resolutions'][target_resolution]['width'], - 'height': globals['resolutions'][target_resolution]['height'], + 'width': self._desc.globals['resolutions'][target_resolution]['width'], + 'height': self._desc.globals['resolutions'][target_resolution]['height'], }, } return comp_name - def __generate_video_encoder(self, mux, format, profile, inputs, globals): - if 'video' not in globals['profiles'][profile]: + def __generate_video_encoder(self, mux, format, profile): + if 'video' not in self._desc.globals['profiles'][profile]: return None source = mux['video'].split(':')[0] - encoder = globals['formats'][format]['video'] - input_resolution = inputs[source]['properties']['resolution'] - resolution = globals['profiles'][profile]['video'] - bitrate = globals['bitrates'][encoder][resolution] + encoder = self._desc.globals['formats'][format]['video'] + input_resolution = self._desc.inputs[source]['properties']['resolution'] + resolution = self._desc.globals['profiles'][profile]['video'] + bitrate = self._desc.globals['bitrates'][encoder][resolution] if input_resolution == "": raise Exception("format definition needs video but no video input given") - feeder = self.__generate_video_feeder('input-%s' % (mux['video']), source, input_resolution, resolution, globals) + feeder = self.__generate_video_feeder('input-%s' % (mux['video']), source, input_resolution, resolution) comp_name = 'encode-%s-%s-%s' % (source, encoder, resolution) - comp_desc = '%s encoder for %sx%s, from %s' % (encoder, globals['resolutions'][resolution]['width'], - globals['resolutions'][resolution]['height'], source), + comp_desc = '%s encoder for %sx%s, from %s' % (encoder, self._desc.globals['resolutions'][resolution]['width'], + self._desc.globals['resolutions'][resolution]['height'], source), if comp_name in self.flow['encoders-video']: return comp_name @@ -235,8 +247,8 @@ class Planet: } return comp_name - def __generate_muxer(self, mux_name, format, profile, globals, feeder_audio, feeder_video): - muxer = globals['formats'][format]['muxer'] + def __generate_muxer(self, mux_name, format, profile, feeder_audio, feeder_video): + muxer = self._desc.globals['formats'][format]['muxer'] comp_name = 'mux-%s-%s-%s' % (mux_name, format, profile) comp_desc = '%s muxer for %s, profile %s' % (format, mux_name, profile), @@ -249,22 +261,22 @@ class Planet: 'properties': {}, } - def _generate_muxes(self, muxes, inputs, globals): + def _generate_muxes(self): self.flow['encoders-audio'] = {} self.flow['encoders-video'] = {} self.flow['muxers'] = {} - for mux_name, mux in muxes.items(): + for mux_name, mux in self._desc.muxes.items(): for format in mux['formats']: for profile in mux['formats'][format]: audio_encoder = None video_encoder = None if 'audio' in mux: - audio_encoder = self.__generate_audio_encoder(mux, format, profile, inputs, globals) + audio_encoder = self.__generate_audio_encoder(mux, format, profile) if 'video' in mux: - video_encoder = self.__generate_video_encoder(mux, format, profile, inputs, globals) + video_encoder = self.__generate_video_encoder(mux, format, profile) - self.__generate_muxer(mux_name, format, profile, globals, audio_encoder, video_encoder) + self.__generate_muxer(mux_name, format, profile, audio_encoder, video_encoder) # # streams @@ -320,32 +332,33 @@ class Planet: self.flow['streamers'][comp_name]['properties']['burst-on-connect'] = 'true' self.flow['streamers'][comp_name]['properties']['burst-time'] = stream[prop] - def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile, globals): - if 'stats' in globals: + def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile): + if 'stats' in self._desc.globals: self.flow['streamers'][comp_name]['plugs'] = {} - if 'rrd' in globals['stats']: + if 'rrd' in self._desc.globals['stats']: self.flow['streamers'][comp_name]['plugs']['rrd'] = {} - self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % (globals['stats']['rrd']['directory'], comp_name) - self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (globals['stats']['rrd']['directory'], comp_name) + self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % ( + self._desc.globals['stats']['rrd']['directory'], comp_name) + self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (self._desc.globals['stats']['rrd']['directory'], comp_name) - if 'sfive' in globals['stats']: + if 'sfive' in self._desc.globals['stats']: self.flow['streamers'][comp_name]['plugs']['sfive'] = {} - self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = globals['stats']['sfive']['socket'] - self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = globals['stats']['sfive']['duration'] - self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = globals['stats']['sfive']['tags'] + self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = self._desc.globals['stats']['sfive']['socket'] + self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = self._desc.globals['stats']['sfive']['duration'] + self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = self._desc.globals['stats']['sfive']['tags'] self.flow['streamers'][comp_name]['plugs']['sfive']['hostname'] = '%s%i' % (stream_name, idx + 1) self.flow['streamers'][comp_name]['plugs']['sfive']['content-id'] = mux self.flow['streamers'][comp_name]['plugs']['sfive']['format'] = format self.flow['streamers'][comp_name]['plugs']['sfive']['quality'] = profile - def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter, globals): + def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter): muxer_feed = 'muxer-%s-%s-%s' % (mux, format, profile) feeder = muxer_feed if 'repeater' in stream: feeder = self.__generate_stream_mux_repeater(stream_name, mux, format, profile, muxer_feed) comp_name = '%s-%s%i-%s-%s-%s' % (stream['type'], stream_name, idx + 1, mux, format, profile) - mount_point = '/%s-%s-%s.%s' % (mux, format, profile, globals['formats'][format]['muxer']) + mount_point = '/%s-%s-%s.%s' % (mux, format, profile, self._desc.globals['formats'][format]['muxer']) hostname = stream['hostname'] hostname_next = None if cnt > 1: @@ -361,7 +374,7 @@ class Planet: 'worker': None, 'feeder': feeder, 'properties': { - 'description': globals['description'], + 'description': self._desc.globals['description'], 'type': 'slave', 'porter-socket-path': porter.socket_path, 'porter-username': porter.username, @@ -372,35 +385,35 @@ class Planet: } } self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next) - self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile, globals) + self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile) - def __generate_stream_instance(self, stream_name, stream, idx, cnt, muxes, globals): + def __generate_stream_instance(self, stream_name, stream, idx, cnt): port = stream['port'] interface = None if 'interface' in stream: interface = stream['interface'] porter = self.__create_porter(stream_name, idx, port, interface) for mux_name in stream['muxes']: - for format_name, format in muxes[mux_name]['formats'].items(): + for format_name, format in self._desc.muxes[mux_name]['formats'].items(): for profile_name in format: self.__generate_stream_mux_instance(stream_name, stream, mux_name, format_name, profile_name, - idx, cnt, porter, globals) + idx, cnt, porter) - def _generate_streams(self, streams, muxes, globals): + def _generate_streams(self): self.flow['repeaters'] = {} self.flow['streamers'] = {} - for stream_name, stream in streams.items(): + for stream_name, stream in self._desc.streams.items(): cnt = stream['count'] for idx in range(cnt): - self.__generate_stream_instance(stream_name, stream, idx, cnt, muxes, globals) + self.__generate_stream_instance(stream_name, stream, idx, cnt) # # records - def _generate_records(self, records, globals): + def _generate_records(self): self.flow['recorders'] = {} - for _, record in records.items(): + for _, record in self._desc.records.items(): for mux in record['muxes']: format = record['muxes'][mux]["format"] profile = record['muxes'][mux]["profile"] @@ -422,11 +435,11 @@ class Planet: # # all - def generate(self, desc): - self._generate_inputs(desc.inputs, desc.globals) - self._generate_muxes(desc.muxes, desc.inputs, desc.globals) - self._generate_streams(desc.streams, desc.muxes, desc.globals) - self._generate_records(desc.records, desc.globals) + def generate(self): + self._generate_inputs() + self._generate_muxes() + self._generate_streams() + self._generate_records() # Main ######################################################## @@ -447,8 +460,8 @@ if __name__ == '__main__': d = Description() d.parse(config_file) - p = Planet() - p.generate(d) + p = Planet(d) + p.generate() print("****************************************************") print("** atmosphere **") -- cgit v1.2.3