From d05240899cb00e0431b4c6b63d1be2efaed6eecf Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 18 Oct 2014 23:37:27 +0200 Subject: reverted to old nginx-lua model --- src/daq/nginx-lua/s5-nginx-fetch.lua | 73 +++++++++ src/daq/nginx-lua/s5-nginx-init.lua | 70 ++++++++ src/daq/nginx-lua/s5-nginx-log.lua | 100 ++++++++++++ src/daq/nginx-lua/s5-nginx.lua | 299 ----------------------------------- 4 files changed, 243 insertions(+), 299 deletions(-) create mode 100644 src/daq/nginx-lua/s5-nginx-fetch.lua create mode 100644 src/daq/nginx-lua/s5-nginx-init.lua create mode 100644 src/daq/nginx-lua/s5-nginx-log.lua delete mode 100644 src/daq/nginx-lua/s5-nginx.lua (limited to 'src/daq') diff --git a/src/daq/nginx-lua/s5-nginx-fetch.lua b/src/daq/nginx-lua/s5-nginx-fetch.lua new file mode 100644 index 0000000..8821094 --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx-fetch.lua @@ -0,0 +1,73 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner +-- Markus Grueneis +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see . +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- location /sfive { +-- allow 127.0.0.1; +-- allow ::1; +-- deny all; +-- +-- content_by_lua_file '/path/to/s5-nginx-fetch.lua'; +-- } +-- + +local lock = locks:new("sfive_locks", { exptime = nil, timeout = 0.001 }) +local ok, err = lock:lock("fetch") +if not ok then + if err == "timeout" then + ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE) + else + ngx.log(ngx.ERR, "SFive(fetch): failed to acquire fetch lock: " .. err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end +else + ngx.log(ngx.INFO, "SFive(fetch): client connected") + local sfive_data = ngx.shared.sfive_data + while true do + logs = sfive_data:get_keys() + for i, k in ipairs(logs) do + local ok, err = ngx.say(sfive_data:get(k)) + if not ok then + ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) + break + end + sfive_data:delete(k) + end + ngx.flush() + ngx.sleep(0.1) + end + ok, err = lock:unlock("fetch") + if not ok then + ngx.log(ngx.ERR, "SFive(fetch): unlock error: " .. err) + end +end diff --git a/src/daq/nginx-lua/s5-nginx-init.lua b/src/daq/nginx-lua/s5-nginx-init.lua new file mode 100644 index 0000000..7270e23 --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx-init.lua @@ -0,0 +1,70 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner +-- Markus Grueneis +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see . +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- http { +-- lua_shared_dict sfive 64k; +-- lua_shared_dict sfive_locks 64k; +-- lua_shared_dict sfive_data 64m; +-- init_by_lua_file '/path/to/s5-nginx-init.lua'; +-- } +-- + +ngx.log(ngx.DEBUG, "SFive: loaded") + +locks = require "resty.lock" + +local sfive = ngx.shared.sfive +sfive:flush_all() +sfive:flush_expired() +local sfive_data = ngx.shared.sfive_data +sfive_data:flush_all() +sfive_data:flush_expired() + +-- try to create locks so we get an error on init and not on first usage +local lock = locks:new("sfive_locks") +local elapsed, err = lock:lock("test") +if not elapsed then + ngx.log(ngx.ERR, "SFive(init): lock error: " .. err) +else + local ok, err, force = sfive:set("log:idx", 0) + if not ok then + ngx.log(ngx.ERR, "SFive(init): creating log index counter failed: " .. err) + else + ngx.log(ngx.INFO, "SFive(init): initialized successfully!") + end + ok, err = lock:unlock("test") + if not ok then + ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) + end +end diff --git a/src/daq/nginx-lua/s5-nginx-log.lua b/src/daq/nginx-lua/s5-nginx-log.lua new file mode 100644 index 0000000..43d2394 --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx-log.lua @@ -0,0 +1,100 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner +-- Markus Grueneis +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see . +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- location /path/to/hls { +-- log_by_lua_file '/path/to/s5-nginx-log.lua'; +-- } +-- + +local cleanup_delay = 1200 +local log_exptime = 600 + +local log_cleanup = function(premature) + local sfive = ngx.shared.sfive + sfive:flush_expired() + if premature then + return + end + local ok, err = ngx.timer.at(cleanup_delay, log_cleanup) + if not ok then + ngx.log(ngx.ERR, "SFive(log): failed to reenqueue log_cleanup: ", err) + sfive:delete("log:cleanup_running") + return + end +end + +local start_cleanup_thread = function(sfive) + local ok, err, force = sfive:add("log:cleanup_running", 1) + if not ok then + if err ~= "exists" then + ngx.log(ngx.ERR, "SFive(log): failed to set cleanup_running flag: " .. err) + end + return + end + ok, err = ngx.timer.at(cleanup_delay, log_cleanup) + if not ok then + ngx.log(ngx.ERR, "SFive(log): failed to enqueue log_cleanup: ", err) + sfive:delete("log:cleanup_running") + return + end +end + +local status = ngx.var.status + +if status == '200' or status == '206' then + local sfive = ngx.shared.sfive + local idx, err = sfive:incr("log:idx", 1) + if not idx then + ngx.log(ngx.ERR, "SFive(log): incrementing log index failed: " .. err) + else + local json = '{' + json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' + json = json .. '"client": "' .. ngx.var.remote_addr .. '",' + json = json .. '"port": ' .. ngx.var.remote_port .. ',' + json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' + json = json .. '"uri": "' .. ngx.var.uri .. '",' + json = json .. '"status": ' .. status .. ',' + json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent + json = json .. '}' + + local sfive_data = ngx.shared.sfive_data + local ok, err, force = sfive_data:add(idx, json, log_exptime) + if not ok then + ngx.log(ngx.ERR, "SFive(log): adding log line (".. idx .. ") to log store failed: " .. err) + elseif force then + ngx.log(ngx.WARN, "SFive(log): adding log line has overwritten other log lines - consider increasing the log store!") + end + start_cleanup_thread(sfive) + end +end diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua deleted file mode 100644 index eedb85f..0000000 --- a/src/daq/nginx-lua/s5-nginx.lua +++ /dev/null @@ -1,299 +0,0 @@ --- --- sfive --- --- sfive - spreadspace streaming statistics suite is a generic --- statistic collection tool for streaming server infrastuctures. --- The system collects and stores meta data like number of views --- and throughput from a number of streaming servers and stores --- it in a global data store. --- The data acquisition is designed to be generic and extensible in --- order to support different streaming software. --- sfive also contains tools and applications to filter and visualize --- live and recorded data. --- --- --- Copyright (C) 2014 Christian Pointner --- Markus Grueneis --- --- This file is part of sfive. --- --- sfive is free software: you can redistribute it and/or modify --- it under the terms of the GNU General Public License version 3 --- as published by the Free Software Foundation. --- --- sfive is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with sfive. If not, see . --- --- --- Install this by adding the following to your nginx.conf --- --- http { --- ..... --- --- lua_shared_dict sfive 64k; --- lua_shared_dict sfive_locks 64k; --- lua_shared_dict sfive_data 64m; --- init_by_lua 's5 = require "s5-nginx.lua"'; --- init_worker_by_lua ' --- s5.init_worker("myhostname", 5, "/path/to/sfive.sock", --- { "tag1", "tag2" }, --- { { content="A", format="1", quality="low" }, --- { content="A", format="1", quality="high" }, --- { content="B", format="1", quality="low" }, --- { content="B", format="1", quality="high" }, --- { content="A", format="2", quality="low" }, --- { content="A", format="2", quality="high" }, --- { content="B", format="2", quality="low" }, --- { content="B", format="2", quality="high" } }) --- '; --- --- ..... --- --- server { --- --- ..... --- --- location /path/to/fromat1/contentA-low/ { --- log_by_lua 's5.log("A", "1", "low")'; --- } --- location /path/to/fromat1/contentA-high/ { --- log_by_lua 's5.log("A", "1", "high")'; --- } --- ..... --- --- } --- - -locks = require "resty.lock" - -local _SFIVE = {} - ------------------------------------------------ --- this will only be shared within one worker -local config = { - hostname = "unknown", - duration = 5, - sockpath = "/var/run/sfive.sock", - tags = {}, - streamer = {} -} -local sfive = ngx.shared.sfive -local sfive_data = ngx.shared.sfive_data ------------------------------------------------ - -function _SFIVE.log(content, format, quality) - local status = ngx.var.status - if status == '200' or status == '206' then - local lock = locks:new("sfive_locks") - local elapsed, err = lock:lock("data") - if not elapsed then - ngx.log(ngx.ERR, "SFive(log): lock error: " .. err) - ret = false - else - local remote_addr = ngx.var.remote_addr - local bytes_sent = tonumber(ngx.var.bytes_sent) - local key = table.concat({content, format, quality, remote_addr}, ';') - local ok, err = sfive_data:incr(key, bytes_sent) - if not ok then - if err == "not found" then - local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5) - if not ok then - ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err) - end - else - ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err) - end - end - ok, err = lock:unlock("data") - if not ok then - ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err) - ret = false - end - end - return ret - end -end - -local connect = function() - ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") - - local sock = ngx.socket.udp() - local ok, err = sock:setpeername("unix:" .. config.sockpath) - if not ok then - return nil, "connect to '" .. config.sockpath .. "' failed: " .. err - end - return sock -end - -local get_data_streamer = function(keys, content, format, quality) - local clients = {} - local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$' - for i, k in ipairs(keys) do - local m, err = ngx.re.match(k, re) - if m then - local bs, err = sfive_data:get(k) - if not bs then - ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?") - else - clients[m[1]] = (clients[m[1]] or 0) + bs - end - end - end - - local cnt = 0 - local bytes_sent = 0 - json = '{ clients: [' - for k, v in pairs(clients) do - if cnt > 0 then json = json .. ',' end - json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}' - bytes_sent = bytes_sent + v - cnt = cnt + 1 - end - json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }' - return json -end - -local send_data = function(sock, starttime) - local keys = sfive_data:get_keys(0) - for i, s in ipairs(config.streamer) do - local json = '{' - json = json .. '"start-time": "' .. starttime .. 'Z", ' - json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", ' - json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. - '"format": "' .. s.format .. '", ' .. - '"quality": "' .. s.quality .. '" }, ' - json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality) - json = json .. '}\n' - while true do - -- TODO: this should probably be done inside a seperate writer thread - local ok, err = sock:send(json) - if not ok then - if err ~= "resource temporarily unavailable" then - return nil, err - end - ngx.sleep(0.02) - else - break - end - end - end - return true -end - -local send_init = function(sock) - local json = '{' - json = json .. '"version": 1, ' - json = json .. '"hostname": "' .. config.hostname .. '", ' - json = json .. '"tags": [' - for i, k in ipairs(config.tags) do - json = json .. (i==1 and '' or ',') .. '"' .. k .. '"' - end - json = json .. ']' - json = json .. '}\n' - return sock:send(json) -end - -local send_updates = function(sock, starttime, lock) - ret = true - local elapsed, err = lock:lock("data") - if not elapsed then - ngx.log(ngx.ERR, "SFive(task): lock error: " .. err) - ret = false - else - local ok, err = send_data(sock, starttime) - if not ok then - ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) - sock:close() - ret = false - else - sfive_data:flush_all() - sfive_data:flush_expired() - end - - ok, err = lock:unlock("data") - if not ok then - ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) - ret = false - end - end - return ret -end - -local task = function() - local sock, err = connect(config.sockpath) - if not sock then - ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) - return - else - local ok, err = send_init(sock) - if not ok then - ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) - sock:close() - return - end - end - sfive_data:flush_all() - sfive_data:flush_expired() - local lock = locks:new("sfive_locks") - while true do - local starttime = string.gsub(ngx.utctime(), " ", "T", 1) - ngx.sleep(config.duration) - if not send_updates(sock, starttime, lock) then - return - end - end -end - -local launcher -launcher = function(premature) - local task = ngx.thread.spawn(task) - ngx.thread.wait(task) - if premature then - return - end - local ok, err = ngx.timer.at(2, launcher) - if not ok then - ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) - inst.flags:delete("running") - return - end -end - -local init = function() - local ok, err = sfive:add("running", true) - if not ok then - if err ~= "exists" then - ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) - end - return - end - - sfive_data:flush_all() - sfive_data:flush_expired() - - ok, err = ngx.timer.at(0, launcher) - if not ok then - ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) - return - end - - ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") -end - -function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer) - config.hostname = hostname - config.duration = duration - config.sockpath = sockpath - config.tags = (tags and tags or {}) - config.streamer = (streamer and streamer or {}) - init() - ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") -end - -ngx.log(ngx.DEBUG, "SFive: loaded successfully") -return _SFIVE -- cgit v1.2.3