summaryrefslogtreecommitdiff
path: root/src/flufigut.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/flufigut.py')
-rwxr-xr-xsrc/flufigut.py579
1 files changed, 64 insertions, 515 deletions
diff --git a/src/flufigut.py b/src/flufigut.py
index 25e86ac..42406d3 100755
--- a/src/flufigut.py
+++ b/src/flufigut.py
@@ -1,16 +1,15 @@
-#!/usr/bin/python
+#!/usr/bin/python3
#
# flufigut
#
# flufigut, the flumotion configuration utility, is a simple tool
# that generates flumotion configuration files using pyhton jinja2
-# template engine and simplejson. flufigut generates planet.xml
-# and worker.xml files from configuration templates and an easy to
-# understand representation of the flow structure written in json.
+# template engine. flufigut generates planet.xml and worker.xml
+# files from configuration templates and an easy to understand
+# representation of the flow structure written in json or yaml.
#
#
-# Copyright (C) 2012-2014 Christian Pointner <equinox@spreadspace.org>
-# Michael Gebetsroither <michael@mgeb.org>
+# Copyright (C) 2018 Christian Pointner <equinox@spreadspace.org>
#
# This file is part of flufigut.
#
@@ -31,535 +30,85 @@
import string
import random
import sys
-import simplejson as json
-from exceptions import *
-from jinja2 import Environment, FileSystemLoader
-import shutil
-import os
-import crypt
-import getpass
+import yaml
+# from jinja2 import Environment, FileSystemLoader
-### helper functions ############################################
+# helper functions ############################################
#
-def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits):
- return ''.join(random.choice(chars) for x in range(size))
-
-### parse json file #############################################
-#
-if len(sys.argv) <= 2:
- raise SystemExit("ERROR: No output format and or configuration file given")
-output_format = sys.argv[1]
+def rand_string(size=8, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits):
+ return ''.join(random.choice(chars) for x in range(size))
-cf = open(sys.argv[2], 'r')
-config = json.load(cf);
-cf.close();
-### initialization ##############################################
+# a flufigut stream description ###############################
#
-globals = config['globals']
-input = config['input']
-mux = config['mux']
-stream = config['stream']
-if 'record' in config:
- record = config['record']
-else:
- record = {}
-
-if output_format == 'flumotion':
- ### init flumotion configuration generator ######################
-
- atmosphere = {}
- flow = {}
- ### sanity checks ###############################################
- machines = {}
- worker = {}
-
- for machine in globals['machines']:
- for w in globals['machines'][machine]:
- if w in worker:
- raise SystemExit("ERROR: worker '%s' is assigned to more than one machine!!" % w)
- else:
- worker[w] = 0
-
- ### generate input components ###################################
-
-
-
- flow['input'] = {}
-
- master = 0
- for source in input:
- name = 'input-%s' % source
- if name not in worker:
- worker[name] = -1
- else:
- worker[name] = 1
- flow['input'][name] = {
- 'type': input[source]['type'],
- 'desc': "capture raw data from %s" % (source),
- 'worker': name,
- 'master': input[source]['master'],
- 'properties': {},
- }
- if input[source]['master']:
- master += 1
- properties = input[source]['properties']
- for property in properties.keys():
- if property == 'resolution':
- flow['input'][name]['properties']['width'] = globals['resolutions'][properties[property]]['width']
- flow['input'][name]['properties']['height'] = globals['resolutions'][properties[property]]['height']
- flow['input'][name]['properties']['framerate'] = globals['resolutions'][properties[property]]['rate']
- else:
- flow['input'][name]['properties'][property] = properties[property]
-
- if master == 0:
- raise SystemExit("You have not configured any master clock device!")
- elif master > 1:
- raise SystemExit("You have configured multiple master clock devices!")
-
-
- for mux_name in mux.keys():
- for format in mux[mux_name]['formats'].keys():
- if 'audio' in mux[mux_name]:
- source = mux[mux_name]['audio'].split(':')[0]
- input_samplerate = input[source]['properties']['samplerate']
- if 'samplerate' in globals['formats'][format]:
- samplerate = globals['formats'][format]['samplerate']
- if samplerate != input_samplerate:
- worker_name = 'resample-%s' % (source)
- if worker_name not in worker:
- worker[worker_name] = -1
- else:
- worker[worker_name] = 1
- feeder = 'input-%s' % (mux[mux_name]['audio'])
- flow['input']['resample-%s-%s' % (source, samplerate)] = {
- 'type': 'audio-resample',
- 'desc': "resample audio from %s to %s Hz" % (source, samplerate),
- 'worker': worker_name,
- 'feeder': feeder,
- 'properties': {
- 'samplerate': samplerate,
- },
- }
-
- if 'video' in mux[mux_name]:
- source = mux[mux_name]['video'].split(':')[0]
- input_resolution = input[source]['properties']['resolution']
- for profile in mux[mux_name]['formats'][format]:
- if 'video' in globals['profiles'][profile]:
- if input_resolution == "":
- raise SystemExit("format definition needs video but no video input given")
- resolution = globals['profiles'][profile]['video']
- if input_resolution != resolution:
- if globals['resolutions'][resolution]['rate'] != globals['resolutions'][input_resolution]['rate']:
- raise SystemExit("ERROR: video rate conversion is not yet supported!!!")
- worker_name = 'resize-%s' % (source)
- if worker_name not in worker:
- worker[worker_name] = -1
- else:
- worker[worker_name] = 1
- feeder = 'input-%s' % (mux[mux_name]['video'])
- flow['input']['resize-%s-%s' % (source, resolution)] = {
- 'type': 'video-resize',
- 'desc': "resize video from %s to %sx%s" % (source, globals['resolutions'][resolution]['width'], globals['resolutions'][resolution]['height']),
- 'worker': worker_name,
- 'feeder': feeder,
- 'properties': {
- 'width': globals['resolutions'][resolution]['width'],
- 'height': globals['resolutions'][resolution]['height'],
- },
- }
-
- ### generate encoder and muxer components #######################
- flow['encoder_video'] = {}
- flow['encoder_audio'] = {}
- flow['muxer'] = {}
-
-
- for mux_name in mux.keys():
- for format in mux[mux_name]['formats'].keys():
- for profile in mux[mux_name]['formats'][format]:
- audio_encoder = 'none'
- if 'audio' in mux[mux_name]:
- encoder = globals['formats'][format]['audio']
- bitrate = globals['profiles'][profile]['audio']
- if 'samplerate' in globals['formats'][format]:
- samplerate = globals['formats'][format]['samplerate']
- else:
- samplerate = input_samplerate
-
- source = mux[mux_name]['audio'].split(':')[0]
- input_samplerate = input[source]['properties']['samplerate']
- if samplerate != input_samplerate:
- feeder = 'resample-%s-%s' % (source, samplerate)
- else:
- feeder = 'input-%s' % (mux[mux_name]['audio'])
- audio_encoder = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate)
- if audio_encoder not in flow['encoder_audio']:
- worker_name = 'encoder-%s-%s-%s' % (source, encoder, bitrate)
- if worker_name not in worker:
- worker[worker_name] = -1
- else:
- worker[worker_name] = 1
- flow['encoder_audio'][audio_encoder] = {
- 'type': '%s-encode' % encoder,
- 'desc': "%s encoder for %i kbit/s @ %i Hz, from %s" % (encoder, bitrate, samplerate, source),
- 'worker': worker_name,
- 'feeder': feeder,
- 'properties': {
- 'bitrate': bitrate,
- },
- }
-
- video_encoder = 'none'
- if 'video' in mux[mux_name]:
- encoder = globals['formats'][format]['video']
- resolution = globals['profiles'][profile]['video']
- bitrate = globals['bitrates'][encoder][resolution]
- source = mux[mux_name]['video'].split(':')[0]
- input_resolution = input[source]['properties']['resolution']
- if resolution != input_resolution:
- feeder = 'resize-%s-%s' % (source, resolution)
- else:
- feeder = 'input-%s' % (mux[mux_name]['video'])
- video_encoder = 'encode-%s-%s-%s' % (source, encoder, resolution)
- if video_encoder not in flow['encoder_video'].keys():
- worker_name = 'encoder-%s-%s-%s' % (source, encoder, resolution)
- if worker_name not in worker:
- worker[worker_name] = -1
- else:
- worker[worker_name] = 1
- flow['encoder_video'][video_encoder] = {
- 'type': '%s-encode' % encoder,
- 'desc': "%s encoder for %sx%s, from %s" % (encoder, globals['resolutions'][resolution]['width'], globals['resolutions'][resolution]['height'], source),
- 'worker': worker_name,
- 'feeder': feeder,
- 'properties': {
- 'bitrate': bitrate,
- },
- }
-
- muxer = globals['formats'][format]['muxer']
- worker_name = 'muxer-%s-%s-%s' % (mux_name, format, profile)
- if worker_name not in worker:
- worker[worker_name] = -1
- else:
- worker[worker_name] = 1
- flow['muxer']['muxer-%s-%s-%s' % (mux_name, format, profile)] = {
- 'type': '%s-mux' % muxer,
- 'desc': "%s muxer for %s, profile %s" % (format, mux_name, profile),
- 'worker': worker_name,
- 'feeder_audio': audio_encoder,
- 'feeder_video': video_encoder,
- 'properties': {},
- }
-
- ### generate streamer components ################################
- flow['streamer'] = {}
- flow['repeater'] = {}
-
- for cluster in stream.keys():
- streamer_cnt = stream[cluster]['count']
- port = stream[cluster]['port']
- localdup = None
- if 'localdup' in stream[cluster].keys():
- localdup = stream[cluster]['localdup']
- for idx in range(streamer_cnt):
- stream_worker = '%s%i'%(cluster, idx+1)
- if stream_worker not in worker:
- worker[stream_worker] = -1
- else:
- worker[stream_worker] = 1
- for machine in globals['machines'].keys():
- if stream_worker in globals['machines'][machine]:
- if machine in machines:
- if 'porter' in machines[machine]:
- if port in machines[machine]['porter']:
- raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, port))
- else:
- machines[machine]['porter'] = {}
- else:
- machines[machine] = { 'porter': {} }
-
- machines[machine]['porter'][port] = {
- 'socket-path': "porter-%s"%(rand_string()),
- 'username': rand_string(size=12),
- 'password': rand_string(size=12),
- }
-
- atmosphere['porter-%s-%i'%(machine, port)] = {
- 'type': "porter",
- 'desc': "Porter for %s on port %i"%(machine, port),
- 'worker': stream_worker,
- 'properties': {
- 'port': port,
- 'socket-path': machines[machine]['porter'][port]['socket-path'],
- 'username': machines[machine]['porter'][port]['username'],
- 'password': machines[machine]['porter'][port]['password'],
- },
- }
-
- if localdup:
- if 'porter' in machines[machine]:
- if localdup['port'] in machines[machine]['porter']:
- raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, localdup['port']))
- else:
- machines[machine]['porter'] = {}
- machines[machine]['porter'][localdup['port']] = {
- 'socket-path': "porter-%s"%(rand_string()),
- 'username': rand_string(size=12),
- 'password': rand_string(size=12),
- }
-
- atmosphere['porter-%s-%i'%(machine, localdup['port'])] = {
- 'type': "porter",
- 'desc': "Porter (local-only) for %s on port %i"%(machine, localdup['port']),
- 'worker': stream_worker,
- 'properties': {
- 'port': localdup['port'],
- 'interface': "localhost",
- 'socket-path': machines[machine]['porter'][localdup['port']]['socket-path'],
- 'username': machines[machine]['porter'][localdup['port']]['username'],
- 'password': machines[machine]['porter'][localdup['port']]['password'],
- },
- }
-
- for mux_name in stream[cluster]['muxes']:
- for format in mux[mux_name]['formats'].keys():
- for profile in mux[mux_name]['formats'][format]:
- muxer_feed = 'muxer-%s-%s-%s' % (mux_name, format, profile)
- if 'repeater' in stream[cluster]:
- repeater_worker = stream[cluster]['repeater']
- if repeater_worker not in worker:
- worker[repeater_worker] = -1
- else:
- worker[repeater_worker] = 1
- repeater_name = '%s-%s-%s-%s' % (stream[cluster]['repeater'], mux_name, format, profile)
- flow['repeater'][repeater_name] = {
- 'type': "repeater",
- 'desc': "%s for %s %s-%s" % (stream[cluster]['repeater'], mux_name, format, profile),
- 'worker': repeater_worker,
- 'feeder': muxer_feed }
- feeder = repeater_name
- else:
- feeder = muxer_feed
-
- if localdup and format in localdup['formats']:
- repeater_name = 'localdup-%s%i-%s-%s-%s' % (cluster, idx+1, mux_name, format, profile)
- flow['repeater'][repeater_name] = {
- 'type': "repeater",
- 'desc': "local duplicate on %s%i for %s %s-%s" % (cluster, idx+1, mux_name, format, profile),
- 'worker': stream_worker,
- 'feeder': feeder }
- feeder = repeater_name
-
- name = '%s-local-%s%i-%s-%s-%s' % (stream[cluster]['type'], cluster, idx+1, mux_name, format, profile)
- mount_point = '/%s-%s-%s.%s' % (mux_name, format, profile, globals['formats'][format]['muxer'])
- flow['streamer'][name] = {
- 'type': "%s-stream" % stream[cluster]['type'],
- 'desc': "%s streamer for %s %s-%s (part %i of %s cluster, local duplicate)" % (stream[cluster]['type'], mux_name, format, profile, idx+1, cluster),
- 'worker': stream_worker,
- 'feeder': feeder,
- 'properties': {
- 'description': globals['description'],
- 'type': 'slave',
- 'porter-socket-path': machines[machine]['porter'][localdup['port']]['socket-path'],
- 'porter-username': machines[machine]['porter'][localdup['port']]['username'],
- 'porter-password': machines[machine]['porter'][localdup['port']]['password'],
- 'mount-point': mount_point,
- }
- }
- if 'burst-on-connect' in stream[cluster]['localdup'].keys():
- flow['streamer'][name]['properties']['burst-on-connect'] = 'true'
- flow['streamer'][name]['properties']['burst-time'] = stream[cluster]['localdup']['burst-on-connect']
-
-
- name = '%s-%s%i-%s-%s-%s' % (stream[cluster]['type'], cluster, idx+1, mux_name, format, profile)
- mount_point = '/%s-%s-%s.%s' % (mux_name, format, profile, globals['formats'][format]['muxer'])
- if streamer_cnt > 1:
- hostname = "%s.%s" % (stream[cluster]['hostname'] % (idx+1), globals['domain'])
- if idx != 0:
- hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (idx), globals['domain'])
- else:
- hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (streamer_cnt), globals['domain'])
+class Description:
+
+ def __init__(self):
+ self.globals = {}
+ self.input = {}
+ self.mux = {}
+ self.stream = {}
+ self.record = {}
+
+ def _sanity_check(self):
+ # TODO: add more sanity checks
+ components = {}
+ for _, worker in self.globals['workers'].items():
+ for c in worker:
+ if c in components:
+ raise Exception("ERROR: component '%s' is assigned to more than one worker!" % c)
else:
- hostname = "%s.%s" % (stream[cluster]['hostname'], globals['domain'])
- flow['streamer'][name] = {
- 'type': "%s-stream" % stream[cluster]['type'],
- 'desc': "%s streamer for %s %s-%s (part %i of %s cluster)" % (stream[cluster]['type'], mux_name, format, profile, idx+1, cluster),
- 'worker': stream_worker,
- 'feeder': feeder,
- 'properties': {
- 'description': globals['description'],
- 'type': 'slave',
- 'porter-socket-path': machines[machine]['porter'][port]['socket-path'],
- 'porter-username': machines[machine]['porter'][port]['username'],
- 'porter-password': machines[machine]['porter'][port]['password'],
- 'mount-point': mount_point,
- 'hostname': hostname,
- 'port': port,
- }
- }
-
- if 'stats' in globals.keys():
- flow['streamer'][name]['plugs'] = {}
- if 'rrd' in globals['stats'].keys():
- flow['streamer'][name]['plugs']['rrd'] = {}
- flow['streamer'][name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % (globals['stats']['rrd']['directory'], name)
- flow['streamer'][name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (globals['stats']['rrd']['directory'], name)
-
- if 'sfive' in globals['stats'].keys():
- flow['streamer'][name]['plugs']['sfive'] = {}
- flow['streamer'][name]['plugs']['sfive']['socket'] = globals['stats']['sfive']['socket']
- flow['streamer'][name]['plugs']['sfive']['duration'] = globals['stats']['sfive']['duration']
- flow['streamer'][name]['plugs']['sfive']['tags'] = globals['stats']['sfive']['tags']
- flow['streamer'][name]['plugs']['sfive']['hostname'] = '%s%i' % (cluster, idx+1)
- flow['streamer'][name]['plugs']['sfive']['content-id'] = mux_name
- flow['streamer'][name]['plugs']['sfive']['format'] = format
- flow['streamer'][name]['plugs']['sfive']['quality'] = profile
-
- for prop in stream[cluster]:
- if prop == 'max-con':
- flow['streamer'][name]['properties']['client-limit'] = stream[cluster][prop]
- if streamer_cnt > 1:
- flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point)
- if prop == 'max-bw':
- flow['streamer'][name]['properties']['bandwidth-limit'] = stream[cluster][prop]
- if streamer_cnt > 1:
- flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point)
- if prop == 'burst-on-connect':
- flow['streamer'][name]['properties']['burst-on-connect'] = 'true'
- flow['streamer'][name]['properties']['burst-time'] = stream[cluster][prop]
-
- ### generate record components ################################
- flow['recorder'] = {}
-
- for recorder in record.keys():
- record_worker = 'recorder-%s'%(recorder)
- if record_worker not in worker:
- worker[record_worker] = -1
- else:
- worker[record_worker] = 1
-
- for mux_name in record[recorder]['muxes']:
- format = record[recorder]['muxes'][mux_name]["format"]
- profile = record[recorder]['muxes'][mux_name]["profile"]
- feeder = 'muxer-%s-%s-%s' % (mux_name, format, profile)
+ components[c] = 1
- name = 'recorder-%s-%s-%s' % (mux_name, format, profile)
- flow['recorder'][name] = {
- 'type': "recorder",
- 'desc': "recorder for %s %s-%s" % (mux_name, format, profile),
- 'worker': record_worker,
- 'feeder': feeder,
- 'properties': { }
- }
- for prop in record[recorder]:
- if prop != 'muxes':
- if prop == 'filename':
- flow['recorder'][name]['properties'][prop] = record[recorder][prop]
- else:
- flow['recorder'][name]['properties'][prop] = record[recorder][prop]
+ def parse(self, config_file):
+ cf = open(config_file, 'r')
+ config = yaml.load(cf)
+ cf.close()
- ### sanity checks, cont'd #######################################
+ self.globals = config['globals']
+ self.input = config['input']
+ self.mux = config['mux']
+ self.stream = config['stream']
+ if 'record' in config:
+ self.record = config['record']
- error = 0
- for w in worker:
- if worker[w] == 0:
- print "WARNING: worker '%s' is not used" % w
- elif worker[w] < 0:
- error = error +1
- print "ERROR: worker '%s' is not assigned to any machine" % w
+ return self._sanity_check()
- if error != 0:
- raise SystemExit("%i Errors found - not generating any configuration" % error)
- ### initialize and render templates #############################
- #
-
- import shutil
- shutil.rmtree('%s/%s' % (globals['name'], output_format), ignore_errors=True)
-
- env = Environment(loader=FileSystemLoader('../templates/%s/%s/' % (output_format,globals['templates'])), line_statement_prefix = '%%')
-
- print "generating planet '%s/%s'" % (globals['manager']['machine'], globals['name'])
- template = env.get_template('planet.xml')
- planet = template.render(globals=globals, atmosphere=atmosphere, flow=flow)
-
- dir = '%s/%s/%s/managers/%s/' % (globals['name'], output_format, globals['manager']['machine'], globals['name'])
- if not os.path.exists(dir):
- os.makedirs(dir)
- f = open('%s/planet.xml' % (dir), 'w')
- f.write(planet.encode("utf8"))
- f.write('\n')
- f.close()
-
- passwd = open('%s/%s/%s/%s.passwd' % (globals['name'], output_format, globals['manager']['machine'], globals['name']), 'w')
-
- port = 9000
- for w in worker:
- if worker[w] > 0:
- machine_name = ""
- for machine in globals['machines']:
- if w in globals['machines'][machine]:
- machine_name = machine
- break
- print "generating worker '%s/%s'" % (machine_name, w)
- password = rand_string(12)
- template = env.get_template('worker.xml')
- ports = "%i-%i" % (port, port+1)
- port+=2
- workerconf = template.render(globals=globals, name=w, password=password, portrange=ports)
- dir = '%s/%s/%s/workers' % (globals['name'], output_format, machine)
- if not os.path.exists(dir):
- os.makedirs(dir)
- f = open('%s/%s-%s.xml' % (dir, globals['name'], w), 'w')
- f.write(workerconf.encode("utf8"))
- f.write('\n')
- f.close()
- salt = rand_string(6)
- passwd.write("%s:%s\n" % (w, crypt.crypt(password, salt)));
-
- if "admin" in globals:
- user = globals['admin']['username']
- password = globals['admin']['password']
- else:
- user = raw_input("username of administrator (leave empty for none): ")
- if user == "":
- raise SystemExit("WARN: empty username (not creating admin account)")
-
- password = getpass.getpass("password for '%s': " % user)
- password2 = getpass.getpass("retype password for '%s': " % user)
- if password != password2:
- raise SystemExit("WARN: passwords don't match (not creating admin account)")
-
- salt = rand_string(6)
- passwd.write("%s:%s\n" % (user, crypt.crypt(password, salt)));
-
- ### end flumotion configuration generator #######################
-
-elif output_format == 'player':
+# a flumtion planet configuration #############################
+#
- ### init stream player configuration generator ##################
+class Planet:
- ## tba...
- print "nothing here!"
+ def __init__(self):
+ self.atmosphere = {}
+ self.flow = {}
- ### end stream player configuration generator ###################
-elif output_format == 'stats':
+# Main ########################################################
+#
- ### init rrd statistic script generator #########################
+if __name__ == '__main__':
+ import traceback
+ import pprint
+ __pp = pprint.PrettyPrinter(indent=4, width=160)
- ## tba...
- print "nothing here!"
+ if len(sys.argv) <= 1:
+ print("ERROR: No configuration file given")
+ sys.exit(-1)
+ config_file = sys.argv[1]
- ### en rrd statistic script generator ###########################
+ ret = 0
+ try:
+ desc = Description()
+ desc.parse(config_file)
-else:
- raise SystemExit("OUTPUT FORMAT: unknown format '%s'" % output_format)
+ except Exception as e:
+ print("ERROR: while running app: %s" % e)
+ # print(traceback.format_exc())
+ sys.exit(1)
-### end #########################################################
+ sys.exit(ret)