From fd185d2e737cbc1b07985cc946fed7e235d2140f Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 17 Oct 2012 01:35:02 +0200 Subject: streamer for multi input works now.. --- src/flufigut.py | 163 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 82 insertions(+), 81 deletions(-) diff --git a/src/flufigut.py b/src/flufigut.py index 7704518..cfffeaa 100755 --- a/src/flufigut.py +++ b/src/flufigut.py @@ -247,87 +247,88 @@ for mux_name in mux.keys(): ### generate streamer components ################################ flow['streamer'] = {} -# for cluster in stream.keys(): -# streamer_cnt = stream[cluster]['count'] -# port = stream[cluster]['port'] -# for idx in range(streamer_cnt): -# stream_worker = '%s%i'%(cluster, idx+1) -# for machine in globals['machines'].keys(): -# if stream_worker in globals['machines'][machine]: -# if machine in machines: -# if 'porter' in machines[machine]: -# if port in machines[machine]['porter']: -# raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, port)) -# else: -# machines[machine]['porter'] = {} -# else: -# machines[machine] = { 'porter': {} } - -# machines[machine]['porter'][port] = { -# 'socket-path': "porter-%s"%(rand_string()), -# 'username': rand_string(size=12), -# 'password': rand_string(size=12), -# } - -# if stream_worker not in worker: -# worker[stream_worker] = -1 -# else: -# worker[stream_worker] = 1 -# atmosphere['porter-%s-%i'%(machine, port)] = { -# 'type': "porter", -# 'desc': "Porter for %s on port %i"%(machine, port), -# 'worker': stream_worker, -# 'properties': { -# 'port': port, -# 'socket-path': machines[machine]['porter'][port]['socket-path'], -# 'username': machines[machine]['porter'][port]['username'], -# 'password': machines[machine]['porter'][port]['password'], -# }, -# } - -# for format in stream[cluster]['formats']: -# for profile in transcode[format]: -# feeder = 'muxer-%s-%s' % (format, profile) -# name = '%s-%s%i-%s-%s' % (stream[cluster]['type'], cluster, idx+1, format, profile) -# mount_point = '/%s-%s.%s' % (format, profile, globals['formats'][format]['muxer']) -# if streamer_cnt > 1: -# hostname = "%s.%s" % (stream[cluster]['hostname'] % (idx+1), globals['domain']) -# if idx != 0: -# hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (idx), globals['domain']) -# else: -# hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (streamer_cnt), globals['domain']) -# else: -# hostname = "%s.%s" % (stream[cluster]['hostname'], globals['domain']) -# flow['streamer'][name] = { -# 'type': "%s-stream" % stream[cluster]['type'], -# 'desc': "%s streamer for %s-%s (part %i of %s cluster)" % (stream[cluster]['type'], format, profile, idx+1, cluster), -# 'worker': stream_worker, -# 'feeder': feeder, -# 'rrd_clients' : "%s/%s_clients.rrd" % (globals['rrd-dir'], name), -# 'rrd_bytes' : "%s/%s_bytes.rrd" % (globals['rrd-dir'], name), -# 'properties': { -# 'description': globals['description'], -# 'type': 'slave', -# 'porter-socket-path': machines[machine]['porter'][port]['socket-path'], -# 'porter-username': machines[machine]['porter'][port]['username'], -# 'porter-password': machines[machine]['porter'][port]['password'], -# 'mount-point': mount_point, -# 'hostname': hostname, -# 'port': port, -# } -# } -# for prop in stream[cluster]: -# if prop == 'max-con': -# flow['streamer'][name]['properties']['client-limit'] = stream[cluster][prop] -# if streamer_cnt > 1: -# flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) -# if prop == 'max-bw': -# flow['streamer'][name]['properties']['bandwidth-limit'] = stream[cluster][prop] -# if streamer_cnt > 1: -# flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) -# if prop == 'burst-on-connect': -# flow['streamer'][name]['properties']['burst-on-connect'] = 'true' -# flow['streamer'][name]['properties']['burst-time'] = stream[cluster][prop] +for cluster in stream.keys(): + streamer_cnt = stream[cluster]['count'] + port = stream[cluster]['port'] + for idx in range(streamer_cnt): + stream_worker = '%s%i'%(cluster, idx+1) + for machine in globals['machines'].keys(): + if stream_worker in globals['machines'][machine]: + if machine in machines: + if 'porter' in machines[machine]: + if port in machines[machine]['porter']: + raise SystemExit("ERROR: porter cannot be created because machine '%s' already uses port %i" % (machine, port)) + else: + machines[machine]['porter'] = {} + else: + machines[machine] = { 'porter': {} } + + machines[machine]['porter'][port] = { + 'socket-path': "porter-%s"%(rand_string()), + 'username': rand_string(size=12), + 'password': rand_string(size=12), + } + + if stream_worker not in worker: + worker[stream_worker] = -1 + else: + worker[stream_worker] = 1 + atmosphere['porter-%s-%i'%(machine, port)] = { + 'type': "porter", + 'desc': "Porter for %s on port %i"%(machine, port), + 'worker': stream_worker, + 'properties': { + 'port': port, + 'socket-path': machines[machine]['porter'][port]['socket-path'], + 'username': machines[machine]['porter'][port]['username'], + 'password': machines[machine]['porter'][port]['password'], + }, + } + + for mux_name in stream[cluster]['muxes']: + for format in mux[mux_name]['formats'].keys(): + for profile in mux[mux_name]['formats'][format]: + feeder = 'muxer-%s-%s-%s' % (mux_name, format, profile) + name = '%s-%s%i-%s-%s-%s' % (stream[cluster]['type'], cluster, idx+1, mux_name, format, profile) + mount_point = '/%s-%s-%s.%s' % (mux_name, format, profile, globals['formats'][format]['muxer']) + if streamer_cnt > 1: + hostname = "%s.%s" % (stream[cluster]['hostname'] % (idx+1), globals['domain']) + if idx != 0: + hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (idx), globals['domain']) + else: + hostname_next = "%s.%s" % (stream[cluster]['hostname'] % (streamer_cnt), globals['domain']) + else: + hostname = "%s.%s" % (stream[cluster]['hostname'], globals['domain']) + flow['streamer'][name] = { + 'type': "%s-stream" % stream[cluster]['type'], + 'desc': "%s streamer for %s-%s (part %i of %s cluster)" % (stream[cluster]['type'], format, profile, idx+1, cluster), + 'worker': stream_worker, + 'feeder': feeder, + 'rrd_clients' : "%s/%s_clients.rrd" % (globals['rrd-dir'], name), + 'rrd_bytes' : "%s/%s_bytes.rrd" % (globals['rrd-dir'], name), + 'properties': { + 'description': globals['description'], + 'type': 'slave', + 'porter-socket-path': machines[machine]['porter'][port]['socket-path'], + 'porter-username': machines[machine]['porter'][port]['username'], + 'porter-password': machines[machine]['porter'][port]['password'], + 'mount-point': mount_point, + 'hostname': hostname, + 'port': port, + } + } + for prop in stream[cluster]: + if prop == 'max-con': + flow['streamer'][name]['properties']['client-limit'] = stream[cluster][prop] + if streamer_cnt > 1: + flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) + if prop == 'max-bw': + flow['streamer'][name]['properties']['bandwidth-limit'] = stream[cluster][prop] + if streamer_cnt > 1: + flow['streamer'][name]['properties']['redirect-on-overflow'] = "http://%s:%i%s" % (hostname_next, port, mount_point) + if prop == 'burst-on-connect': + flow['streamer'][name]['properties']['burst-on-connect'] = 'true' + flow['streamer'][name]['properties']['burst-time'] = stream[cluster][prop] ### sanity checks, cont'd ####################################### -- cgit v1.2.3