From 02baf86179304500c0995942ed620085c77d2797 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 11 Feb 2018 18:00:27 +0100 Subject: nginx streamer for hls and dash work now --- src/flufigut.py | 96 ++++++++++++++-------- .../default/kubernetes/nginx-streamer-cm.yml.j2 | 46 +++++------ .../default/kubernetes/nginx-streamer-svc.yml.j2 | 2 +- 3 files changed, 85 insertions(+), 59 deletions(-) diff --git a/src/flufigut.py b/src/flufigut.py index 6be8cb2..6d7eaf3 100755 --- a/src/flufigut.py +++ b/src/flufigut.py @@ -193,7 +193,25 @@ class Planet: 'flags': {} } - def __add_worker_flag(self, worker, flag_name, flag): + def __add_worker_flag_exclusive(self, worker, flag_name, flag): + if flag_name not in self.workers[worker.name]['flags']: + self.workers[worker.name]['flags'][flag_name] = flag + else: + if flag != self.workers[worker.name]['flags'][flag_name]: + raise Exception("can't set exclusive flag '%s' for worker '%s' to '%s', it is already set to '%s'" % ( + flag_name, worker.name, flag, self.workers[worker.name]['flags'][flag_name])) + + if not worker.subname: + return + + if flag_name not in self.workers[worker.name]['subs'][worker.subname]['flags']: + self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name] = flag + else: + if flag != self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name]: + raise Exception("can't set exclusive flag '%s' for worker '%s' to '%s', it is already set to '%s'" % ( + flag_name, worker.name, flag, self.workers[worker.name]['subs'][worker.subname]['flags'][flag_name])) + + def __add_worker_flag_multi(self, worker, flag_name, flag): if flag_name not in self.workers[worker.name]['flags']: self.workers[worker.name]['flags'][flag_name] = [flag] else: @@ -263,7 +281,7 @@ class Planet: if input['master']: master_cnt += 1 - self.__add_worker_flag(worker, "input", input_name) + self.__add_worker_flag_multi(worker, "input", input_name) self.__set_input_properties(comp_name, input['properties']) if master_cnt == 0: @@ -433,7 +451,7 @@ class Planet: if interface: self.atmosphere[porter.name]['properties']['interface'] = interface - self.__add_worker_flag(worker, "port", port) + self.__add_worker_flag_multi(worker, "port", port) return porter def __generate_stream_mux_repeater(self, stream, mux, format, profile, feeder): @@ -523,8 +541,8 @@ class Planet: } self.__set_stream_mux_instance_props(comp_name, stream, porter.port, mount_point, hostname_next) self.__add_stream_mux_instance_plugs(comp_name, stream_name, idx, mux, format, profile) - self.__add_worker_flag(worker, "stream", stream_name) - self.__add_worker_flag(worker, "stream-hostname-%s" % stream_name, hostname) + self.__add_worker_flag_exclusive(worker, "stream", stream_name) + self.__add_worker_flag_exclusive(worker, "stream-hostname-%s" % stream_name, hostname) def __generate_stream_instance(self, stream_name, stream, idx, cnt): port = stream['port'] @@ -574,7 +592,7 @@ class Planet: if prop not in ['mux', 'format', 'profile']: self.flow['recorders'][comp_name]['properties'][prop] = record[prop] - self.__add_worker_flag(worker, "record", record_name) + self.__add_worker_flag_multi(worker, "record", record_name) # # all @@ -652,12 +670,6 @@ class K8sDeployment: appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_flumotion_worker(self, template_dir, tmpl_env, v1, appsV1, worker): - if not len(worker['subs']): - worker['subs']['worker'] = { - 'fullname': worker['name'], - 'flags': worker['flags'] - } - secret = self.__generate_object(tmpl_env, 'flumotion-worker-secret.yml', worker) secret['data']['password'] = base64.b64encode(worker['password'].encode('utf-8')).decode('ascii') v1.create_namespaced_secret(self._namespace, secret) @@ -670,6 +682,15 @@ class K8sDeployment: appsV1.create_namespaced_deployment(self._namespace, deploy) def _deploy_nginx_worker(self, template_dir, tmpl_env, v1, appsV1, worker): + needs_nginx = False + if 'stream' in worker['flags']: + stream_name = worker['flags']['stream'] + if 'nginx-muxes' in self._desc.streams[stream_name] and len(self._desc.streams[stream_name]['nginx-muxes']) > 0: + needs_nginx = True + + if not needs_nginx: + return + cm = self.__generate_object(tmpl_env, 'nginx-streamer-cm.yml', worker) v1.create_namespaced_config_map(self._namespace, cm) @@ -689,6 +710,11 @@ class K8sDeployment: self._deploy_flumotion_manager(template_dir, tmpl_env, v1, appsV1) for _, worker in self._planet.workers.items(): + if not len(worker['subs']): + worker['subs']['worker'] = { + 'fullname': worker['name'], + 'flags': worker['flags'] + } self._deploy_flumotion_worker(template_dir, tmpl_env, v1, appsV1, worker) self._deploy_nginx_worker(template_dir, tmpl_env, v1, appsV1, worker) @@ -733,29 +759,29 @@ if __name__ == '__main__': print("- " + c) sys.exit(1) - print("****************************************************") - print("** atmosphere **") - print("**") - __pp.pprint(p.atmosphere) - print("**") - print("**************************") - print("** planet **") - print("**") - __pp.pprint(p.flow) - print("**") - print("**************************") - print("** _workers_ **") - print("**") - __pp.pprint(p.workers) - print("**") - print("****************************************************") - - # k8s = K8sDeployment(d, p) - # print("wiping exisiting namespace ...") - # k8s.wipe() - # print("deploying new planet ...") - # k8s.deploy('../templates') - # print("done.") + # print("****************************************************") + # print("** atmosphere **") + # print("**") + # __pp.pprint(p.atmosphere) + # print("**") + # print("**************************") + # print("** planet **") + # print("**") + # __pp.pprint(p.flow) + # print("**") + # print("**************************") + # print("** _workers_ **") + # print("**") + # __pp.pprint(p.workers) + # print("**") + # print("****************************************************") + + k8s = K8sDeployment(d, p) + print("wiping exisiting namespace ...") + k8s.wipe() + print("deploying new planet ...") + k8s.deploy('../templates') + print("done.") except Exception as e: print("ERROR: while running app: %s" % e) diff --git a/templates/default/kubernetes/nginx-streamer-cm.yml.j2 b/templates/default/kubernetes/nginx-streamer-cm.yml.j2 index 4171305..2ee77c6 100644 --- a/templates/default/kubernetes/nginx-streamer-cm.yml.j2 +++ b/templates/default/kubernetes/nginx-streamer-cm.yml.j2 @@ -1,3 +1,6 @@ +%% set flumotion_port = desc.streams[worker.flags.stream].port +%% set nginx_port = desc.streams[worker.flags.stream]['nginx-port'] +%% set hostname = worker.flags['stream-hostname-'+worker.flags.stream] apiVersion: v1 kind: ConfigMap metadata: @@ -35,8 +38,8 @@ data: access_log /dev/null; server { - listen 8000 default_server; - listen [::]:8000 default_server; + listen {{ nginx_port }} default_server; + listen [::]:{{ nginx_port }} default_server; server_name _; root /srv/www; @@ -57,21 +60,6 @@ data: server { listen localhost:1935; - respawn on; - - exec_static ffmpeg -i http://flumotion-worker-{{ worker.name }}:8000/av-orig-flash-mini.flv - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/hls/av-orig-mini - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/dash/av-orig-mini; - exec_static ffmpeg -i http://flumotion-worker-{{ worker.name }}:8000/av-orig-flash-low.flv - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/hls/av-orig-low - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/dash/av-orig-low; - exec_static ffmpeg -i http://flumotion-worker-{{ worker.name }}:8000/av-orig-flash-medium.flv - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/hls/av-orig-medium - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/dash/av-orig-medium; - exec_static ffmpeg -i http://flumotion-worker-{{ worker.name }}:8000/av-orig-flash-high.flv - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/hls/av-orig-high - -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/dash/av-orig-high; - application hls { live on; @@ -84,12 +72,15 @@ data: hls_fragment_naming timestamp; hls_fragment_slicing aligned; hls_type live; - hls_base_url http://localhost:8000/hls/; - - hls_variant -mini BANDWIDTH=350000; - hls_variant -low BANDWIDTH=700000; - hls_variant -medium BANDWIDTH=1200000; - hls_variant -high BANDWIDTH=2000000; + hls_base_url http://{{ hostname }}:{{ nginx_port }}/hls/; + +%% for mux in desc.streams[worker.flags.stream]['nginx-muxes'] +%% for profile in desc.muxes[mux].formats.flash +%% set abitrate = desc.globals.profiles[profile].audio +%% set vbitrate = desc.globals.bitrates[desc.globals.formats.flash.video][desc.globals.profiles[profile].video] + hls_variant -{{ profile }} BANDWIDTH={{ (abitrate + vbitrate) * 1000 }}; +%% endfor +%% endfor } application dash { @@ -102,5 +93,14 @@ data: dash_nested on; dash_cleanup on; } + + respawn on; +%% for mux in desc.streams[worker.flags.stream]['nginx-muxes'] +%% for profile in desc.muxes[mux].formats.flash + exec_static ffmpeg -i http://flumotion-worker-{{ worker.name }}:{{ flumotion_port }}/{{ mux }}-flash-{{ profile }}.flv + -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/hls/{{ mux }}-{{ profile }} + -acodec copy -vcodec copy -vbsf h264_mp4toannexb -f flv rtmp://localhost/dash/{{ mux }}-{{ profile }}; +%% endfor +%% endfor } } diff --git a/templates/default/kubernetes/nginx-streamer-svc.yml.j2 b/templates/default/kubernetes/nginx-streamer-svc.yml.j2 index a6a9f0d..b6bcfe5 100644 --- a/templates/default/kubernetes/nginx-streamer-svc.yml.j2 +++ b/templates/default/kubernetes/nginx-streamer-svc.yml.j2 @@ -13,4 +13,4 @@ spec: type: streamer worker: {{ worker.name }} ports: - - port: 8000 + - port: {{ desc.streams[worker.flags.stream]['nginx-port'] }} -- cgit v1.2.3