#!/usr/bin/python3
#
#  flufigut
#
#  flufigut, the flumotion configuration utility, is a simple tool
#  that generates flumotion configuration files using the python jinja2
#  template engine. flufigut generates planet.xml and worker.xml
#  files from configuration templates and an easy to understand
#  representation of the flow structure written in json or yaml.
#
#
#  Copyright (C) 2018 Christian Pointner
#
#  This file is part of flufigut.
#
#  flufigut is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 2 of the License, or
#  any later version.
#
#  flufigut is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with flufigut. If not, see <http://www.gnu.org/licenses/>.
#

import base64
import crypt
import os
import random
import string
import sys
import time

import jinja2
import kubernetes
import yaml


# helper functions ############################################
#

# The generated strings are used as passwords, porter credentials and
# salts, so they are drawn from the OS entropy source instead of the
# default (predictable) Mersenne Twister generator.
_sysrand = random.SystemRandom()


def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits):
    """Return a random string of ``size`` characters drawn from ``chars``."""
    return ''.join(_sysrand.choice(chars) for _ in range(size))


# a flufigut stream description ###############################
#

class Worker:
    """A worker identified by a name and an optional sub-name.

    Neither part may contain ':' because ``str()`` joins them with it.
    """

    def __init__(self, name, subname=None):
        if ':' in name:
            raise Exception("worker name '%s' is invalid (must not contain ':')" % name)
        if subname and ':' in subname:
            raise Exception("worker sub-name '%s' of worker '%s' is invalid (must not contain ':')" % (subname, name))
        self.name = name
        self.subname = subname

    def __str__(self):
        if self.subname:
            return "%s:%s" % (self.name, self.subname)
        return self.name

    def __repr__(self):
        return "Worker(%s, %s)" % (self.name, self.subname)


class WorkerPattern:
    """A component-name pattern together with the worker it selects."""

    def __init__(self, pattern, is_prefix, worker):
        self.pattern = pattern      # pattern as written in the configuration
        self.is_prefix = is_prefix  # True when the pattern ended in '*'
        self.worker = worker


class Description:
    """The parsed flufigut configuration file.

    ``worker_patterns`` maps a search string (the pattern without its
    trailing '*') to the ``WorkerPattern`` it was derived from.
    """

    def __init__(self):
        self.globals = {}
        self.inputs = {}
        self.muxes = {}
        self.streams = {}
        self.records = {}
        self.worker_patterns = {}

    def __compute_worker_pattern(self, worker, pattern):
        """Validate a single pattern and register it for ``worker``.

        Raises Exception when the pattern has a '*' anywhere but at the
        end, is empty or a bare wildcard, or its search string is already
        registered.
        """
        is_prefix = False
        search_string = pattern
        if '*' in pattern:
            if '*' in pattern[:-1]:
                raise Exception("invalid worker pattern '%s' in worker '%s' component globs may only have a single '*' at the end)" % (pattern, worker))
            is_prefix = True
            search_string = pattern[:-1]

        if not search_string:
            raise Exception("worker '%s' has wildcard or empty pattern which is not allowed" % (worker))
        if search_string in self.worker_patterns:
            raise Exception("worker pattern '%s' is used by at least two workers (%s and %s)" % (pattern, worker, self.worker_patterns[search_string].worker))

        self.worker_patterns[search_string] = WorkerPattern(pattern, is_prefix, worker)

    def __compute_worker_pattern_list(self, worker, entry):
        """Register ``entry`` (one pattern string or a list of them) for ``worker``."""
        if isinstance(entry, str):
            entry = [entry]
        elif not isinstance(entry, list):
            raise Exception("globals.workers entry '%s' is invalid" % worker)
        for pattern in entry:
            self.__compute_worker_pattern(worker, pattern)

    def _compute_worker_patterns(self):
        """Fill ``worker_patterns`` from ``globals['workers']``.

        Each entry is a pattern string, a list of patterns, or a dict
        mapping sub-names to a pattern string / list of patterns.
        """
        for name, entry in self.globals['workers'].items():
            if isinstance(entry, dict):
                for subname, subentry in entry.items():
                    self.__compute_worker_pattern_list(Worker(name, subname), subentry)
            elif isinstance(entry, (str, list)):
                self.__compute_worker_pattern_list(Worker(name), entry)
            else:
                raise Exception("globals.workers entry '%s' is invalid" % name)

    def parse(self, config_file):
        """Load ``config_file`` (YAML) and compute the worker patterns.

        The keys 'globals', 'inputs', 'muxes' and 'streams' are required,
        'records' is optional.
        """
        # safe_load: the configuration is plain data, and yaml.load()
        # without an explicit Loader is rejected by current PyYAML.
        with open(config_file, 'r') as cf:
            config = yaml.safe_load(cf)

        self.globals = config['globals']
        self.inputs = config['inputs']
        self.muxes = config['muxes']
        self.streams = config['streams']
        if 'records' in config:
            self.records = config['records']

        return self._compute_worker_patterns()


