summaryrefslogtreecommitdiff
path: root/src/flufigut.py
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2018-01-30 23:32:05 +0100
committerChristian Pointner <equinox@spreadspace.org>2018-01-30 23:32:05 +0100
commitadb7cb7a7229200cc8fd18ad8498a805c93c0f65 (patch)
tree3faf467c4f20a024a31f4fd4d3106d5925f89857 /src/flufigut.py
parentno stats needed in 2018 (diff)
added worker templates (not done yet)
Diffstat (limited to 'src/flufigut.py')
-rwxr-xr-xsrc/flufigut.py163
1 files changed, 88 insertions, 75 deletions
diff --git a/src/flufigut.py b/src/flufigut.py
index f9f63f2..1acdf44 100755
--- a/src/flufigut.py
+++ b/src/flufigut.py
@@ -53,16 +53,27 @@ class Description:
self.streams = {}
self.records = {}
- def _sanity_check(self):
- # TODO: add more sanity checks
- # - check if stream['localdup']['port'] and stream['port'] are different
- components = {}
- for _, worker in self.globals['workers'].items():
- for c in worker:
- if c in components:
- raise Exception("ERROR: component '%s' is assigned to more than one worker!" % c)
- else:
- components[c] = 1
+ self.worker_patterns = {}
+
+ def _compute_worker_patterns(self):
+ for worker_name, worker in self.globals['workers'].items():
+ for pattern in worker:
+ is_prefix = False
+ orig_pattern = pattern
+ if '*' in pattern:
+ if '*' in pattern[:-1]:
+ raise Exception("invalid worker pattern '%s' (component globs may only have a single '*' at the end)" % pattern)
+ is_prefix = True
+ pattern = pattern[:-1]
+ if not pattern:
+ raise Exception("worker '%s' has wildcard pattern '%s' which is not allowed" % (worker_name, orig_pattern))
+ if pattern in self.worker_patterns:
+ raise Exception("the worker pattern '%s' is used by at least two workers (%s and %s)" %
+ (orig_pattern, self.worker_patterns[pattern]['worker'], worker_name))
+ self.worker_patterns[pattern] = {
+ 'worker': worker_name,
+ 'is_prefix': is_prefix
+ }
def parse(self, config_file):
cf = open(config_file, 'r')
@@ -76,7 +87,7 @@ class Description:
if 'records' in config:
self.records = config['records']
- return self._sanity_check()
+ return self._compute_worker_patterns()
# a flumtion planet configuration #############################
@@ -95,25 +106,26 @@ class Porter:
class Planet:
- def __init__(self):
+ def __init__(self, desc):
+ self._desc = desc
self.atmosphere = {}
self.flow = {}
#
# inputs
- def __set_input_properties(self, comp_name, props, globals):
+ def __set_input_properties(self, comp_name, props):
for prop in props:
if prop == 'resolution':
- self.flow['inputs'][comp_name]['properties']['width'] = globals['resolutions'][props[prop]]['width']
- self.flow['inputs'][comp_name]['properties']['height'] = globals['resolutions'][props[prop]]['height']
- self.flow['inputs'][comp_name]['properties']['framerate'] = globals['resolutions'][props[prop]]['rate']
+ self.flow['inputs'][comp_name]['properties']['width'] = self._desc.globals['resolutions'][props[prop]]['width']
+ self.flow['inputs'][comp_name]['properties']['height'] = self._desc.globals['resolutions'][props[prop]]['height']
+ self.flow['inputs'][comp_name]['properties']['framerate'] = self._desc.globals['resolutions'][props[prop]]['rate']
else:
self.flow['inputs'][comp_name]['properties'][prop] = props[prop]
- def _generate_inputs(self, inputs, globals):
+ def _generate_inputs(self):
self.flow['inputs'] = {}
master_cnt = 0
- for source, input in inputs.items():
+ for source, input in self._desc.inputs.items():
comp_name = 'input-%s' % source
comp_desc = 'capture raw data from %s' % (source)
self.flow['inputs'][comp_name] = {
@@ -125,7 +137,7 @@ class Planet:
}
if input['master']:
master_cnt += 1
- self.__set_input_properties(comp_name, input['properties'], globals)
+ self.__set_input_properties(comp_name, input['properties'])
if master_cnt == 0:
raise Exception("You have not configured any master clock device!")
@@ -151,14 +163,14 @@ class Planet:
}
return comp_name
- def __generate_audio_encoder(self, mux, format, profile, inputs, globals):
+ def __generate_audio_encoder(self, mux, format, profile):
source = mux['audio'].split(':')[0]
- encoder = globals['formats'][format]['audio']
- bitrate = globals['profiles'][profile]['audio']
- input_samplerate = inputs[source]['properties']['samplerate']
+ encoder = self._desc.globals['formats'][format]['audio']
+ bitrate = self._desc.globals['profiles'][profile]['audio']
+ input_samplerate = self._desc.inputs[source]['properties']['samplerate']
samplerate = input_samplerate
- if 'samplerate' in globals['formats'][format]:
- samplerate = globals['formats'][format]['samplerate']
+ if 'samplerate' in self._desc.globals['formats'][format]:
+ samplerate = self._desc.globals['formats'][format]['samplerate']
feeder = self.__generate_audio_feeder('input-%s' % (mux['audio']), source, input_samplerate, samplerate)
comp_name = 'encode-%s-%s-%i-%i' % (source, encoder, bitrate, samplerate)
@@ -181,45 +193,45 @@ class Planet:
}
return comp_name
- def __generate_video_feeder(self, input, source, input_resolution, target_resolution, globals):
- if globals['resolutions'][target_resolution]['rate'] != globals['resolutions'][input_resolution]['rate']:
+ def __generate_video_feeder(self, input, source, input_resolution, target_resolution):
+ if self._desc.globals['resolutions'][target_resolution]['rate'] != self._desc.globals['resolutions'][input_resolution]['rate']:
raise Exception("ERROR: video rate conversion is not supported!!!")
if target_resolution == input_resolution:
return input
comp_name = 'resize-%s-%s' % (source, target_resolution)
- comp_desc = 'resize video from %s to %sx%s' % (source, globals['resolutions'][target_resolution]['width'],
- globals['resolutions'][target_resolution]['height']),
+ comp_desc = 'resize video from %s to %sx%s' % (source, self._desc.globals['resolutions'][target_resolution]['width'],
+ self._desc.globals['resolutions'][target_resolution]['height']),
self.flow['inputs'][comp_name] = {
'type': 'video-resize',
'desc': comp_desc,
'worker': None,
'feeder': input,
'properties': {
- 'width': globals['resolutions'][target_resolution]['width'],
- 'height': globals['resolutions'][target_resolution]['height'],
+ 'width': self._desc.globals['resolutions'][target_resolution]['width'],
+ 'height': self._desc.globals['resolutions'][target_resolution]['height'],
},
}
return comp_name
- def __generate_video_encoder(self, mux, format, profile, inputs, globals):
- if 'video' not in globals['profiles'][profile]:
+ def __generate_video_encoder(self, mux, format, profile):
+ if 'video' not in self._desc.globals['profiles'][profile]:
return None
source = mux['video'].split(':')[0]
- encoder = globals['formats'][format]['video']
- input_resolution = inputs[source]['properties']['resolution']
- resolution = globals['profiles'][profile]['video']
- bitrate = globals['bitrates'][encoder][resolution]
+ encoder = self._desc.globals['formats'][format]['video']
+ input_resolution = self._desc.inputs[source]['properties']['resolution']
+ resolution = self._desc.globals['profiles'][profile]['video']
+ bitrate = self._desc.globals['bitrates'][encoder][resolution]
if input_resolution == "":
raise Exception("format definition needs video but no video input given")
- feeder = self.__generate_video_feeder('input-%s' % (mux['video']), source, input_resolution, resolution, globals)
+ feeder = self.__generate_video_feeder('input-%s' % (mux['video']), source, input_resolution, resolution)
comp_name = 'encode-%s-%s-%s' % (source, encoder, resolution)
- comp_desc = '%s encoder for %sx%s, from %s' % (encoder, globals['resolutions'][resolution]['width'],
- globals['resolutions'][resolution]['height'], source),
+ comp_desc = '%s encoder for %sx%s, from %s' % (encoder, self._desc.globals['resolutions'][resolution]['width'],
+ self._desc.globals['resolutions'][resolution]['height'], source),
if comp_name in self.flow['encoders-video']:
return comp_name
@@ -235,8 +247,8 @@ class Planet:
}
return comp_name
- def __generate_muxer(self, mux_name, format, profile, globals, feeder_audio, feeder_video):
- muxer = globals['formats'][format]['muxer']
+ def __generate_muxer(self, mux_name, format, profile, feeder_audio, feeder_video):
+ muxer = self._desc.globals['formats'][format]['muxer']
comp_name = 'mux-%s-%s-%s' % (mux_name, format, profile)
comp_desc = '%s muxer for %s, profile %s' % (format, mux_name, profile),
@@ -249,22 +261,22 @@ class Planet:
'properties': {},
}
- def _generate_muxes(self, muxes, inputs, globals):
+ def _generate_muxes(self):
self.flow['encoders-audio'] = {}
self.flow['encoders-video'] = {}
self.flow['muxers'] = {}
- for mux_name, mux in muxes.items():
+ for mux_name, mux in self._desc.muxes.items():
for format in mux['formats']:
for profile in mux['formats'][format]:
audio_encoder = None
video_encoder = None
if 'audio' in mux:
- audio_encoder = self.__generate_audio_encoder(mux, format, profile, inputs, globals)
+ audio_encoder = self.__generate_audio_encoder(mux, format, profile)
if 'video' in mux:
- video_encoder = self.__generate_video_encoder(mux, format, profile, inputs, globals)
+ video_encoder = self.__generate_video_encoder(mux, format, profile)
- self.__generate_muxer(mux_name, format, profile, globals, audio_encoder, video_encoder)
+ self.__generate_muxer(mux_name, format, profile, audio_encoder, video_encoder)
#
# streams
@@ -320,32 +332,33 @@ class Planet:
self.flow['streamers'][comp_name]['properties']['burst-on-connect'] = 'true'
self.flow['streamers'][comp_name]['properties']['burst-time'] = stream[prop]
- def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile, globals):
- if 'stats' in globals:
+ def __add_stream_mux_instance_plugs(self, comp_name, stream_name, idx, mux, format, profile):
+ if 'stats' in self._desc.globals:
self.flow['streamers'][comp_name]['plugs'] = {}
- if 'rrd' in globals['stats']:
+ if 'rrd' in self._desc.globals['stats']:
self.flow['streamers'][comp_name]['plugs']['rrd'] = {}
- self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % (globals['stats']['rrd']['directory'], comp_name)
- self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (globals['stats']['rrd']['directory'], comp_name)
+ self.flow['streamers'][comp_name]['plugs']['rrd']['clients'] = "%s/%s_clients.rrd" % (
+ self._desc.globals['stats']['rrd']['directory'], comp_name)
+ self.flow['streamers'][comp_name]['plugs']['rrd']['bytes'] = "%s/%s_bytes.rrd" % (self._desc.globals['stats']['rrd']['directory'], comp_name)
- if 'sfive' in globals['stats']:
+ if 'sfive' in self._desc.globals['stats']:
self.flow['streamers'][comp_name]['plugs']['sfive'] = {}
- self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = globals['stats']['sfive']['socket']
- self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = globals['stats']['sfive']['duration']
- self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = globals['stats']['sfive']['tags']
+ self.flow['streamers'][comp_name]['plugs']['sfive']['socket'] = self._desc.globals['stats']['sfive']['socket']
+ self.flow['streamers'][comp_name]['plugs']['sfive']['duration'] = self._desc.globals['stats']['sfive']['duration']
+ self.flow['streamers'][comp_name]['plugs']['sfive']['tags'] = self._desc.globals['stats']['sfive']['tags']
self.flow['streamers'][comp_name]['plugs']['sfive']['hostname'] = '%s%i' % (stream_name, idx + 1)
self.flow['streamers'][comp_name]['plugs']['sfive']['content-id'] = mux
self.flow['streamers'][comp_name]['plugs']['sfive']['format'] = format
self.flow['streamers'][comp_name]['plugs']['sfive']['quality'] = profile
- def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter, globals):
+ def __generate_stream_mux_instance(self, stream_name, stream, mux, format, profile, idx, cnt, porter):
muxer_feed = 'muxer-%s-%s-%s' % (mux, format, profile)
feeder = muxer_feed
if 'repeater' in stream:
feeder = self.__generate_stream_mux_repeater(stream_name, mux, format, profile, muxer_feed)
comp_name = '%s-%s%i-%s-%s-%s' % (stream['type'], stream_name, idx + 1, mux, format, profile)
- mount_point = '/%s-%s-%s.%s' % (mux, format, profile, globals['formats'][format]['muxer'])
+ mount_point = '/%s-%s-%s.%s' % (mux, format, profile, self._desc.globals['formats'][format]['muxer'])
hostname = stream['hostname']
hostname_next = None
if cnt > 1:
@@ -361,7 +374,7 @@ class Planet:
'worker': None,
'feeder': feeder,
'properties': {
- 'description': globals['description'],
+ 'description': self._desc.globals['description'],
'type': 'slave',
'porter-socket-path': porter.socket_path,
'porter-username': porter.username,
@@ -372,35 +385,35 @@ class Planet:
}
}
self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next)
- self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile, globals)
+ self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile)
- def __generate_stream_instance(self, stream_name, stream, idx, cnt, muxes, globals):
+ def __generate_stream_instance(self, stream_name, stream, idx, cnt):
port = stream['port']
interface = None
if 'interface' in stream:
interface = stream['interface']
porter = self.__create_porter(stream_name, idx, port, interface)
for mux_name in stream['muxes']:
- for format_name, format in muxes[mux_name]['formats'].items():
+ for format_name, format in self._desc.muxes[mux_name]['formats'].items():
for profile_name in format:
self.__generate_stream_mux_instance(stream_name, stream, mux_name, format_name, profile_name,
- idx, cnt, porter, globals)
+ idx, cnt, porter)
- def _generate_streams(self, streams, muxes, globals):
+ def _generate_streams(self):
self.flow['repeaters'] = {}
self.flow['streamers'] = {}
- for stream_name, stream in streams.items():
+ for stream_name, stream in self._desc.streams.items():
cnt = stream['count']
for idx in range(cnt):
- self.__generate_stream_instance(stream_name, stream, idx, cnt, muxes, globals)
+ self.__generate_stream_instance(stream_name, stream, idx, cnt)
#
# records
- def _generate_records(self, records, globals):
+ def _generate_records(self):
self.flow['recorders'] = {}
- for _, record in records.items():
+ for _, record in self._desc.records.items():
for mux in record['muxes']:
format = record['muxes'][mux]["format"]
profile = record['muxes'][mux]["profile"]
@@ -422,11 +435,11 @@ class Planet:
#
# all
- def generate(self, desc):
- self._generate_inputs(desc.inputs, desc.globals)
- self._generate_muxes(desc.muxes, desc.inputs, desc.globals)
- self._generate_streams(desc.streams, desc.muxes, desc.globals)
- self._generate_records(desc.records, desc.globals)
+ def generate(self):
+ self._generate_inputs()
+ self._generate_muxes()
+ self._generate_streams()
+ self._generate_records()
# Main ########################################################
@@ -447,8 +460,8 @@ if __name__ == '__main__':
d = Description()
d.parse(config_file)
- p = Planet()
- p.generate(d)
+ p = Planet(d)
+ p.generate()
print("****************************************************")
print("** atmosphere **")