#!/usr/bin/python3 # # flufigut # # flufigut, the flumotion configuration utility, is a simple tool # that generates flumotion configuration files using pyhton jinja2 # template engine. flufigut generates planet.xml and worker.xml # files from configuration templates and an easy to understand # representation of the flow structure written in json or yaml. # # # Copyright (C) 2018 Christian Pointner # # This file is part of flufigut. # # flufigut is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # any later version. # # flufigut is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with flufigut. If not, see . # import string import random import sys import yaml # from jinja2 import Environment, FileSystemLoader # helper functions ############################################ # def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) # a flufigut stream description ############################### # class WorkerPattern: def __init__(self, pattern, is_prefix, worker_name): self.pattern = pattern self.is_prefix = is_prefix self.worker_name = worker_name class Description: def __init__(self): self.globals = {} self.inputs = {} self.muxes = {} self.streams = {} self.records = {} self.worker_patterns = {} def _compute_worker_patterns(self): for worker_name, worker in self.globals['workers'].items(): for pattern in worker: is_prefix = False search_string = pattern if '*' in pattern: if '*' in pattern[:-1]: raise Exception("invalid worker pattern '%s' (component globs may only have a single '*' at the end)" % pattern) is_prefix = True search_string = pattern[:-1] if not pattern: raise Exception("worker '%s' has wildcard pattern '%s' which is not allowed" % (worker_name, pattern)) if search_string in self.worker_patterns: raise Exception("worker pattern '%s' is used by at least two workers (%s and %s)" % (pattern, self.worker_patterns[search_string].worker_name, worker_name)) self.worker_patterns[search_string] = WorkerPattern(pattern, is_prefix, worker_name) def parse(self, config_file): cf = open(config_file, 'r') config = yaml.load(cf) cf.close() self.globals = config['globals'] self.inputs = config['inputs'] self.muxes = config['muxes'] self.streams = config['streams'] if 'records' in config: self.records = config['records'] return self._compute_worker_patterns() # a flumtion planet configuration ############################# # class Porter: def __init__(self, name, interface, port): self.name = name self.interface = interface self.port = port self.socket_path = rand_string() self.username = rand_string(size=12) self.password = rand_string(size=20) class Planet: def __init__(self, desc): self.atmosphere = {} self.flow = {} self._desc = desc self._unassigned_components = [] def __get_worker(self, comp_name): matched_prefix_len = 0 match = None for search_string, pattern in self._desc.worker_patterns.items(): if pattern.is_prefix: if comp_name.startswith(search_string): if len(search_string) == matched_prefix_len: raise Exception("component '%s': both patterns '%s' of worker '%s' and '%s' of worker '%s' match the component name and have the same length" % (comp_name, match.pattern, match.worker_name, pattern.pattern, pattern.worker_name)) elif len(search_string) > matched_prefix_len: match_prefix_len = len(search_string) match = pattern else: if comp_name == search_string: match = pattern break if not match: self._unassigned_components.append(comp_name) return None return match.worker_name # # inputs def __set_input_properties(self, comp_name, props): for prop in props: if prop == 'resolution': self.flow['inputs'][comp_name]['properties']['width'] = self._desc.globals['resolutions'][props[prop]]['width'] self.flow['inputs'][comp_name]['properties']['height'] = self._desc.globals['resolutions'][props[prop]]['height'] self.flow['inputs'][comp_name]['properties']['framerate'] = self._desc.globals['resolutions'][props[prop]]['rate'] else: self.flow['inputs'][comp_name]['properties'][prop] = props[prop] def _generate_inputs(self): self.flow['inputs'] = {} master_cnt = 0 for source, input in self._desc.inputs.items(): comp_name = 'input-%s' % source comp_desc = 'capture raw data from %s' % (source) self.flow['inputs'][comp_name] = { 'type': input['type'], 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'master': input['master'], 'properties': {}, } if input['master']: master_cnt += 1 self.__set_input_properties(comp_name, input['properties']) if master_cnt == 0: raise Exception("You have not configured any master clock device!") elif master_cnt > 1: raise Exception("You have configured multiple master clock devices!") # # muxes def __generate_audio_feeder(self, input, source, input_samplerate, target_samplerate): if target_samplerate == input_samplerate: return input comp_name = 'resample-%s-%s' % (source, target_samplerate) comp_desc = 'resample audio from % s to % s Hz' % (source, target_samplerate) self.flow['inputs'][comp_name] = { 'type': 'audio-resample', 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder': input, 'properties': { 'samplerate': target_samplerate, }, } return comp_name def __generate_audio_encoder(self, mux, format, profile): source = mux['audio'].split(':')[0] encoder = self._desc.globals['formats'][format]['audio'] bitrate = self._desc.globals['profiles'][profile]['audio'] input_samplerate = self._desc.inputs[source]['properties']['samplerate'] samplerate = input_samplerate if 'samplerate' in self._desc.globals['formats'][format]: samplerate = self._desc.globals['formats'][format]['samplerate'] feeder = self.__generate_audio_feeder('input-%s' % (mux['audio']), source, input_samplerate, samplerate) comp_name = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate) comp_desc = '%s encoder for %i kbit/s @ %i Hz, from %s' % (encoder, bitrate, samplerate, source), if bitrate == 0: comp_name = 'encode-%s-%s-%i' % (source, encoder, samplerate) comp_desc = '%s encoder @ %i Hz, from %s' % (encoder, samplerate, source), if comp_name in self.flow['encoders-audio']: return comp_name self.flow['encoders-audio'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder': feeder, 'properties': { 'bitrate': bitrate, }, } return comp_name def __generate_video_feeder(self, input, source, input_resolution, target_resolution): if self._desc.globals['resolutions'][target_resolution]['rate'] != self._desc.globals['resolutions'][input_resolution]['rate']: raise Exception("ERROR: video rate conversion is not supported!!!") if target_resolution == input_resolution: return input comp_name = 'resize-%s-%s' % (source, target_resolution) comp_desc = 'resize video from %s to %sx%s' % (source, self._desc.globals['resolutions'][target_resolution]['width'], self._desc.globals['resolutions'][target_resolution]['height']), self.flow['inputs'][comp_name] = { 'type': 'video-resize', 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder': input, 'properties': { 'width': self._desc.globals['resolutions'][target_resolution]['width'], 'height': self._desc.globals['resolutions'][target_resolution]['height'], }, } return comp_name def __generate_video_encoder(self, mux, format, profile): if 'video' not in self._desc.globals['profiles'][profile]: return None source = mux['video'].split(':')[0] encoder = self._desc.globals['formats'][format]['video'] input_resolution = self._desc.inputs[source]['properties']['resolution'] resolution = self._desc.globals['profiles'][profile]['video'] bitrate = self._desc.globals['bitrates'][encoder][resolution] if input_resolution == "": raise Exception("format definition needs video but no video input given") feeder = self.__generate_video_feeder('input-%s' % (mux['video']), source, input_resolution, resolution) comp_name = 'encode-%s-%s-%s' % (source, encoder, resolution) comp_desc = '%s encoder for %sx%s, from %s' % (encoder, self._desc.globals['resolutions'][resolution]['width'], self._desc.globals['resolutions'][resolution]['height'], source), if comp_name in self.flow['encoders-video']: return comp_name self.flow['encoders-video'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder': feeder, 'properties': { 'bitrate': bitrate, }, } return comp_name def __generate_muxer(self, mux_name, format, profile, feeder_audio, feeder_video): muxer = self._desc.globals['formats'][format]['muxer'] comp_name = 'mux-%s-%s-%s' % (mux_name, format, profile) comp_desc = '%s muxer for %s, profile %s' % (format, mux_name, profile), self.flow['muxers'][comp_name] = { 'type': '%s-mux' % muxer, 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder_audio': feeder_audio, 'feeder_video': feeder_video, 'properties': {}, } def _generate_muxes(self): self.flow['encoders-audio'] = {} self.flow['encoders-video'] = {} self.flow['muxers'] = {} for mux_name, mux in self._desc.muxes.items(): for format in mux['formats']: for profile in mux['formats'][format]: audio_encoder = None video_encoder = None if 'audio' in mux: audio_encoder = self.__generate_audio_encoder(mux, format, profile) if 'video' in mux: video_encoder = self.__generate_video_encoder(mux, format, profile) self.__generate_muxer(mux_name, format, profile, audio_encoder, video_encoder) # # streams def __create_porter(self, protocol, stream, idx, port, interface=None): if protocol != "http": raise Exception("unknown porter protocol '%s', currently only http porters are supported" % protocol) comp_name = '%s-%s%i-port-%i' % (protocol, stream, idx + 1, port) addr = '*:%i' % (port) if interface: comp_name = '%s-%s%i-port-%s-%i' % (protocol, stream, idx + 1, interface, port) addr = '%s:%i' % (interface, port) porter = Porter(comp_name, interface, port) self.atmosphere[porter.name] = { 'type': "porter", 'desc': "%s porter %s%i on %s" % (protocol, stream, idx + 1, addr), 'worker': self.__get_worker(comp_name), 'properties': { 'port': port, 'socket-path': porter.socket_path, 'username': porter.username, 'password': porter.password, }, } if interface: self.atmosphere[porter.name]['interface'] = interface return porter def __generate_stream_mux_repeater(self, stream, mux, format, profile, feeder): comp_name = 'repeat-%s-%s-%s-%s' % (stream, mux, format, profile) if comp_name in self.flow['repeaters']: return comp_name self.flow['repeaters'][comp_name] = { 'type': 'repeater', 'desc': "repeater for %s (%s %s-%s)" % (stream, mux, format, profile), 'worker': self.__get_worker(comp_name), 'feeder': feeder, } return comp_name def __set_stream_mux_instance_props(self, comp_name, stream, port, mount_point, hostname_next): for prop in stream: if prop == 'max-con': self.flow['streamers'][comp_name]['properties']['client-limit'] = stream[prop] if hostname_next: self.flow['streamers'][comp_name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) if prop == 'max-bw': self.flow['streamers'][comp_name]['properties']['bandwidth-limit'] = stream[prop] if hostname_next: self.flow['streamers'][comp_name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) if prop == 'burst-on-connect': self.flow['streamers'][comp_name]['properties']['burst-on-connect'] = 'true' self.flow['streamers'][comp_name]['properties']['burst-time'] = stream[prop] def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile): if 'stats' in self._desc.globals: self.flow['streamers'][comp_name]['plugs'] = {} if 'rrd' in self._desc.globals['stats']: self.flow['streamers'][comp_name]['plugs']['rrd'] = {} self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % ( self._desc.globals['stats']['rrd']['directory'], comp_name) self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (self._desc.globals['stats']['rrd']['directory'], comp_name) if 'sfive' in self._desc.globals['stats']: self.flow['streamers'][comp_name]['plugs']['sfive'] = {} self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = self._desc.globals['stats']['sfive']['socket'] self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = self._desc.globals['stats']['sfive']['duration'] self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = self._desc.globals['stats']['sfive']['tags'] self.flow['streamers'][comp_name]['plugs']['sfive']['hostname'] = '%s%i' % (stream_name, idx + 1) self.flow['streamers'][comp_name]['plugs']['sfive']['content-id'] = mux self.flow['streamers'][comp_name]['plugs']['sfive']['format'] = format self.flow['streamers'][comp_name]['plugs']['sfive']['quality'] = profile def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter): muxer_feed = 'muxer-%s-%s-%s' % (mux, format, profile) feeder = muxer_feed if 'repeater' in stream: feeder = self.__generate_stream_mux_repeater(stream_name, mux, format, profile, muxer_feed) comp_name = '%s-%s%i-stream-%s-%s-%s' % (stream['type'], stream_name, idx + 1, mux, format, profile) mount_point = '/%s-%s-%s.%s' % (mux, format, profile, self._desc.globals['formats'][format]['muxer']) hostname = stream['hostname'] hostname_next = None if cnt > 1: hostname = stream['hostname'] % (idx + 1) if idx != 0: hostname_next = stream['hostname'] % (idx) else: hostname_next = stream['hostname'] % (cnt) self.flow['streamers'][comp_name] = { 'type': "%s-stream" % stream['type'], 'desc': "%s streamer for %s %s-%s (part %i of %i in %s cluster)" % (stream['type'], mux, format, profile, idx + 1, cnt, stream_name), 'worker': self.__get_worker(comp_name), 'feeder': feeder, 'properties': { 'description': self._desc.globals['description'], 'type': 'slave', 'porter-socket-path': porter.socket_path, 'porter-username': porter.username, 'porter-password': porter.password, 'mount-point': mount_point, 'hostname': hostname, 'port': porter.port, } } self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next) self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile) def __generate_stream_instance(self, stream_name, stream, idx, cnt): port = stream['port'] interface = None if 'interface' in stream: interface = stream['interface'] porter = self.__create_porter(stream['type'], stream_name, idx, port, interface) for mux_name in stream['muxes']: for format_name, format in self._desc.muxes[mux_name]['formats'].items(): for profile_name in format: self.__generate_stream_mux_instance(stream_name, stream, mux_name, format_name, profile_name, idx, cnt, porter) def _generate_streams(self): self.flow['repeaters'] = {} self.flow['streamers'] = {} for stream_name, stream in self._desc.streams.items(): cnt = stream['count'] for idx in range(cnt): self.__generate_stream_instance(stream_name, stream, idx, cnt) # # records def _generate_records(self): self.flow['recorders'] = {} for _, record in self._desc.records.items(): for mux in record['muxes']: format = record['muxes'][mux]["format"] profile = record['muxes'][mux]["profile"] feeder = 'mux-%s-%s-%s' % (mux, format, profile) comp_name = 'record-%s-%s-%s' % (mux, format, profile) comp_desc = 'recorder for %s %s-%s' % (mux, format, profile), self.flow['recorders'][comp_name] = { 'type': "recorder", 'desc': comp_desc, 'worker': self.__get_worker(comp_name), 'feeder': feeder, 'properties': {} } for prop in record: if prop != 'muxes': self.flow['recorders'][comp_name]['properties'][prop] = record[prop] # # all def generate(self): self._generate_inputs() self._generate_muxes() self._generate_streams() self._generate_records() if len(p._unassigned_components) > 0: print("unassinged components: ") for c in p._unassigned_components: print(" " + c) return 1 return 0 # Main ######################################################## # if __name__ == '__main__': import traceback import pprint __pp = pprint.PrettyPrinter(indent=4, width=160) if len(sys.argv) <= 1: print("ERROR: No configuration file given") sys.exit(-1) config_file = sys.argv[1] ret = 0 try: d = Description() d.parse(config_file) p = Planet(d) if p.generate() != 0: sys.exit(1) print("****************************************************") print("** atmosphere **") print("**") __pp.pprint(p.atmosphere) print("**") print("**************************") print("** planet **") print("**") __pp.pprint(p.flow) print("**") print("****************************************************") except Exception as e: print("ERROR: while running app: %s" % e) print(traceback.format_exc()) sys.exit(1) sys.exit(ret)