# a flumtion planet configuration #############################
#

class Porter:
    """Parameters of a porter; socket path and credentials are random."""

    def __init__(self, name, interface, port):
        self.name = name
        self.interface = interface
        self.port = port
        self.socket_path = rand_string()
        self.username = rand_string(size=12)
        self.password = rand_string(size=20)


class Planet:
    """Component graph (atmosphere, flow, workers) built from a Description."""

    def __init__(self, desc):
        self.atmosphere = {}
        self.flow = {}
        self.workers = {}
        self._desc = desc
        self._unassigned_components = []

    #
    # utils

    def __add_worker(self, worker):
        """Make sure ``worker`` (and its sub-worker, if any) has an entry in ``self.workers``."""
        if worker.name not in self.workers:
            self.workers[worker.name] = {
                'name': worker.name,
                'subs': {},
                'password': rand_string(size=20),
                'flags': {}
            }
        if not worker.subname:
            return
        subs = self.workers[worker.name]['subs']
        if worker.subname not in subs:
            subs[worker.subname] = {'fullname': str(worker), 'flags': {}}

    def __add_worker_flag(self, worker, flag_name, flag):
        """Record ``flag`` under ``flag_name`` for the worker and its sub-worker.

        Duplicates are ignored. ``worker`` may be None (component without
        a matching worker pattern); nothing is recorded then, so that
        generate() can still return the list of unassigned components.
        """
        if worker is None:
            return
        entry = self.workers[worker.name]
        targets = [entry['flags']]
        if worker.subname:
            targets.append(entry['subs'][worker.subname]['flags'])
        for flags in targets:
            values = flags.setdefault(flag_name, [])
            if flag not in values:
                values.append(flag)

    def __get_worker(self, comp_name):
        """Return the worker responsible for ``comp_name``, or None.

        An exact pattern match wins immediately; otherwise the longest
        matching prefix pattern is used. Components without any match are
        appended to ``self._unassigned_components``.
        """
        match_prefix_len = 0
        match = None
        for search_string, pattern in self._desc.worker_patterns.items():
            if not pattern.is_prefix:
                if comp_name == search_string:
                    match = pattern
                    break
                continue
            if not comp_name.startswith(search_string):
                continue
            if len(search_string) == match_prefix_len:
                raise Exception("component '%s': both patterns '%s' of worker '%s' and '%s' of worker '%s' match the component name and have the same length" %
                                (comp_name, match.pattern, match.worker, pattern.pattern, pattern.worker))
            elif len(search_string) > match_prefix_len:
                match_prefix_len = len(search_string)
                match = pattern

        if not match:
            self._unassigned_components.append(comp_name)
            return None

        self.__add_worker(match.worker)
        return match.worker

    #
    # inputs

    def __set_input_properties(self, comp_name, props):
        """Copy ``props`` to the input component, expanding 'resolution'.

        'resolution' names an entry of ``globals['resolutions']`` and is
        replaced by its width, height and framerate.
        """
        target = self.flow['inputs'][comp_name]['properties']
        for prop, value in props.items():
            if prop == 'resolution':
                resolution = self._desc.globals['resolutions'][value]
                target['width'] = resolution['width']
                target['height'] = resolution['height']
                target['framerate'] = resolution['rate']
            else:
                target[prop] = value

    def _generate_inputs(self):
        """Create one capture component per configured input.

        Raises Exception unless exactly one input is marked as master.
        """
        self.flow['inputs'] = {}
        master_cnt = 0
        for source, input_cfg in self._desc.inputs.items():
            comp_name = 'capture-%s' % source
            comp_desc = 'capture raw data from %s' % (source)
            worker = self.__get_worker(comp_name)
            self.flow['inputs'][comp_name] = {
                'type': input_cfg['type'],
                'desc': comp_desc,
                'worker': str(worker),
                'master': input_cfg['master'],
                'properties': {},
            }
            if input_cfg['master']:
                master_cnt += 1
            self.__add_worker_flag(worker, "input", source)
            self.__set_input_properties(comp_name, input_cfg['properties'])

        if master_cnt == 0:
            raise Exception("You have not configured any master clock device!")
        elif master_cnt > 1:
            raise Exception("You have configured multiple master clock devices!")

    #
    # muxes

    def __generate_audio_feeder(self, input, source, input_samplerate, target_samplerate):
        """Return the feeder for audio at ``target_samplerate``.

        This is ``input`` itself when no conversion is needed, otherwise
        the name of a resample component that gets (re-)registered.
        """
        if target_samplerate == input_samplerate:
            return input

        comp_name = 'resample-%s-%s' % (source, target_samplerate)
        comp_desc = 'resample audio from %s to %s Hz' % (source, target_samplerate)
        self.flow['inputs'][comp_name] = {
            'type': 'audio-resample',
            'desc': comp_desc,
            'worker': str(self.__get_worker(comp_name)),
            'feeder': input,
            'properties': {
                'samplerate': target_samplerate,
            },
        }
        return comp_name

    def __generate_audio_encoder(self, mux, format, profile):
        """Return the name of the audio encoder for ``mux``/``format``/``profile``.

        The encoder component is created on first use and shared between
        muxes that need the same source, codec, bitrate and samplerate.
        """
        format_cfg = self._desc.globals['formats'][format]
        source = mux['audio'].split(':')[0]
        encoder = format_cfg['audio']
        bitrate = self._desc.globals['profiles'][profile]['audio']
        input_samplerate = self._desc.inputs[source]['properties']['samplerate']
        samplerate = input_samplerate
        if 'samplerate' in format_cfg:
            samplerate = format_cfg['samplerate']

        feeder = self.__generate_audio_feeder('capture-%s' % (mux['audio']), source, input_samplerate, samplerate)

        comp_name = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate)
        comp_desc = '%s encoder for %i kbit/s @ %i Hz, from %s' % (encoder, bitrate, samplerate, source)
        if bitrate == 0:
            # a bitrate of 0 is left out of the component name/description
            comp_name = 'encode-%s-%s-%i' % (source, encoder, samplerate)
            comp_desc = '%s encoder @ %i Hz, from %s' % (encoder, samplerate, source)

        if comp_name in self.flow['encoders-audio']:
            return comp_name

        self.flow['encoders-audio'][comp_name] = {
            'type': '%s-encode' % encoder,
            'desc': comp_desc,
            'worker': str(self.__get_worker(comp_name)),
            'feeder': feeder,
            'properties': {
                'bitrate': bitrate,
            },
        }
        return comp_name

    def __generate_video_feeder(self, input, source, input_resolution, target_resolution):
        """Return the feeder for video at ``target_resolution``.

        This is ``input`` itself when the resolutions are equal, otherwise
        the name of a resize component that gets (re-)registered.
        Raises Exception when the frame rates of both resolutions differ.
        """
        resolutions = self._desc.globals['resolutions']
        target = resolutions[target_resolution]
        if target['rate'] != resolutions[input_resolution]['rate']:
            raise Exception("ERROR: video rate conversion is not supported!!!")

        if target_resolution == input_resolution:
            return input

        comp_name = 'resize-%s-%s' % (source, target_resolution)
        comp_desc = 'resize video from %s to %sx%s' % (source, target['width'], target['height'])
        self.flow['inputs'][comp_name] = {
            'type': 'video-resize',
            'desc': comp_desc,
            'worker': str(self.__get_worker(comp_name)),
            'feeder': input,
            'properties': {
                'width': target['width'],
                'height': target['height'],
            },
        }
        return comp_name

    def __generate_video_encoder(self, mux, format, profile):
        """Return the name of the video encoder for ``mux``/``format``/``profile``.

        Returns None when the profile has no 'video' entry. The encoder
        component is created on first use and shared afterwards.
        """
        if 'video' not in self._desc.globals['profiles'][profile]:
            return None

        source = mux['video'].split(':')[0]
        encoder = self._desc.globals['formats'][format]['video']
        input_resolution = self._desc.inputs[source]['properties']['resolution']
        resolution = self._desc.globals['profiles'][profile]['video']
        bitrate = self._desc.globals['bitrates'][encoder][resolution]

        if input_resolution == "":
            raise Exception("format definition needs video but no video input given")

        feeder = self.__generate_video_feeder('capture-%s' % (mux['video']), source, input_resolution, resolution)

        resolution_cfg = self._desc.globals['resolutions'][resolution]
        comp_name = 'encode-%s-%s-%s' % (source, encoder, resolution)
        comp_desc = '%s encoder for %sx%s, from %s' % (encoder, resolution_cfg['width'], resolution_cfg['height'], source)

        if comp_name in self.flow['encoders-video']:
            return comp_name

        self.flow['encoders-video'][comp_name] = {
            'type': '%s-encode' % encoder,
            'desc': comp_desc,
            'worker': str(self.__get_worker(comp_name)),
            'feeder': feeder,
            'properties': {
                'bitrate': bitrate,
            },
        }
        return comp_name

    def __generate_muxer(self, mux_name, format, profile, feeder_audio, feeder_video):
        """Register the muxer component fed by the given encoders (either may be None)."""
        muxer = self._desc.globals['formats'][format]['muxer']
        comp_name = 'mux-%s-%s-%s' % (mux_name, format, profile)
        comp_desc = '%s muxer for %s, profile %s' % (format, mux_name, profile)

        self.flow['muxers'][comp_name] = {
            'type': '%s-mux' % muxer,
            'desc': comp_desc,
            'worker': str(self.__get_worker(comp_name)),
            'feeder_audio': feeder_audio,
            'feeder_video': feeder_video,
            'properties': {},
        }

    def _generate_muxes(self):
        """Create encoders and a muxer for every mux/format/profile combination."""
        self.flow['encoders-audio'] = {}
        self.flow['encoders-video'] = {}
        self.flow['muxers'] = {}
        for mux_name, mux in self._desc.muxes.items():
            for format_name, profiles in mux['formats'].items():
                for profile in profiles:
                    audio_encoder = None
                    video_encoder = None
                    if 'audio' in mux:
                        audio_encoder = self.__generate_audio_encoder(mux, format_name, profile)
                    if 'video' in mux:
                        video_encoder = self.__generate_video_encoder(mux, format_name, profile)

                    self.__generate_muxer(mux_name, format_name, profile, audio_encoder, video_encoder)

    #
    # streams

    def __create_porter(self, protocol, stream, idx, port, interface=None):
        """Create a porter, register it in the atmosphere and return it.

        Only the "http" protocol is supported. The port is recorded as a
        "port" flag on the worker that runs the porter.
        """
        if protocol != "http":
            raise Exception("unknown porter protocol '%s', currently only http porters are supported" % protocol)

        comp_name = '%s-%s%i-port-%i' % (protocol, stream, idx + 1, port)
        addr = '*:%i' % (port)
        if interface:
            comp_name = '%s-%s%i-port-%s-%i' % (protocol, stream, idx + 1, interface, port)
            addr = '%s:%i' % (interface, port)

        worker = self.__get_worker(comp_name)
        porter = Porter(comp_name, interface, port)
        self.atmosphere[porter.name] = {
            'type': "porter",
            'desc': "%s porter %s%i on %s" % (protocol, stream, idx + 1, addr),
            'worker': str(worker),
            'properties': {
                'port': port,
                'socket-path': porter.socket_path,
                'username': porter.username,
                'password': porter.password,
            },
        }
        if interface:
            self.atmosphere[porter.name]['properties']['interface'] = interface

        self.__add_worker_flag(worker, "port", port)
        return porter

    def __generate_stream_mux_repeater(self, stream, mux, format, profile, feeder):
        """Return the repeater name for the stream/mux combination, creating it on first use."""
        comp_name = 'repeat-%s-%s-%s-%s' % (stream, mux, format, profile)
        if comp_name in self.flow['repeaters']:
            return comp_name

        self.flow['repeaters'][comp_name] = {
            'type': 'repeater',
            'desc': "repeater for %s (%s %s-%s)" % (stream, mux, format, profile),
            'worker': str(self.__get_worker(comp_name)),
            'feeder': feeder,
        }
        return comp_name

    def __set_stream_mux_instance_props(self, comp_name, stream, port, mount_point, hostname_next):
        """Translate the optional stream limits into streamer properties.

        'max-con' and 'max-bw' additionally set 'redirect-on-overflow' to
        the same mount point on ``hostname_next`` when one is given.
        """
        props = self.flow['streamers'][comp_name]['properties']
        for prop in stream:
            if prop == 'max-con':
                props['client-limit'] = stream[prop]
                if hostname_next:
                    props['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point)
            if prop == 'max-bw':
                props['bandwidth-limit'] = stream[prop]
                if hostname_next:
                    props['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point)
            if prop == 'burst-on-connect':
                props['burst-on-connect'] = 'true'
                props['burst-time'] = stream[prop]

    def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile):
        """Attach the 'rrd' and 'sfive' plugs configured in ``globals['stats']``.

        Does nothing when the configuration has no 'stats' section.
        """
        if 'stats' not in self._desc.globals:
            return
        stats = self._desc.globals['stats']
        plugs = {}
        self.flow['streamers'][comp_name]['plugs'] = plugs

        if 'rrd' in stats:
            directory = stats['rrd']['directory']
            plugs['rrd'] = {
                'clients': "%s/%s_clients.rrd" % (directory, comp_name),
                'bytes': "%s/%s_bytes.rrd" % (directory, comp_name),
            }
        if 'sfive' in stats:
            sfive = stats['sfive']
            plugs['sfive'] = {
                'socket': sfive['socket'],
                'duration': sfive['duration'],
                'tags': sfive['tags'],
                'hostname': '%s%i' % (stream_name, idx + 1),
                'content-id': mux,
                'format': format,
                'quality': profile,
            }

    def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter):
        """Register the streamer for one mux/format/profile on stream instance ``idx``.

        With more than one instance, ``stream['hostname']`` is used as a
        %-format template for the 1-based instance number, and the
        overflow target is the previous instance (the first one wraps
        around to the last).
        """
        muxer_feed = 'mux-%s-%s-%s' % (mux, format, profile)
        feeder = muxer_feed
        if 'repeater' in stream:
            feeder = self.__generate_stream_mux_repeater(stream_name, mux, format, profile, muxer_feed)

        comp_name = '%s-%s%i-stream-%s-%s-%s' % (stream['type'], stream_name, idx + 1, mux, format, profile)
        mount_point = '/%s-%s-%s.%s' % (mux, format, profile, self._desc.globals['formats'][format]['muxer'])
        hostname = stream['hostname']
        hostname_next = None
        if cnt > 1:
            hostname = stream['hostname'] % (idx + 1)
            if idx != 0:
                hostname_next = stream['hostname'] % (idx)
            else:
                hostname_next = stream['hostname'] % (cnt)

        self.flow['streamers'][comp_name] = {
            'type': "%s-stream" % stream['type'],
            'desc': "%s streamer for %s %s-%s (part %i of %i in %s cluster)" % (stream['type'], mux, format, profile, idx + 1, cnt, stream_name),
            'worker': str(self.__get_worker(comp_name)),
            'feeder': feeder,
            'properties': {
                'description': self._desc.globals['description'],
                'type': 'slave',
                'porter-socket-path': porter.socket_path,
                'porter-username': porter.username,
                'porter-password': porter.password,
                'mount-point': mount_point,
                'hostname': hostname,
                'port': porter.port,
            }
        }
        self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next)
        self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile)

    def __generate_stream_instance(self, stream_name, stream, idx, cnt):
        """Create the porter and all streamers of stream instance ``idx``."""
        port = stream['port']
        interface = None
        if 'interface' in stream:
            interface = stream['interface']
        porter = self.__create_porter(stream['type'], stream_name, idx, port, interface)

        for mux_name in stream['muxes']:
            for format_name, profiles in self._desc.muxes[mux_name]['formats'].items():
                for profile_name in profiles:
                    self.__generate_stream_mux_instance(stream_name, stream, mux_name, format_name, profile_name, idx, cnt, porter)

    def _generate_streams(self):
        """Create ``stream['count']`` instances of every configured stream."""
        self.flow['repeaters'] = {}
        self.flow['streamers'] = {}
        for stream_name, stream in self._desc.streams.items():
            cnt = stream['count']
            for idx in range(cnt):
                self.__generate_stream_instance(stream_name, stream, idx, cnt)

    #
    # records

    def _generate_records(self):
        """Create one recorder component per configured record.

        Every record key other than 'mux', 'format' and 'profile' is
        passed through as a component property.
        """
        self.flow['recorders'] = {}
        for record_name, record in self._desc.records.items():
            mux = record['mux']
            format_name = record['format']
            profile = record['profile']
            feeder = 'mux-%s-%s-%s' % (mux, format_name, profile)

            comp_name = 'record-%s-%s-%s' % (mux, format_name, profile)
            comp_desc = 'recorder for %s %s-%s' % (mux, format_name, profile)
            worker = self.__get_worker(comp_name)

            self.flow['recorders'][comp_name] = {
                'type': "recorder",
                'desc': comp_desc,
                'worker': str(worker),
                'feeder': feeder,
                'properties': {},
                '_recorder_id': record_name
            }
            for prop in record:
                if prop not in ['mux', 'format', 'profile']:
                    self.flow['recorders'][comp_name]['properties'][prop] = record[prop]

            self.__add_worker_flag(worker, "record", record_name)

    #
    # all

    def generate(self):
        """Build all components; return the names of components without a worker."""
        self._generate_inputs()
        self._generate_muxes()
        self._generate_streams()
        self._generate_records()
        return self._unassigned_components

    def planet_xml(self, template_dir):
        """Render 'planet.xml.j2' and return the result with a trailing newline.

        The template is looked up in
        ``<template_dir>/<globals['templates']>/flumotion``.
        """
        loader = jinja2.FileSystemLoader(os.path.join(template_dir, self._desc.globals['templates'], 'flumotion'))
        env = jinja2.Environment(loader=loader, line_statement_prefix='%%')
        template = env.get_template('planet.xml.j2')
        planet_xml = template.render(globals=self._desc.globals, atmosphere=self.atmosphere, flow=self.flow)
        return planet_xml + "\n"

    def htpasswd(self):
        """Return UTF-8 encoded ``user:hash`` lines for the admin and all workers.

        NOTE(review): relies on the stdlib ``crypt`` module, which is
        deprecated since Python 3.11 and removed in 3.13.
        """
        admin = self._desc.globals['admin']
        lines = ["%s:%s\n" % (admin['username'], crypt.crypt(admin['password'], rand_string(6)))]
        for worker in self.workers.values():
            lines.append("%s:%s\n" % (worker['name'], crypt.crypt(worker['password'], rand_string(6))))
        return ''.join(lines).encode('utf-8')


