diff options
Diffstat (limited to 'src/flufigut.py')
-rwxr-xr-x | src/flufigut.py | 579 |
1 files changed, 64 insertions, 515 deletions
diff --git a/src/flufigut.py b/src/flufigut.py index 25e86ac..42406d3 100755 --- a/src/flufigut.py +++ b/src/flufigut.py @@ -1,16 +1,15 @@ -#!/usr/bin/python +#!/usr/bin/python3 # # flufigut # # flufigut, the flumotion configuration utility, is a simple tool # that generates flumotion configuration files using pyhton jinja2 -# template engine and simplejson. flufigut generates planet.xml -# and worker.xml files from configuration templates and an easy to -# understand representation of the flow structure written in json. +# template engine. flufigut generates planet.xml and worker.xml +# files from configuration templates and an easy to understand +# representation of the flow structure written in json or yaml. # # -# Copyright (C) 2012-2014 Christian Pointner <equinox@spreadspace.org> -# Michael Gebetsroither <michael@mgeb.org> +# Copyright (C) 2018 Christian Pointner <equinox@spreadspace.org> # # This file is part of flufigut. # @@ -31,535 +30,85 @@ import string import random import sys -import simplejson as json -from exceptions import * -from jinja2 import Environment, FileSystemLoader -import shutil -import os -import crypt -import getpass +import yaml +# from jinja2 import Environment, FileSystemLoader -### helper functions ############################################ +# helper functions ############################################ # -def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits): - return ''.join(random.choice(chars) for x in range(size)) - -### parse json file ############################################# -# -if len(sys.argv) <= 2: - raise SystemExit("ERROR: No output format and or configuration file given") -output_format = sys.argv[1] +def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for x in range(size)) -cf = open(sys.argv[2], 'r') -config = json.load(cf); -cf.close(); -### initialization ############################################## +# a flufigut stream description ############################### # -globals = config['globals'] -input = config['input'] -mux = config['mux'] -stream = config['stream'] -if 'record' in config: - record = config['record'] -else: - record = {} - -if output_format == 'flumotion': - ### init flumotion configuration generator ###################### - - atmosphere = {} - flow = {} - ### sanity checks ############################################### - machines = {} - worker = {} - - for machine in globals['machines']: - for w in globals['machines'][machine]: - if w in worker: - raise SystemExit("ERROR: worker '%s' is assigned to more than one machine!!" % w) - else: - worker[w] = 0 - - ### generate input components ################################### - - - - flow['input'] = {} - - master = 0 - for source in input: - name = 'input-%s' % source - if name not in worker: - worker[name] = -1 - else: - worker[name] = 1 - flow['input'][name] = { - 'type': input[source]['type'], - 'desc': "capture raw data from %s" % (source), - 'worker': name, - 'master': input[source]['master'], - 'properties': {}, - } - if input[source]['master']: - master += 1 - properties = input[source]['properties'] - for property in properties.keys(): - if property == 'resolution': - flow['input'][name]['properties']['width'] = globals['resolutions'][properties[property]]['width'] - flow['input'][name]['properties']['height'] = globals['resolutions'][properties[property]]['height'] - flow['input'][name]['properties']['framerate'] = globals['resolutions'][properties[property]]['rate'] - else: - flow['input'][name]['properties'][property] = properties[property] - - if master == 0: - raise SystemExit("You have not configured any master clock device!") - elif master > 1: - raise SystemExit("You have configured multiple master clock devices!") - - - for mux_name in mux.keys(): - for format in mux[mux_name]['formats'].keys(): - if 'audio' in mux[mux_name]: - source = mux[mux_name]['audio'].split(':')[0] - input_samplerate = input[source]['properties']['samplerate'] - if 'samplerate' in globals['formats'][format]: - samplerate = globals['formats'][format]['samplerate'] - if samplerate != input_samplerate: - worker_name = 'resample-%s' % (source) - if worker_name not in worker: - worker[worker_name] = -1 - else: - worker[worker_name] = 1 - feeder = 'input-%s' % (mux[mux_name]['audio']) - flow['input']['resample-%s-%s' % (source, samplerate)] = { - 'type': 'audio-resample', - 'desc': "resample audio from %s to %s Hz" % (source, samplerate), - 'worker': worker_name, - 'feeder': feeder, - 'properties': { - 'samplerate': samplerate, - }, - } - - if 'video' in mux[mux_name]: - source = mux[mux_name]['video'].split(':')[0] - input_resolution = input[source]['properties']['resolution'] - for profile in mux[mux_name]['formats'][format]: - if 'video' in globals['profiles'][profile]: - if input_resolution == "": - raise SystemExit("format definition needs video but no video input given") - resolution = globals['profiles'][profile]['video'] - if input_resolution != resolution: - if globals['resolutions'][resolution]['rate'] != globals['resolutions'][input_resolution]['rate']: - raise SystemExit("ERROR: video rate conversion is not yet supported!!!") - worker_name = 'resize-%s' % (source) - if worker_name not in worker: - worker[worker_name] = -1 - else: - worker[worker_name] = 1 - feeder = 'input-%s' % (mux[mux_name]['video']) - flow['input']['resize-%s-%s' % (source, resolution)] = { - 'type': 'video-resize', - 'desc': "resize video from %s to %sx%s" % (source, globals['resolutions'][resolution]['width'], globals['resolutions'][resolution]['height']), - 'worker': worker_name, - 'feeder': feeder, - 'properties': { - 'width': globals['resolutions'][resolution]['width'], - 'height': globals['resolutions'][resolution]['height'], - }, - } - - ### generate encoder and muxer components ####################### - flow['encoder_video'] = {} - flow['encoder_audio'] = {} - flow['muxer'] = {} - - - for mux_name in mux.keys(): - for format in mux[mux_name]['formats'].keys(): - for profile in mux[mux_name]['formats'][format]: - audio_encoder = 'none' - if 'audio' in mux[mux_name]: - encoder = globals['formats'][format]['audio'] - bitrate = globals['profiles'][profile]['audio'] - if 'samplerate' in globals['formats'][format]: - samplerate = globals['formats'][format]['samplerate'] - else: - samplerate = input_samplerate - - source = mux[mux_name]['audio'].split(':')[0] - input_samplerate = input[source]['properties']['samplerate'] - if samplerate != input_samplerate: - feeder = 'resample-%s-%s' % (source, samplerate) - else: - feeder = 'input-%s' % (mux[mux_name]['audio']) - audio_encoder = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate) - if audio_encoder not in flow['encoder_audio']: - worker_name = 'encoder-%s-%s-%s' % (source, encoder, bitrate) - if worker_name not in worker: - worker[worker_name] = -1 - else: - worker[worker_name] = 1 - flow['encoder_audio'][audio_encoder] = { - 'type': '%s-encode' % encoder, - 'desc': "%s encoder for %i kbit/s @ %i Hz, from %s" % (encoder, bitrate, samplerate, source), - 'worker': worker_name, - 'feeder': feeder, - 'properties': { - 'bitrate': bitrate, - }, - } - - video_encoder = 'none' - if 'video' in mux[mux_name]: - encoder = globals['formats'][format]['video'] - resolution = globals['profiles'][profile]['video'] - bitrate = globals['bitrates'][encoder][resolution] - source = mux[mux_name]['video'].split(':')[0] - input_resolution = input[source]['properties']['resolution'] - if resolution != input_resolution: - feeder = 'resize-%s-%s' % (source, resolution) - else: - feeder = 'input-%s' % (mux[mux_name]['video']) - video_encoder = 'encode-%s-%s-%s' % (source, encoder, resolution) - if video_encoder not in flow['encoder_video'].keys(): - worker_name = 'encoder-%s-%s-%s' % (source, encoder, resolution) - if worker_name not in worker: - worker[worker_name] = -1 - else: - worker[worker_name] = 1 - flow['encoder_video'][video_encoder] = { - 'type': '%s-encode' % encoder, - 'desc': "%s encoder for %sx%s, from %s" % (encoder, globals['resolutions'][resolution]['width'], globals['resolutions'][resolution]['height'], source), - 'worker': worker_name, - 'feeder': feeder, - 'properties': { - 'bitrate': bitrate, - }, - } - - muxer = globals['formats'][format]['muxer'] - worker_name = 'muxer-%s-%s-%s' % (mux_name, format, profile) - if worker_name not in worker: - worker[worker_name] = -1 - else: - worker[worker_name] = 1 - flow['muxer']['muxer-%s-%s-%s' % (mux_name, format, profile)] = { - 'type': '%s-mux' % muxer, - 'desc': "%s muxer for %s, profile %s" % (format, mux_name, profile), - 'worker': worker_name, - 'feeder_audio': audio_encoder, - 'feeder_video': video_encoder, - 'properties': {}, - } - - ### generate streamer components ################################ - flow['streamer'] = {} - flow['repeater'] = {} - - for cluster in stream.keys(): - streamer_cnt = stream[cluster]['count'] - port = stream[cluster]['port'] - localdup = None - if 'localdup' in stream[cluster].keys(): - localdup = stream[cluster]['localdup'] - for idx in range(streamer_cnt): - stream_worker = '%s%i'%(cluster, idx+1) - if stream_worker not in worker: - worker[stream_worker] = -1 - else: - worker[stream_worker] = 1 - for machine in globals['machines'].keys(): - if stream_worker in globals['machines'][machine]: - if machine in machines: - if 'porter' in machines[machine]: - if port in machines[machine]['porter']: - raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, port)) - else: - machines[machine]['porter'] = {} - else: - machines[machine] = { 'porter': {} } - - machines[machine]['porter'][port] = { - 'socket-path': "porter-%s"%(rand_string()), - 'username': rand_string(size=12), - 'password': rand_string(size=12), - } - - atmosphere['porter-%s-%i'%(machine, port)] = { - 'type': "porter", - 'desc': "Porter for %s on port %i"%(machine, port), - 'worker': stream_worker, - 'properties': { - 'port': port, - 'socket-path': machines[machine]['porter'][port]['socket-path'], - 'username': machines[machine]['porter'][port]['username'], - 'password': machines[machine]['porter'][port]['password'], - }, - } - - if localdup: - if 'porter' in machines[machine]: - if localdup['port'] in machines[machine]['porter']: - raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, localdup['port'])) - else: - machines[machine]['porter'] = {} - machines[machine]['porter'][localdup['port']] = { - 'socket-path': "porter-%s"%(rand_string()), - 'username': rand_string(size=12), - 'password': rand_string(size=12), - } - - atmosphere['porter-%s-%i'%(machine, localdup['port'])] = { - 'type': "porter", - 'desc': "Porter (local-only) for %s on port %i"%(machine, localdup['port']), - 'worker': stream_worker, - 'properties': { - 'port': localdup['port'], - 'interface': "localhost", - 'socket-path': machines[machine]['porter'][localdup['port']]['socket-path'], - 'username': machines[machine]['porter'][localdup['port']]['username'], - 'password': machines[machine]['porter'][localdup['port']]['password'], - }, - } - - for mux_name in stream[cluster]['muxes']: - for format in mux[mux_name]['formats'].keys(): - for profile in mux[mux_name]['formats'][format]: - muxer_feed = 'muxer-%s-%s-%s' % (mux_name, format, profile) - if 'repeater' in stream[cluster]: - repeater_worker = stream[cluster]['repeater'] - if repeater_worker not in worker: - worker[repeater_worker] = -1 - else: - worker[repeater_worker] = 1 - repeater_name = '%s-%s-%s-%s' % (stream[cluster]['repeater'], mux_name, format, profile) - flow['repeater'][repeater_name] = { - 'type': "repeater", - 'desc': "%s for %s %s-%s" % (stream[cluster]['repeater'], mux_name, format, profile), - 'worker': repeater_worker, - 'feeder': muxer_feed } - feeder = repeater_name - else: - feeder = muxer_feed - - if localdup and format in localdup['formats']: - repeater_name = 'localdup-%s%i-%s-%s-%s' % (cluster, idx+1, mux_name, format, profile) - flow['repeater'][repeater_name] = { - 'type': "repeater", - 'desc': "local duplicate on %s%i for %s %s-%s" % (cluster, idx+1, mux_name, format, profile), - 'worker': stream_worker, - 'feeder': feeder } - feeder = repeater_name - - name = '%s-local-%s%i-%s-%s-%s' % (stream[cluster]['type'], cluster, idx+1, mux_name, format, profile) - mount_point = '/%s-%s-%s.%s' % (mux_name, format, profile, globals['formats'][format]['muxer']) - flow['streamer'][name] = { - 'type': "%s-stream" % stream[cluster]['type'], - 'desc': "%s streamer for %s %s-%s (part %i of %s cluster, local duplicate)" % (stream[cluster]['type'], mux_name, format, profile, idx+1, cluster), - 'worker': stream_worker, - 'feeder': feeder, - 'properties': { - 'description': globals['description'], - 'type': 'slave', - 'porter-socket-path': machines[machine]['porter'][localdup['port']]['socket-path'], - 'porter-username': machines[machine]['porter'][localdup['port']]['username'], - 'porter-password': machines[machine]['porter'][localdup['port']]['password'], - 'mount-point': mount_point, - } - } - if 'burst-on-connect' in stream[cluster]['localdup'].keys(): - flow['streamer'][name]['properties']['burst-on-connect'] = 'true' - flow['streamer'][name]['properties']['burst-time'] = stream[cluster]['localdup']['burst-on-connect'] - - - name = '%s-%s%i-%s-%s-%s' % (stream[cluster]['type'], cluster, idx+1, mux_name, format, profile) - mount_point = '/%s-%s-%s.%s' % (mux_name, format, profile, globals['formats'][format]['muxer']) - if streamer_cnt > 1: - hostname = "%s.%s" % (stream[cluster]['hostname'] % (idx+1), globals['domain']) - if idx != 0: - hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (idx), globals['domain']) - else: - hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (streamer_cnt), globals['domain']) +class Description: + + def __init__(self): + self.globals = {} + self.input = {} + self.mux = {} + self.stream = {} + self.record = {} + + def _sanity_check(self): + # TODO: add more sanity checks + components = {} + for _, worker in self.globals['workers'].items(): + for c in worker: + if c in components: + raise Exception("ERROR: component '%s' is assigned to more than one worker!" % c) else: - hostname = "%s.%s" % (stream[cluster]['hostname'], globals['domain']) - flow['streamer'][name] = { - 'type': "%s-stream" % stream[cluster]['type'], - 'desc': "%s streamer for %s %s-%s (part %i of %s cluster)" % (stream[cluster]['type'], mux_name, format, profile, idx+1, cluster), - 'worker': stream_worker, - 'feeder': feeder, - 'properties': { - 'description': globals['description'], - 'type': 'slave', - 'porter-socket-path': machines[machine]['porter'][port]['socket-path'], - 'porter-username': machines[machine]['porter'][port]['username'], - 'porter-password': machines[machine]['porter'][port]['password'], - 'mount-point': mount_point, - 'hostname': hostname, - 'port': port, - } - } - - if 'stats' in globals.keys(): - flow['streamer'][name]['plugs'] = {} - if 'rrd' in globals['stats'].keys(): - flow['streamer'][name]['plugs']['rrd'] = {} - flow['streamer'][name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % (globals['stats']['rrd']['directory'], name) - flow['streamer'][name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (globals['stats']['rrd']['directory'], name) - - if 'sfive' in globals['stats'].keys(): - flow['streamer'][name]['plugs']['sfive'] = {} - flow['streamer'][name]['plugs']['sfive']['socket'] = globals['stats']['sfive']['socket'] - flow['streamer'][name]['plugs']['sfive']['duration'] = globals['stats']['sfive']['duration'] - flow['streamer'][name]['plugs']['sfive']['tags'] = globals['stats']['sfive']['tags'] - flow['streamer'][name]['plugs']['sfive']['hostname'] = '%s%i' % (cluster, idx+1) - flow['streamer'][name]['plugs']['sfive']['content-id'] = mux_name - flow['streamer'][name]['plugs']['sfive']['format'] = format - flow['streamer'][name]['plugs']['sfive']['quality'] = profile - - for prop in stream[cluster]: - if prop == 'max-con': - flow['streamer'][name]['properties']['client-limit'] = stream[cluster][prop] - if streamer_cnt > 1: - flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) - if prop == 'max-bw': - flow['streamer'][name]['properties']['bandwidth-limit'] = stream[cluster][prop] - if streamer_cnt > 1: - flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) - if prop == 'burst-on-connect': - flow['streamer'][name]['properties']['burst-on-connect'] = 'true' - flow['streamer'][name]['properties']['burst-time'] = stream[cluster][prop] - - ### generate record components ################################ - flow['recorder'] = {} - - for recorder in record.keys(): - record_worker = 'recorder-%s'%(recorder) - if record_worker not in worker: - worker[record_worker] = -1 - else: - worker[record_worker] = 1 - - for mux_name in record[recorder]['muxes']: - format = record[recorder]['muxes'][mux_name]["format"] - profile = record[recorder]['muxes'][mux_name]["profile"] - feeder = 'muxer-%s-%s-%s' % (mux_name, format, profile) + components[c] = 1 - name = 'recorder-%s-%s-%s' % (mux_name, format, profile) - flow['recorder'][name] = { - 'type': "recorder", - 'desc': "recorder for %s %s-%s" % (mux_name, format, profile), - 'worker': record_worker, - 'feeder': feeder, - 'properties': { } - } - for prop in record[recorder]: - if prop != 'muxes': - if prop == 'filename': - flow['recorder'][name]['properties'][prop] = record[recorder][prop] - else: - flow['recorder'][name]['properties'][prop] = record[recorder][prop] + def parse(self, config_file): + cf = open(config_file, 'r') + config = yaml.load(cf) + cf.close() - ### sanity checks, cont'd ####################################### + self.globals = config['globals'] + self.input = config['input'] + self.mux = config['mux'] + self.stream = config['stream'] + if 'record' in config: + self.record = config['record'] - error = 0 - for w in worker: - if worker[w] == 0: - print "WARNING: worker '%s' is not used" % w - elif worker[w] < 0: - error = error +1 - print "ERROR: worker '%s' is not assigned to any machine" % w + return self._sanity_check() - if error != 0: - raise SystemExit("%i Errors found - not generating any configuration" % error) - ### initialize and render templates ############################# - # - - import shutil - shutil.rmtree('%s/%s' % (globals['name'], output_format), ignore_errors=True) - - env = Environment(loader=FileSystemLoader('../templates/%s/%s/' % (output_format,globals['templates'])), line_statement_prefix = '%%') - - print "generating planet '%s/%s'" % (globals['manager']['machine'], globals['name']) - template = env.get_template('planet.xml') - planet = template.render(globals=globals, atmosphere=atmosphere, flow=flow) - - dir = '%s/%s/%s/managers/%s/' % (globals['name'], output_format, globals['manager']['machine'], globals['name']) - if not os.path.exists(dir): - os.makedirs(dir) - f = open('%s/planet.xml' % (dir), 'w') - f.write(planet.encode("utf8")) - f.write('\n') - f.close() - - passwd = open('%s/%s/%s/%s.passwd' % (globals['name'], output_format, globals['manager']['machine'], globals['name']), 'w') - - port = 9000 - for w in worker: - if worker[w] > 0: - machine_name = "" - for machine in globals['machines']: - if w in globals['machines'][machine]: - machine_name = machine - break - print "generating worker '%s/%s'" % (machine_name, w) - password = rand_string(12) - template = env.get_template('worker.xml') - ports = "%i-%i" % (port, port+1) - port+=2 - workerconf = template.render(globals=globals, name=w, password=password, portrange=ports) - dir = '%s/%s/%s/workers' % (globals['name'], output_format, machine) - if not os.path.exists(dir): - os.makedirs(dir) - f = open('%s/%s-%s.xml' % (dir, globals['name'], w), 'w') - f.write(workerconf.encode("utf8")) - f.write('\n') - f.close() - salt = rand_string(6) - passwd.write("%s:%s\n" % (w, crypt.crypt(password, salt))); - - if "admin" in globals: - user = globals['admin']['username'] - password = globals['admin']['password'] - else: - user = raw_input("username of administrator (leave empty for none): ") - if user == "": - raise SystemExit("WARN: empty username (not creating admin account)") - - password = getpass.getpass("password for '%s': " % user) - password2 = getpass.getpass("retype password for '%s': " % user) - if password != password2: - raise SystemExit("WARN: passwords don't match (not creating admin account)") - - salt = rand_string(6) - passwd.write("%s:%s\n" % (user, crypt.crypt(password, salt))); - - ### end flumotion configuration generator ####################### - -elif output_format == 'player': +# a flumtion planet configuration ############################# +# - ### init stream player configuration generator ################## +class Planet: - ## tba... - print "nothing here!" + def __init__(self): + self.atmosphere = {} + self.flow = {} - ### end stream player configuration generator ################### -elif output_format == 'stats': +# Main ######################################################## +# - ### init rrd statistic script generator ######################### +if __name__ == '__main__': + import traceback + import pprint + __pp = pprint.PrettyPrinter(indent=4, width=160) - ## tba... - print "nothing here!" + if len(sys.argv) <= 1: + print("ERROR: No configuration file given") + sys.exit(-1) + config_file = sys.argv[1] - ### en rrd statistic script generator ########################### + ret = 0 + try: + desc = Description() + desc.parse(config_file) -else: - raise SystemExit("OUTPUT FORMAT: unknown format '%s'" % output_format) + except Exception as e: + print("ERROR: while running app: %s" % e) + # print(traceback.format_exc()) + sys.exit(1) -### end ######################################################### + sys.exit(ret) |