diff options
author | Christian Pointner <equinox@spreadspace.org> | 2014-10-18 23:37:27 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2014-10-18 23:38:27 +0200 |
commit | d05240899cb00e0431b4c6b63d1be2efaed6eecf (patch) | |
tree | 0ca374666672dc8d0dbd2267f0fe6d67639def99 /src/daq/nginx-lua/s5-nginx.lua | |
parent | daq: nginx-lua works now but it has possible unresolvable issues (diff) |
reverted to old nginx-lua model
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx.lua')
-rw-r--r-- | src/daq/nginx-lua/s5-nginx.lua | 299 |
1 files changed, 0 insertions, 299 deletions
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua deleted file mode 100644 index eedb85f..0000000 --- a/src/daq/nginx-lua/s5-nginx.lua +++ /dev/null @@ -1,299 +0,0 @@ --- --- sfive --- --- sfive - spreadspace streaming statistics suite is a generic --- statistic collection tool for streaming server infrastuctures. --- The system collects and stores meta data like number of views --- and throughput from a number of streaming servers and stores --- it in a global data store. --- The data acquisition is designed to be generic and extensible in --- order to support different streaming software. --- sfive also contains tools and applications to filter and visualize --- live and recorded data. --- --- --- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> --- Markus Grueneis <gimpf@gimpf.org> --- --- This file is part of sfive. --- --- sfive is free software: you can redistribute it and/or modify --- it under the terms of the GNU General Public License version 3 --- as published by the Free Software Foundation. --- --- sfive is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with sfive. If not, see <http://www.gnu.org/licenses/>. --- --- --- Install this by adding the following to your nginx.conf --- --- http { --- ..... --- --- lua_shared_dict sfive 64k; --- lua_shared_dict sfive_locks 64k; --- lua_shared_dict sfive_data 64m; --- init_by_lua 's5 = require "s5-nginx.lua"'; --- init_worker_by_lua ' --- s5.init_worker("myhostname", 5, "/path/to/sfive.sock", --- { "tag1", "tag2" }, --- { { content="A", format="1", quality="low" }, --- { content="A", format="1", quality="high" }, --- { content="B", format="1", quality="low" }, --- { content="B", format="1", quality="high" }, --- { content="A", format="2", quality="low" }, --- { content="A", format="2", quality="high" }, --- { content="B", format="2", quality="low" }, --- { content="B", format="2", quality="high" } }) --- '; --- --- ..... --- --- server { --- --- ..... --- --- location /path/to/fromat1/contentA-low/ { --- log_by_lua 's5.log("A", "1", "low")'; --- } --- location /path/to/fromat1/contentA-high/ { --- log_by_lua 's5.log("A", "1", "high")'; --- } --- ..... --- --- } --- - -locks = require "resty.lock" - -local _SFIVE = {} - ------------------------------------------------ --- this will only be shared within one worker -local config = { - hostname = "unknown", - duration = 5, - sockpath = "/var/run/sfive.sock", - tags = {}, - streamer = {} -} -local sfive = ngx.shared.sfive -local sfive_data = ngx.shared.sfive_data ------------------------------------------------ - -function _SFIVE.log(content, format, quality) - local status = ngx.var.status - if status == '200' or status == '206' then - local lock = locks:new("sfive_locks") - local elapsed, err = lock:lock("data") - if not elapsed then - ngx.log(ngx.ERR, "SFive(log): lock error: " .. err) - ret = false - else - local remote_addr = ngx.var.remote_addr - local bytes_sent = tonumber(ngx.var.bytes_sent) - local key = table.concat({content, format, quality, remote_addr}, ';') - local ok, err = sfive_data:incr(key, bytes_sent) - if not ok then - if err == "not found" then - local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5) - if not ok then - ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err) - end - else - ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err) - end - end - ok, err = lock:unlock("data") - if not ok then - ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err) - ret = false - end - end - return ret - end -end - -local connect = function() - ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") - - local sock = ngx.socket.udp() - local ok, err = sock:setpeername("unix:" .. config.sockpath) - if not ok then - return nil, "connect to '" .. config.sockpath .. "' failed: " .. err - end - return sock -end - -local get_data_streamer = function(keys, content, format, quality) - local clients = {} - local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$' - for i, k in ipairs(keys) do - local m, err = ngx.re.match(k, re) - if m then - local bs, err = sfive_data:get(k) - if not bs then - ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?") - else - clients[m[1]] = (clients[m[1]] or 0) + bs - end - end - end - - local cnt = 0 - local bytes_sent = 0 - json = '{ clients: [' - for k, v in pairs(clients) do - if cnt > 0 then json = json .. ',' end - json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}' - bytes_sent = bytes_sent + v - cnt = cnt + 1 - end - json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }' - return json -end - -local send_data = function(sock, starttime) - local keys = sfive_data:get_keys(0) - for i, s in ipairs(config.streamer) do - local json = '{' - json = json .. '"start-time": "' .. starttime .. 'Z", ' - json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", ' - json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. - '"format": "' .. s.format .. '", ' .. - '"quality": "' .. s.quality .. '" }, ' - json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality) - json = json .. '}\n' - while true do - -- TODO: this should probably be done inside a seperate writer thread - local ok, err = sock:send(json) - if not ok then - if err ~= "resource temporarily unavailable" then - return nil, err - end - ngx.sleep(0.02) - else - break - end - end - end - return true -end - -local send_init = function(sock) - local json = '{' - json = json .. '"version": 1, ' - json = json .. '"hostname": "' .. config.hostname .. '", ' - json = json .. '"tags": [' - for i, k in ipairs(config.tags) do - json = json .. (i==1 and '' or ',') .. '"' .. k .. '"' - end - json = json .. ']' - json = json .. '}\n' - return sock:send(json) -end - -local send_updates = function(sock, starttime, lock) - ret = true - local elapsed, err = lock:lock("data") - if not elapsed then - ngx.log(ngx.ERR, "SFive(task): lock error: " .. err) - ret = false - else - local ok, err = send_data(sock, starttime) - if not ok then - ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) - sock:close() - ret = false - else - sfive_data:flush_all() - sfive_data:flush_expired() - end - - ok, err = lock:unlock("data") - if not ok then - ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) - ret = false - end - end - return ret -end - -local task = function() - local sock, err = connect(config.sockpath) - if not sock then - ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) - return - else - local ok, err = send_init(sock) - if not ok then - ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) - sock:close() - return - end - end - sfive_data:flush_all() - sfive_data:flush_expired() - local lock = locks:new("sfive_locks") - while true do - local starttime = string.gsub(ngx.utctime(), " ", "T", 1) - ngx.sleep(config.duration) - if not send_updates(sock, starttime, lock) then - return - end - end -end - -local launcher -launcher = function(premature) - local task = ngx.thread.spawn(task) - ngx.thread.wait(task) - if premature then - return - end - local ok, err = ngx.timer.at(2, launcher) - if not ok then - ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) - inst.flags:delete("running") - return - end -end - -local init = function() - local ok, err = sfive:add("running", true) - if not ok then - if err ~= "exists" then - ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) - end - return - end - - sfive_data:flush_all() - sfive_data:flush_expired() - - ok, err = ngx.timer.at(0, launcher) - if not ok then - ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) - return - end - - ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") -end - -function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer) - config.hostname = hostname - config.duration = duration - config.sockpath = sockpath - config.tags = (tags and tags or {}) - config.streamer = (streamer and streamer or {}) - init() - ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") -end - -ngx.log(ngx.DEBUG, "SFive: loaded successfully") -return _SFIVE |