# kubernetes handling #############################
#

class K8sDeployment:
    """Deploys a generated planet into a kubernetes namespace.

    The namespace defaults to ``desc.globals['name']``. Creating an
    instance loads the local kube config.
    """

    def __init__(self, desc, planet, namespace=None):
        self._desc = desc
        self._planet = planet
        self._namespace = namespace
        if not self._namespace:
            self._namespace = self._desc.globals['name']

        kubernetes.config.load_kube_config()
        kubernetes.client.user_agent = 'flufigut'

    def __create_namespace(self, v1):
        """Create the target namespace."""
        ns = kubernetes.client.V1Namespace()
        ns.metadata = kubernetes.client.V1ObjectMeta()
        ns.metadata.name = self._namespace
        v1.create_namespace(ns)

    def __delete_namespace(self, v1):
        """Request deletion of the target namespace; a missing namespace is fine."""
        opts = kubernetes.client.V1DeleteOptions()
        try:
            # body is passed by keyword: it is a positional parameter only
            # in older client releases and a keyword-only option in newer ones.
            v1.delete_namespace(self._namespace, body=opts, grace_period_seconds=0)
        except kubernetes.client.rest.ApiException as e:
            # Only "not found" is expected here. Swallowing anything else
            # would leave wipe() polling forever for a namespace that is
            # never going away.
            if e.status != 404:
                raise

    def __generate_object(self, tmpl_env, template_file, worker=None):
        """Render ``<template_file>.j2`` and return the parsed YAML object."""
        tmpl = tmpl_env.get_template(template_file + '.j2')
        object_yaml = tmpl.render(namespace=self._namespace, desc=self._desc, planet=self._planet, worker=worker)
        # safe_load: kubernetes manifests are plain data, and yaml.load()
        # without an explicit Loader is rejected by current PyYAML.
        return yaml.safe_load(object_yaml)

    def _deploy_manager(self, template_dir, tmpl_env, v1, appsV1):
        """Create service, config map, secret and deployment of the manager."""
        svc = self.__generate_object(tmpl_env, 'flumotion-manager-svc.yml')
        v1.create_namespaced_service(self._namespace, svc)

        cm = self.__generate_object(tmpl_env, 'flumotion-manager-cm.yml')
        cm['data']['planet.xml'] = self._planet.planet_xml(template_dir)
        v1.create_namespaced_config_map(self._namespace, cm)

        secret = self.__generate_object(tmpl_env, 'flumotion-manager-secret.yml')
        secret['data']['htpasswd'] = base64.b64encode(self._planet.htpasswd()).decode('ascii')
        v1.create_namespaced_secret(self._namespace, secret)

        deploy = self.__generate_object(tmpl_env, 'flumotion-manager-deploy.yml')
        appsV1.create_namespaced_deployment(self._namespace, deploy)

    def _deploy_worker(self, template_dir, tmpl_env, v1, appsV1, worker):
        """Create secret, optional service and deployment of one worker.

        A worker without sub-workers gets a single sub-entry 'worker'
        added to ``worker['subs']`` (the passed dict is modified). The
        service is only created for workers that have a 'port' flag.
        """
        if not worker['subs']:
            worker['subs']['worker'] = {'fullname': worker['name'], 'flags': worker['flags']}

        secret = self.__generate_object(tmpl_env, 'flumotion-worker-secret.yml', worker)
        secret['data']['password'] = base64.b64encode(worker['password'].encode('utf-8')).decode('ascii')
        v1.create_namespaced_secret(self._namespace, secret)

        if 'port' in worker['flags']:
            svc = self.__generate_object(tmpl_env, 'flumotion-worker-svc.yml', worker)
            v1.create_namespaced_service(self._namespace, svc)

        deploy = self.__generate_object(tmpl_env, 'flumotion-worker-deploy.yml', worker)
        appsV1.create_namespaced_deployment(self._namespace, deploy)

    def deploy(self, template_dir):
        """Create the namespace and deploy the manager and all workers.

        Templates are read from
        ``<template_dir>/<globals['templates']>/kubernetes``.
        """
        v1 = kubernetes.client.CoreV1Api()
        appsV1 = kubernetes.client.AppsV1Api()

        self.__create_namespace(v1)

        loader = jinja2.FileSystemLoader(os.path.join(template_dir, self._desc.globals['templates'], 'kubernetes'))
        tmpl_env = jinja2.Environment(loader=loader, line_statement_prefix='%%')

        self._deploy_manager(template_dir, tmpl_env, v1, appsV1)
        for worker in self._planet.workers.values():
            self._deploy_worker(template_dir, tmpl_env, v1, appsV1, worker)

    def wipe(self):
        """Delete the namespace and block until it is no longer listed."""
        v1 = kubernetes.client.CoreV1Api()
        self.__delete_namespace(v1)

        # poll once per second until the namespace has disappeared
        while any(ns.metadata.name == self._namespace for ns in v1.list_namespace().items):
            time.sleep(1)


