#!/usr/bin/python3 # # flufigut # # flufigut, the flumotion configuration utility, is a simple tool # that generates flumotion configuration files using pyhton jinja2 # template engine. flufigut generates planet.xml and worker.xml # files from configuration templates and an easy to understand # representation of the flow structure written in json or yaml. # # # Copyright (C) 2018 Christian Pointner # # This file is part of flufigut. # # flufigut is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # any later version. # # flufigut is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with flufigut. If not, see . # import string import random import os import sys import yaml import json import jinja2 import crypt import kubernetes import time import base64 # helper functions ############################################ # def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) # a flufigut stream description ############################### # class Worker: def __init__(self, name, subname=None): if ':' in name: raise Exception("worker name '%s' is invalid (must not contain ':')" % name) if subname and ':' in subname: raise Exception("worker sub-name '%s' of worker '%s' is invalid (must not contain ':')" % (subname, name)) self.name = name self.subname = subname def __str__(self): if self.subname: return "%s:%s" % (self.name, self.subname) return self.name def __repr__(self): return "Worker(%s, %s)" % (self.name, self.subname) class WorkerPattern: def __init__(self, pattern, is_prefix, worker): self.pattern = pattern self.is_prefix = is_prefix self.worker = worker class Description: def __init__(self): self.globals = {} self.inputs = {} self.muxes = {} self.streams = {} self.records = {} self.worker_patterns = {} def __compute_worker_pattern(self, worker, pattern): is_prefix = False search_string = pattern if '*' in pattern: if '*' in pattern[:-1]: raise Exception("invalid worker pattern '%s' in worker '%s' component globs may only have a single '*' at the end)" % (pattern, worker)) is_prefix = True search_string = pattern[:-1] if not search_string: raise Exception("worker '%s' has wildcard or empty pattern which is not allowed" % (worker)) if search_string in self.worker_patterns: raise Exception("worker pattern '%s' is used by at least two workers (%s and %s)" % (pattern, worker, self.worker_patterns[search_string].worker)) self.worker_patterns[search_string] = WorkerPattern(pattern, is_prefix, worker) def _compute_worker_patterns(self): for name, entry in self.globals['workers'].items(): if isinstance(entry, str): self.__compute_worker_pattern(Worker(name), entry) elif isinstance(entry, list): for pattern in entry: self.__compute_worker_pattern(Worker(name), pattern) elif isinstance(entry, dict): for subname, subentry in entry.items(): worker = Worker(name, subname) if isinstance(subentry, str): self.__compute_worker_pattern(worker, subentry) elif isinstance(subentry, list): for pattern in subentry: self.__compute_worker_pattern(worker, pattern) else: raise Exception("globals.workers entry '%s' is invalid" % worker) else: raise Exception("globals.workers entry '%s' is invalid" % name) def parse(self, config_file): cf = open(config_file, 'r') config = yaml.load(cf) cf.close() self.globals = config['globals'] self.inputs = config['inputs'] self.muxes = config['muxes'] self.streams = config['streams'] if 'records' in config: self.records = config['records'] return self._compute_worker_patterns() # a flumtion planet configuration ############################# # class Porter: def __init__(self, name, interface, port): self.name = name self.interface = interface self.port = port self.socket_path = rand_string() self.username = rand_string(size=12) self.password = rand_string(size=20) class Planet: def __init__(self, desc): self.atmosphere = {} self.flow = {} self.workers = {} self._desc = desc self._unassigned_components = [] # # utils def __add_worker(self, worker): if worker.name not in self.workers: self.workers[worker.name] = { 'name': worker.name, 'subs': {}, 'password': rand_string(size=20), 'flags': {} } if not worker.subname: return self.workers[worker.name]['subs'][worker.subname] = { 'fullname': str(worker), 'flags': {} } return if not worker.subname: return if worker.subname not in self.workers[worker.name]['subs']: self.workers[worker.name]['subs'][worker.subname] = { 'fullname': str(worker), 'flags': {} } def __add_worker_flag_exclusive(self, worker, flag_name, flag): if flag_name not in self.workers[worker.name]['flags']: self.workers[worker.name]['flags'][flag_name] = flag else: if flag != self.workers[worker.name]['flags'][flag_name]: raise Exception("can't set exclusive flag '%s' for worker '%s' to '%s', it is already set to '%s'" % ( flag_name, worker.name, flag, self.workers[worker.name]['flags'][flag_name])) if not worker.subname: return if flag_name not in self.workers[worker.name]['subs'][worker.subname]['flags']: self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name] = flag else: if flag != self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name]: raise Exception("can't set exclusive flag '%s' for worker '%s' to '%s', it is already set to '%s'" % ( flag_name, worker.name, flag, self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name])) def __add_worker_flag_multi(self, worker, flag_name, flag): if flag_name not in self.workers[worker.name]['flags']: self.workers[worker.name]['flags'][flag_name] = [flag] else: if flag not in self.workers[worker.name]['flags'][flag_name]: self.workers[worker.name]['flags'][flag_name].append(flag) if not worker.subname: return if flag_name not in self.workers[worker.name]['subs'][worker.subname]['flags']: self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name] = [flag] else: if flag not in self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name]: self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name].append(flag) def __get_worker(self, comp_name): match_prefix_len = 0 match = None for search_string, pattern in self._desc.worker_patterns.items(): if pattern.is_prefix: if comp_name.startswith(search_string): if len(search_string) == match_prefix_len: raise Exception("""component '%s': both patterns '%s' of worker '%s' and '%s' of worker '%s' match the component name and have the same length""" % (comp_name, match.pattern, match.worker, pattern.pattern, pattern.worker)) elif len(search_string) > match_prefix_len: match_prefix_len = len(search_string) match = pattern else: if comp_name == search_string: match = pattern break if not match: self._unassigned_components.append(comp_name) return None self.__add_worker(match.worker) return match.worker # # inputs def __set_input_properties(self, comp_name, props): for prop in props: if prop == 'resolution': self.flow['inputs'][comp_name]['properties']['width'] = self._desc.globals['resolutions'][props[prop]]['width'] self.flow['inputs'][comp_name]['properties']['height'] = self._desc.globals['resolutions'][props[prop]]['height'] self.flow['inputs'][comp_name]['properties']['framerate'] = self._desc.globals['resolutions'][props[prop]]['rate'] else: self.flow['inputs'][comp_name]['properties'][prop] = props[prop] def _generate_inputs(self): self.flow['inputs'] = {} master_cnt = 0 for input_name, input in self._desc.inputs.items(): comp_name = 'capture-%s' % input_name comp_desc = 'capture raw data from %s' % (input_name) worker = self.__get_worker(comp_name) self.flow['inputs'][comp_name] = { 'type': input['type'], 'desc': comp_desc, 'worker': str(worker), 'master': input['master'], 'properties': {}, } if input['master']: master_cnt += 1 self.__add_worker_flag_multi(worker, "input", input_name) self.__set_input_properties(comp_name, input['properties']) if master_cnt == 0: raise Exception("You have not configured any master clock device!") elif master_cnt > 1: raise Exception("You have configured multiple master clock devices!") # # muxes def __generate_audio_feeder(self, input_feeder, input_name, input_samplerate, target_samplerate): if target_samplerate == input_samplerate: return input_feeder comp_name = 'resample-%s-%s' % (input_name, target_samplerate) comp_desc = 'resample audio from %s to %s Hz' % (input_name, target_samplerate) self.flow['inputs'][comp_name] = { 'type': 'audio-resample', 'desc': comp_desc, 'worker': str(self.__get_worker(comp_name)), 'feeder': input_feeder, 'properties': { 'samplerate': target_samplerate, }, } return comp_name def __generate_audio_encoder(self, mux, format, profile): input_name = mux['audio'].split(':')[0] encoder = self._desc.globals['formats'][format]['audio'] bitrate = self._desc.globals['profiles'][profile]['audio'] input_samplerate = self._desc.inputs[input_name]['properties']['samplerate'] samplerate = input_samplerate if 'samplerate' in self._desc.globals['formats'][format]: samplerate = self._desc.globals['formats'][format]['samplerate'] feeder = self.__generate_audio_feeder('capture-%s' % (mux['audio']), input_name, input_samplerate, samplerate) comp_name = 'encode-%s-%s-%i-%i' % (input_name, encoder, bitrate, samplerate) comp_desc = '%s encoder for %i kbit/s @ %i Hz, from %s' % (encoder, bitrate, samplerate, input_name) if bitrate == 0: comp_name = 'encode-%s-%s-%i' % (input_name, encoder, samplerate) comp_desc = '%s encoder @ %i Hz, from %s' % (encoder, samplerate, input_name) if comp_name in self.flow['encoders-audio']: return comp_name self.flow['encoders-audio'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': { 'bitrate': bitrate, }, } return comp_name def __generate_video_feeder(self, input_feeder, input_name, input_resolution, target_resolution): if self._desc.globals['resolutions'][target_resolution]['rate'] != self._desc.globals['resolutions'][input_resolution]['rate']: raise Exception("ERROR: video rate conversion is not supported!!!") if target_resolution == input_resolution: return input_feeder comp_name = 'resize-%s-%s' % (input_name, target_resolution) comp_desc = 'resize video from %s to %sx%s' % (input_name, self._desc.globals['resolutions'][target_resolution]['width'], self._desc.globals['resolutions'][target_resolution]['height']) self.flow['inputs'][comp_name] = { 'type': 'video-resize', 'desc': comp_desc, 'worker': str(self.__get_worker(comp_name)), 'feeder': input_feeder, 'properties': { 'width': self._desc.globals['resolutions'][target_resolution]['width'], 'height': self._desc.globals['resolutions'][target_resolution]['height'], }, } return comp_name def __generate_video_encoder(self, mux, format, profile): if 'video' not in self._desc.globals['profiles'][profile]: return None input_name = mux['video'].split(':')[0] encoder = self._desc.globals['formats'][format]['video'] input_resolution = self._desc.inputs[input_name]['properties']['resolution'] resolution = self._desc.globals['profiles'][profile]['video'] bitrate = self._desc.globals['bitrates'][encoder][resolution] if input_resolution == "": raise Exception("format definition needs video but no video input given") feeder = self.__generate_video_feeder('capture-%s' % (mux['video']), input_name, input_resolution, resolution) comp_name = 'encode-%s-%s-%s' % (input_name, encoder, resolution) comp_desc = '%s encoder for %sx%s, from %s' % (encoder, self._desc.globals['resolutions'][resolution]['width'], self._desc.globals['resolutions'][resolution]['height'], input_name) if comp_name in self.flow['encoders-video']: return comp_name self.flow['encoders-video'][comp_name] = { 'type': '%s-encode' % encoder, 'desc': comp_desc, 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, 'properties': { 'bitrate': bitrate, }, } return comp_name def __generate_muxer(self, mux_name, format, profile, feeder_audio, feeder_video): muxer = self._desc.globals['formats'][format]['muxer'] comp_name = 'mux-%s-%s-%s' % (mux_name, format, profile) comp_desc = '%s muxer for %s, profile %s' % (format, mux_name, profile) self.flow['muxers'][comp_name] = { 'type': '%s-mux' % muxer, 'desc': comp_desc, 'worker': str(self.__get_worker(comp_name)), 'feeder_audio': feeder_audio, 'feeder_video': feeder_video, 'properties': {}, } def _generate_muxes(self): self.flow['encoders-audio'] = {} self.flow['encoders-video'] = {} self.flow['muxers'] = {} for mux_name, mux in self._desc.muxes.items(): for format in mux['formats']: for profile in mux['formats'][format]: audio_encoder = None video_encoder = None if 'audio' in mux: audio_encoder = self.__generate_audio_encoder(mux, format, profile) if 'video' in mux: video_encoder = self.__generate_video_encoder(mux, format, profile) self.__generate_muxer(mux_name, format, profile, audio_encoder, video_encoder) # # streams def __create_porter(self, protocol, stream, idx, port, interface=None): if protocol != "http": raise Exception("unknown porter protocol '%s', currently only http porters are supported" % protocol) comp_name = '%s-%s%i-port-%i' % (protocol, stream, idx + 1, port) addr = '*:%i' % (port) if interface: comp_name = '%s-%s%i-port-%s-%i' % (protocol, stream, idx + 1, interface, port) addr = '%s:%i' % (interface, port) worker = self.__get_worker(comp_name) porter = Porter(comp_name, interface, port) self.atmosphere[porter.name] = { 'type': "porter", 'desc': "%s porter %s%i on %s" % (protocol, stream, idx + 1, addr), 'worker': str(worker), 'properties': { 'port': port, 'socket-path': porter.socket_path, 'username': porter.username, 'password': porter.password, }, } if interface: self.atmosphere[porter.name]['properties']['interface'] = interface self.__add_worker_flag_multi(worker, "port", port) return porter def __generate_stream_mux_repeater(self, stream, mux, format, profile, feeder): comp_name = 'repeat-%s-%s-%s-%s' % (stream, mux, format, profile) if comp_name in self.flow['repeaters']: return comp_name self.flow['repeaters'][comp_name] = { 'type': 'repeater', 'desc': "repeater for %s (%s %s-%s)" % (stream, mux, format, profile), 'worker': str(self.__get_worker(comp_name)), 'feeder': feeder, } return comp_name def __set_stream_mux_instance_props(self, comp_name, stream, port, mount_point, hostname_next): next_url = "http://%s:%i%s" % (hostname_next, port, mount_point) for prop in stream: if prop == 'max-con': self.flow['streamers'][comp_name]['properties']['client-limit'] = stream[prop] if hostname_next: self.flow['streamers'][comp_name]['properties']['redirect-on-overflow'] = next_url if prop == 'max-bw': self.flow['streamers'][comp_name]['properties']['bandwidth-limit'] = stream[prop] if hostname_next: self.flow['streamers'][comp_name]['properties']['redirect-on-overflow'] = next_url if prop == 'burst-on-connect': self.flow['streamers'][comp_name]['properties']['burst-on-connect'] = 'true' self.flow['streamers'][comp_name]['properties']['burst-time'] = stream[prop] def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile): if 'stats' in self._desc.globals: self.flow['streamers'][comp_name]['plugs'] = {} if 'rrd' in self._desc.globals['stats']: self.flow['streamers'][comp_name]['plugs']['rrd'] = {} self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % ( self._desc.globals['stats']['rrd']['directory'], comp_name) self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % ( self._desc.globals['stats']['rrd']['directory'], comp_name) if 'sfive' in self._desc.globals['stats'] and self._desc.globals['stats']['sfive']['type'] == 'flumotion-plug': self.flow['streamers'][comp_name]['plugs']['sfive'] = {} self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = self._desc.globals['stats']['sfive']['socket'] self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = self._desc.globals['stats']['sfive']['duration'] self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = self._desc.globals['stats']['sfive']['tags'] self.flow['streamers'][comp_name]['plugs']['sfive']['hostname'] = '%s%i' % (stream_name, idx + 1) self.flow['streamers'][comp_name]['plugs']['sfive']['content-id'] = mux self.flow['streamers'][comp_name]['plugs']['sfive']['format'] = format self.flow['streamers'][comp_name]['plugs']['sfive']['quality'] = profile def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter): muxer_feed = 'mux-%s-%s-%s' % (mux, format, profile) feeder = muxer_feed if 'repeater' in stream and stream['repeater']: feeder = self.__generate_stream_mux_repeater(stream_name, mux, format, profile, muxer_feed) comp_name = '%s-%s%i-stream-%s-%s-%s' % (stream['type'], stream_name, idx + 1, mux, format, profile) mount_point = '/%s-%s-%s.%s' % (mux, format, profile, self._desc.globals['formats'][format]['muxer']) worker = self.__get_worker(comp_name) hostname = stream['hostname'] hostname_next = None if cnt > 1: hostname = stream['hostname'] % (idx + 1) if idx != 0: hostname_next = stream['hostname'] % (idx) else: hostname_next = stream['hostname'] % (cnt) self.flow['streamers'][comp_name] = { 'type': "%s-stream" % stream['type'], 'desc': "%s streamer for %s %s-%s (part %i of %i in %s cluster)" % ( stream['type'], mux, format, profile, idx + 1, cnt, stream_name), 'worker': str(worker), 'feeder': feeder, 'properties': { 'description': self._desc.globals['description'], 'type': 'slave', 'porter-socket-path': porter.socket_path, 'porter-username': porter.username, 'porter-password': porter.password, 'mount-point': mount_point, 'hostname': hostname, 'port': porter.port, } } self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next) self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile) self.__add_worker_flag_exclusive(worker, "stream", stream_name) self.__add_worker_flag_exclusive(worker, "stream-hostname", hostname) self.__add_worker_flag_exclusive(worker, "stream-index", idx) if 'onion-service' in stream and stream['onion-service']: self.__add_worker_flag_exclusive(worker, "stream-onion", stream['onion-service']) if 'sfive' in self._desc.globals['stats']: self.__add_worker_flag_exclusive(worker, "sfive", self._desc.globals['stats']['sfive']['type']) def __generate_stream_instance(self, stream_name, stream, idx, cnt): port = stream['port'] interface = None if 'interface' in stream: interface = stream['interface'] porter = self.__create_porter(stream['type'], stream_name, idx, port, interface) for mux_name in stream['muxes']: for format_name, format in self._desc.muxes[mux_name]['formats'].items(): for profile_name in format: self.__generate_stream_mux_instance(stream_name, stream, mux_name, format_name, profile_name, idx, cnt, porter) def _generate_streams(self): self.flow['repeaters'] = {} self.flow['streamers'] = {} for stream_name, stream in self._desc.streams.items(): cnt = stream['count'] for idx in range(cnt): self.__generate_stream_instance(stream_name, stream, idx, cnt) # # records def _generate_records(self): self.flow['recorders'] = {} for record_name, record in self._desc.records.items(): mux = record['mux'] format = record['format'] profile = record['profile'] feeder = 'mux-%s-%s-%s' % (mux, format, profile) comp_name = 'record-%s-%s-%s' % (mux, format, profile) comp_desc = 'recorder for %s %s-%s' % (mux, format, profile) worker = self.__get_worker(comp_name) self.flow['recorders'][comp_name] = { 'type': "recorder", 'desc': comp_desc, 'worker': str(worker), 'feeder': feeder, 'properties': {}, '_recorder_id': record_name } for prop in record: if prop not in ['mux', 'format', 'profile']: self.flow['recorders'][comp_name]['properties'][prop] = record[prop] self.__add_worker_flag_multi(worker, "record", record_name) # # all def generate(self): self._generate_inputs() self._generate_muxes() self._generate_streams() self._generate_records() return self._unassigned_components def planet_xml(self, template_dir): loader = jinja2.FileSystemLoader(os.path.join(template_dir, self._desc.globals['templates'], 'flumotion')) env = jinja2.Environment(loader=loader, line_statement_prefix='%%') template = env.get_template('planet.xml.j2') planet_xml = template.render(globals=self._desc.globals, atmosphere=self.atmosphere, flow=self.flow) return planet_xml + "\n" def htpasswd(self): salt = rand_string(6) out = "%s:%s\n" % (self._desc.globals['admin']['username'], crypt.crypt(self._desc.globals['admin']['password'], salt)) for _, worker in self.workers.items(): salt = rand_string(6) out += "%s:%s\n" % (worker['name'], crypt.crypt(worker['password'], salt)) return out.encode('utf-8') def sfive_proxy_config(self, worker_name, for_onion=False): # TODO: hardcoded values hostname_prefix = "streamer" addr = "" port = 8000 if for_onion: hostname_prefix = "streamer-onion" addr = "127.0.0.1" port = 8001 listen = "%s:%i" % (addr, port) hostname = "%s-%s%d" % (hostname_prefix, self.workers[worker_name]['flags']['stream'], (self.workers[worker_name]['flags']['stream-index'] + 1)) conf = {'listen': listen, 'protocol': 'http'} if not for_onion: conf['protocol'] = 'http+https' conf['tls'] = {'min-protocol-version': 'TLSv1', 'prefer-server-ciphers': True} conf['tls']['certificate'] = '/srv/acme/fullchain' conf['tls']['certificate-key'] = '/srv/acme/privkey' conf['tls']['ciphers'] = ['ECDHE_RSA_WITH_AES_256_GCM_SHA384', 'ECDHE_RSA_WITH_AES_256_CBC_SHA', 'RSA_WITH_AES_256_GCM_SHA384', 'RSA_WITH_AES_256_CBC_SHA'] conf['tls']['ecdh-curves'] = ['secp521r1', 'secp384r1', 'secp256r1'] conf['connect'] = 'http://flumotion-worker-' + self.workers[worker_name]['name'] + ':8000' conf['request_header'] = [{'op': 'del', 'header': 'X-Forwarded-For'}] conf['response_header'] = [{'op': 'set', 'header': 'Cache-Control', 'value': 'no-cache'}, {'op': 'add', 'header': 'Cache-Control', 'value': 'no-store'}, {'op': 'add', 'header': 'Cache-Control', 'value': 'must-revalidate'}, {'op': 'add', 'header': 'Cache-Control', 'value': 'max-age=0'}, {'op': 'time', 'header': 'Expires', 'value': '-1s'}] conf['backends'] = {} conf['backends']['/hls/'] = {'connect': 'http://nginx-streamer-' + self.workers[worker_name]['name'] + ':8000'} conf['backends']['/dash/'] = {'connect': 'http://nginx-streamer-' + self.workers[worker_name]['name'] + ':8000'} conf['sfive'] = {'hostname': hostname} conf['sfive']['socket'] = self._desc.globals['stats']['sfive']['socket'] conf['sfive']['duration'] = self._desc.globals['stats']['sfive']['duration'] conf['sfive']['tags'] = self._desc.globals['stats']['sfive']['tags'] conf['sfive']['matches'] = [{'format': '/${content}-${format}-${quality}', 'streams': 'av-orig/flash,webm/high,medium,low,mini'}, {'format': '/${content}-${format}-${quality}', 'streams': 'audio-orig/ogg,mp3/high,medium,low,mini'}, {'format': '/${format}/${content}-${quality}', 'streams': 'av-orig/dash,hls/high,medium,low,mini'}] return conf # kubernetes handling ############################# # class K8sDeployment: def __init__(self, desc, planet, namespace=None): self._desc = desc self._planet = planet self._namespace = namespace if not self._namespace: self._namespace = self._desc.globals['name'] kubernetes.config.load_kube_config() kubernetes.client.user_agent = 'flufigut' self.__has_onion_service = False self.__has_sfive = False self.__has_sfive_onion = False def __create_namespace(self, v1): ns = kubernetes.client.V1Namespace() ns.metadata = kubernetes.client.V1ObjectMeta() ns.metadata.name = self._namespace v1.create_namespace(ns) def __delete_namespace(self, v1): try: opts = kubernetes.client.V1DeleteOptions() v1.delete_namespace(self._namespace, opts, grace_period_seconds=0) except kubernetes.client.rest.ApiException as e: if e.status == 404: pass def __generate_object(self, tmpl_env, template_file, deploy={}): tmpl = tmpl_env.get_template(template_file + '.j2') deploy['namespace'] = self._namespace object_yaml = tmpl.render(desc=self._desc, planet=self._planet, deploy=deploy) return yaml.load(object_yaml) def _deploy_flumotion_manager(self, template_dir, tmpl_env, v1, appsV1): svc = self.__generate_object(tmpl_env, 'flumotion-manager-svc.yml') v1.create_namespaced_service(self._namespace, svc) cm = self.__generate_object(tmpl_env, 'flumotion-manager-cm.yml') cm['data']['planet.xml'] = self._planet.planet_xml(template_dir) v1.create_namespaced_config_map(self._namespace, cm) secret = self.__generate_object(tmpl_env, 'flumotion-manager-secret.yml') secret['data']['htpasswd'] = base64.b64encode(self._planet.htpasswd()).decode('ascii') v1.create_namespaced_secret(self._namespace, secret) deploy = self.__generate_object(tmpl_env, 'flumotion-manager-deploy.yml') appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_flumotion_worker(self, template_dir, tmpl_env, v1, appsV1, worker): secret = self.__generate_object(tmpl_env, 'flumotion-worker-secret.yml', {'worker': worker}) secret['data']['password'] = base64.b64encode(worker['password'].encode('utf-8')).decode('ascii') v1.create_namespaced_secret(self._namespace, secret) if 'port' in worker['flags']: svc = self.__generate_object(tmpl_env, 'flumotion-worker-svc.yml', {'worker': worker}) v1.create_namespaced_service(self._namespace, svc) deploy = self.__generate_object(tmpl_env, 'flumotion-worker-deploy.yml', {'worker': worker}) appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_nginx_worker(self, template_dir, tmpl_env, v1, appsV1, worker): needs_nginx = False if 'stream' in worker['flags']: stream_name = worker['flags']['stream'] if 'nginx-muxes' in self._desc.streams[stream_name] and len(self._desc.streams[stream_name]['nginx-muxes']) > 0: needs_nginx = True if not needs_nginx: return cm = self.__generate_object(tmpl_env, 'nginx-streamer-cm.yml', {'worker': worker}) v1.create_namespaced_config_map(self._namespace, cm) svc = self.__generate_object(tmpl_env, 'nginx-streamer-svc.yml', {'worker': worker}) v1.create_namespaced_service(self._namespace, svc) deploy = self.__generate_object(tmpl_env, 'nginx-streamer-deploy.yml', {'worker': worker}) appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_sfive_worker(self, template_dir, tmpl_env, v1, appsV1, worker): if 'sfive' not in worker['flags']: return self.__has_sfive = True cm = self.__generate_object(tmpl_env, 'sfive-cm.yml', {'worker': worker}) if 'data' not in cm or not cm['data']: cm['data'] = {} if worker['flags']['sfive'] == 'proxy' and 'stream' in worker['flags']: cm['data']['proxy.json'] = json.dumps(self._planet.sfive_proxy_config(worker['name'])) if 'stream-onion' in worker['flags']: self.__has_onion_service = True self.__has_sfive_onion = True cm['data']['proxy-onion.json'] = json.dumps(self._planet.sfive_proxy_config(worker['name'], True)) v1.create_namespaced_config_map(self._namespace, cm) deploy = self.__generate_object(tmpl_env, 'sfive-deploy.yml', {'worker': worker}) appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_onion_service_config(self, template_dir, tmpl_env, v1, stream_name, stream): deploy = {'stream': stream_name} deploy['onion_services'] = {} # TODO: hardcoded value (sync with sfive_proxy_config) deploy['onion_services'][stream['port']] = {'host': '127.0.0.1', 'port': 8001} # TODO: add port 80 -> onion streaming site cm = self.__generate_object(tmpl_env, 'onion-service-cm.yml', deploy) v1.create_namespaced_config_map(self._namespace, cm) def _deploy_stream_loadbalancer(self, template_dir, tmpl_env, v1, appsV1, stream_name, stream): streamers = [] for idx in range(stream['count']): streamers.append(stream['hostname'] % (idx + 1)) cm = self.__generate_object(tmpl_env, 'stream-lb-cm.yml', {'stream': stream_name, 'streamers': streamers}) v1.create_namespaced_config_map(self._namespace, cm) svc = self.__generate_object(tmpl_env, 'stream-lb-svc.yml', {'stream': stream_name, 'streamers': streamers}) v1.create_namespaced_service(self._namespace, svc) deploy = self.__generate_object(tmpl_env, 'stream-lb-deploy.yml', {'stream': stream_name, 'streamers': streamers}) appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_stream_website(self, template_dir, tmpl_env, v1, appsV1, extV1beta1, stream_name, stream): cm = self.__generate_object(tmpl_env, 'stream-site-cm.yml', {'stream': stream_name}) hostname = stream['hostname'] if 'lb-hostname' in stream: hostname = stream['lb-hostname'] site_config = {'resolutions': self._desc.globals['resolutions'], 'profiles': self._desc.globals['profiles'], 'muxes': {}, 'streamBaseUrl': 'https://%s:%i' % (hostname, stream['port'])} for mux in stream['muxes']: site_config['muxes'][mux] = self._desc.muxes[mux] cm['data']['config.js'] = "var config = %s;\n" % (json.dumps(site_config)) v1.create_namespaced_config_map(self._namespace, cm) svc = self.__generate_object(tmpl_env, 'stream-site-svc.yml', {'stream': stream_name}) v1.create_namespaced_service(self._namespace, svc) deploy = self.__generate_object(tmpl_env, 'stream-site-deploy.yml', {'stream': stream_name}) appsV1.create_namespaced_deployment(self._namespace, deploy) # TODO: install TLS secrets ingress = self.__generate_object(tmpl_env, 'stream-site-ingress.yml', {'stream': stream_name}) extV1beta1.create_namespaced_ingress(self._namespace, ingress) pass def _deploy_onionbalance(self, template_dir, tmpl_env, v1, appsV1, rbacV1): sa = self.__generate_object(tmpl_env, 'onionbalance-sa.yml') v1.create_namespaced_service_account(self._namespace, sa) role = self.__generate_object(tmpl_env, 'onionbalance-role.yml') rbacV1.create_namespaced_role(self._namespace, role) rb = self.__generate_object(tmpl_env, 'onionbalance-rolebinding.yml') rbacV1.create_namespaced_role_binding(self._namespace, rb) # secret = self.__generate_object(tmpl_env, 'onionbalance-secret.yml') # TODO: for _, stream in self._desc.streams: # if 'onion-service' in stream: # key = ~~~~get_key(stream['onion-service']) # secret['data'][stream['onion-service']] = base64.b64encode(key).decode('ascii') # v1.create_namespaced_secret(self._namespace, secret) worker = self._planet.workers[self._desc.globals['deployment']['parameter']['onionbalance_worker']] deploy = self.__generate_object(tmpl_env, 'onionbalance-deploy.yml', {'worker': worker}) appsV1.create_namespaced_deployment(self._namespace, deploy) def deploy(self, template_dir): v1 = kubernetes.client.CoreV1Api() appsV1 = kubernetes.client.AppsV1Api() extV1beta1 = kubernetes.client.ExtensionsV1beta1Api() rbacV1 = kubernetes.client.RbacAuthorizationV1Api() self.__create_namespace(v1) loader = jinja2.FileSystemLoader(os.path.join(template_dir, self._desc.globals['templates'], 'kubernetes')) tmpl_env = jinja2.Environment(loader=loader, line_statement_prefix='%%') self._deploy_flumotion_manager(template_dir, tmpl_env, v1, appsV1) for _, worker in self._planet.workers.items(): if not len(worker['subs']): worker['subs']['worker'] = { 'fullname': worker['name'], 'flags': worker['flags'] } self._deploy_flumotion_worker(template_dir, tmpl_env, v1, appsV1, worker) self._deploy_nginx_worker(template_dir, tmpl_env, v1, appsV1, worker) self._deploy_sfive_worker(template_dir, tmpl_env, v1, appsV1, worker) for stream_name, stream in self._desc.streams.items(): if 'lb-hostname' in stream: self._deploy_stream_loadbalancer(template_dir, tmpl_env, v1, appsV1, stream_name, stream) if 'onion-service' in stream: self._deploy_onion_service_config(template_dir, tmpl_env, v1, stream_name, stream) self._deploy_stream_website(template_dir, tmpl_env, v1, appsV1, extV1beta1, stream_name, stream) if self.__has_onion_service: role = self.__generate_object(tmpl_env, 'onion-service-role.yml') rbacV1.create_namespaced_role(self._namespace, role) self._deploy_onionbalance(template_dir, tmpl_env, v1, appsV1, rbacV1) if self.__has_sfive: sa = self.__generate_object(tmpl_env, 'sfive-sa.yml') v1.create_namespaced_service_account(self._namespace, sa) if self.__has_sfive_onion: rb = self.__generate_object(tmpl_env, 'sfive-onion-rolebinding.yml') rbacV1.create_namespaced_role_binding(self._namespace, rb) def wipe(self): v1 = kubernetes.client.CoreV1Api() self.__delete_namespace(v1) while True: lst = v1.list_namespace() found = False for ns in lst.items: if ns.metadata.name == self._namespace: found = True break if not found: break time.sleep(1) # Main ######################################################## # if __name__ == '__main__': import traceback import pprint __pp = pprint.PrettyPrinter(indent=4, width=160) if len(sys.argv) <= 1: print("ERROR: No configuration file given") sys.exit(-1) config_file = sys.argv[1] ret = 0 try: d = Description() d.parse(config_file) p = Planet(d) unassigned = p.generate() if len(unassigned) > 0: print("unassinged components: ") for c in p._unassigned_components: print("- " + c) sys.exit(1) # print("****************************************************") # print("** atmosphere **") # print("**") # __pp.pprint(p.atmosphere) # print("**") # print("**************************") # print("** planet **") # print("**") # __pp.pprint(p.flow) # print("**") # print("**************************") # print("** _workers_ **") # print("**") # __pp.pprint(p.workers) # print("**") # print("****************************************************") k8s = K8sDeployment(d, p) print("wiping exisiting namespace ...") k8s.wipe() print("deploying new planet ...") k8s.deploy('../templates') print("done.") except Exception as e: print("ERROR: while running app: %s" % e) print(traceback.format_exc()) sys.exit(1) sys.exit(ret)