# Main ########################################################
#

if __name__ == '__main__':
    import traceback
    import pprint

    __pp = pprint.PrettyPrinter(indent=4, width=160)

    if len(sys.argv) <= 1:
        print("ERROR: No configuration file given")
        sys.exit(-1)

    config_file = sys.argv[1]
    ret = 0
    try:
        d = Description()
        d.parse(config_file)

        p = Planet(d)
        unassigned = p.generate()
        if len(unassigned) > 0:
            print("unassinged components: ")
            for c in unassigned:
                print("- " + c)
            sys.exit(1)

        # debugging aid: dump the generated structures
        # print("****************************************************")
        # print("** atmosphere **")
        # print("**")
        # __pp.pprint(p.atmosphere)
        # print("**")
        # print("**************************")
        # print("** planet **")
        # print("**")
        # __pp.pprint(p.flow)
        # print("**")
        # print("**************************")
        # print("** _workers_ **")
        # print("**")
        # __pp.pprint(p.workers)
        # print("**")
        # print("****************************************************")

        k8s = K8sDeployment(d, p)
        print("wiping exisiting namespace ...")
        k8s.wipe()
        print("deploying new planet ...")
        k8s.deploy('../templates')
        print("done.")

    except Exception as e:
        # top-level boundary: report the error with its traceback and fail
        print("ERROR: while running app: %s" % e)
        print(traceback.format_exc())
        sys.exit(1)

    sys.exit(ret)