diff options
-rw-r--r-- | src/daq/nginx-lua/s5-nginx-fetch.lua | 73 | ||||
-rw-r--r-- | src/daq/nginx-lua/s5-nginx-init.lua | 70 | ||||
-rw-r--r-- | src/daq/nginx-lua/s5-nginx-log.lua | 100 | ||||
-rw-r--r-- | src/daq/nginx-lua/s5-nginx.lua | 203 |
4 files changed, 203 insertions, 243 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-fetch.lua b/src/daq/nginx-lua/s5-nginx-fetch.lua deleted file mode 100644 index 8821094..0000000 --- a/src/daq/nginx-lua/s5-nginx-fetch.lua +++ /dev/null @@ -1,73 +0,0 @@ --- --- sfive --- --- sfive - spreadspace streaming statistics suite is a generic --- statistic collection tool for streaming server infrastuctures. --- The system collects and stores meta data like number of views --- and throughput from a number of streaming servers and stores --- it in a global data store. --- The data acquisition is designed to be generic and extensible in --- order to support different streaming software. --- sfive also contains tools and applications to filter and visualize --- live and recorded data. --- --- --- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> --- Markus Grueneis <gimpf@gimpf.org> --- --- This file is part of sfive. --- --- sfive is free software: you can redistribute it and/or modify --- it under the terms of the GNU General Public License version 3 --- as published by the Free Software Foundation. --- --- sfive is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with sfive. If not, see <http://www.gnu.org/licenses/>. --- --- --- Install this by adding the following to your nginx.conf --- --- location /sfive { --- allow 127.0.0.1; --- allow ::1; --- deny all; --- --- content_by_lua_file '/path/to/s5-nginx-fetch.lua'; --- } --- - -local lock = locks:new("sfive_locks", { exptime = nil, timeout = 0.001 }) -local ok, err = lock:lock("fetch") -if not ok then - if err == "timeout" then - ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE) - else - ngx.log(ngx.ERR, "SFive(fetch): failed to acquire fetch lock: " .. err) - ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) - end -else - ngx.log(ngx.INFO, "SFive(fetch): client connected") - local sfive_data = ngx.shared.sfive_data - while true do - logs = sfive_data:get_keys() - for i, k in ipairs(logs) do - local ok, err = ngx.say(sfive_data:get(k)) - if not ok then - ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) - break - end - sfive_data:delete(k) - end - ngx.flush() - ngx.sleep(0.1) - end - ok, err = lock:unlock("fetch") - if not ok then - ngx.log(ngx.ERR, "SFive(fetch): unlock error: " .. err) - end -end diff --git a/src/daq/nginx-lua/s5-nginx-init.lua b/src/daq/nginx-lua/s5-nginx-init.lua deleted file mode 100644 index 7270e23..0000000 --- a/src/daq/nginx-lua/s5-nginx-init.lua +++ /dev/null @@ -1,70 +0,0 @@ --- --- sfive --- --- sfive - spreadspace streaming statistics suite is a generic --- statistic collection tool for streaming server infrastuctures. --- The system collects and stores meta data like number of views --- and throughput from a number of streaming servers and stores --- it in a global data store. --- The data acquisition is designed to be generic and extensible in --- order to support different streaming software. --- sfive also contains tools and applications to filter and visualize --- live and recorded data. --- --- --- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> --- Markus Grueneis <gimpf@gimpf.org> --- --- This file is part of sfive. --- --- sfive is free software: you can redistribute it and/or modify --- it under the terms of the GNU General Public License version 3 --- as published by the Free Software Foundation. --- --- sfive is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with sfive. If not, see <http://www.gnu.org/licenses/>. --- --- --- Install this by adding the following to your nginx.conf --- --- http { --- lua_shared_dict sfive 64k; --- lua_shared_dict sfive_locks 64k; --- lua_shared_dict sfive_data 64m; --- init_by_lua_file '/path/to/s5-nginx-init.lua'; --- } --- - -ngx.log(ngx.DEBUG, "SFive: loaded") - -locks = require "resty.lock" - -local sfive = ngx.shared.sfive -sfive:flush_all() -sfive:flush_expired() -local sfive_data = ngx.shared.sfive_data -sfive_data:flush_all() -sfive_data:flush_expired() - --- try to create locks so we get an error on init and not on first usage -local lock = locks:new("sfive_locks") -local elapsed, err = lock:lock("test") -if not elapsed then - ngx.log(ngx.ERR, "SFive(init): lock error: " .. err) -else - local ok, err, force = sfive:set("log:idx", 0) - if not ok then - ngx.log(ngx.ERR, "SFive(init): creating log index counter failed: " .. err) - else - ngx.log(ngx.INFO, "SFive(init): initialized successfully!") - end - ok, err = lock:unlock("test") - if not ok then - ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) - end -end diff --git a/src/daq/nginx-lua/s5-nginx-log.lua b/src/daq/nginx-lua/s5-nginx-log.lua deleted file mode 100644 index 43d2394..0000000 --- a/src/daq/nginx-lua/s5-nginx-log.lua +++ /dev/null @@ -1,100 +0,0 @@ --- --- sfive --- --- sfive - spreadspace streaming statistics suite is a generic --- statistic collection tool for streaming server infrastuctures. --- The system collects and stores meta data like number of views --- and throughput from a number of streaming servers and stores --- it in a global data store. --- The data acquisition is designed to be generic and extensible in --- order to support different streaming software. --- sfive also contains tools and applications to filter and visualize --- live and recorded data. --- --- --- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> --- Markus Grueneis <gimpf@gimpf.org> --- --- This file is part of sfive. --- --- sfive is free software: you can redistribute it and/or modify --- it under the terms of the GNU General Public License version 3 --- as published by the Free Software Foundation. --- --- sfive is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with sfive. If not, see <http://www.gnu.org/licenses/>. --- --- --- Install this by adding the following to your nginx.conf --- --- location /path/to/hls { --- log_by_lua_file '/path/to/s5-nginx-log.lua'; --- } --- - -local cleanup_delay = 1200 -local log_exptime = 600 - -local log_cleanup = function(premature) - local sfive = ngx.shared.sfive - sfive:flush_expired() - if premature then - return - end - local ok, err = ngx.timer.at(cleanup_delay, log_cleanup) - if not ok then - ngx.log(ngx.ERR, "SFive(log): failed to reenqueue log_cleanup: ", err) - sfive:delete("log:cleanup_running") - return - end -end - -local start_cleanup_thread = function(sfive) - local ok, err, force = sfive:add("log:cleanup_running", 1) - if not ok then - if err ~= "exists" then - ngx.log(ngx.ERR, "SFive(log): failed to set cleanup_running flag: " .. err) - end - return - end - ok, err = ngx.timer.at(cleanup_delay, log_cleanup) - if not ok then - ngx.log(ngx.ERR, "SFive(log): failed to enqueue log_cleanup: ", err) - sfive:delete("log:cleanup_running") - return - end -end - -local status = ngx.var.status - -if status == '200' or status == '206' then - local sfive = ngx.shared.sfive - local idx, err = sfive:incr("log:idx", 1) - if not idx then - ngx.log(ngx.ERR, "SFive(log): incrementing log index failed: " .. err) - else - local json = '{' - json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' - json = json .. '"client": "' .. ngx.var.remote_addr .. '",' - json = json .. '"port": ' .. ngx.var.remote_port .. ',' - json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' - json = json .. '"uri": "' .. ngx.var.uri .. '",' - json = json .. '"status": ' .. status .. ',' - json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent - json = json .. '}' - - local sfive_data = ngx.shared.sfive_data - local ok, err, force = sfive_data:add(idx, json, log_exptime) - if not ok then - ngx.log(ngx.ERR, "SFive(log): adding log line (".. idx .. ") to log store failed: " .. err) - elseif force then - ngx.log(ngx.WARN, "SFive(log): adding log line has overwritten other log lines - consider increasing the log store!") - end - start_cleanup_thread(sfive) - end -end diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua new file mode 100644 index 0000000..7a3e9ed --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -0,0 +1,203 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> +-- Markus Grueneis <gimpf@gimpf.org> +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see <http://www.gnu.org/licenses/>. +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- http { +-- ..... +-- +-- lua_shared_dict sfive 64k; +-- lua_shared_dict sfive_locks 64k; +-- lua_shared_dict sfive_data 64m; +-- init_by_lua 's5 = require "s5-nginx.lua"'; +-- init_worker_by_lua 's5.init_worker("myhostname", 5, "/path/to/sfive.sock")'; +-- +-- ..... +-- +-- server { +-- +-- ..... +-- +-- location /path/to/hls/content-quality/ { +-- log_by_lua 's5.log("content", "hls", "quality")'; +-- } +-- +-- ..... +-- +-- } +-- + +locks = require "resty.lock" + +local _SFIVE = {} + +----------------------------------------------- +-- this will only be shared within one worker +local config = { + hostname = "unknown", + duration = 5, + sockpath = "/var/run/sfive.sock", +} +local sfive = ngx.shared.sfive +local sfive_data = ngx.shared.sfive_data +----------------------------------------------- + +function _SFIVE.log(content, format, quality) + -- TODO: acquire(data_lock), update(data), release(data_lock) + ngx.log(ngx.WARN, "SFive(log): won't add data from " .. ngx.var.remote_addr .. + " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!") + + -- local json = '{' + -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' + -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",' + -- json = json .. '"port": ' .. ngx.var.remote_port .. ',' + -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' + -- json = json .. '"uri": "' .. ngx.var.uri .. '",' + -- json = json .. '"status": ' .. status .. ',' + -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent + -- json = json .. '}' + +end + +function _SFIVE.connect() + ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") + + local sock = ngx.socket.udp() + local ok, err = sock:setpeername("unix:" .. config.sockpath) + if not ok then + return nil, "connect to '" .. config.sockpath .. "' failed: " .. err + end + return sock +end + +function _SFIVE.send_data(sock, starttime) + + -- logs = sfive_data:get_keys() + -- for i, k in ipairs(logs) do + -- local ok, err = ngx.say(sfive_data:get(k)) + -- if not ok then + -- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) + -- break + -- end + -- sfive_data:delete(k) + -- end + + local json = '{' + json = json .. '"start-time": "' .. starttime .. 'Z",' + json = json .. '"duration-ms": "' .. config.duration * 1000 .. '"' + json = json .. '}\n' + return sock:send(json) +end + +function _SFIVE.send_init(sock) + local json = '{' + json = json .. '"version": 1,' + json = json .. '"hostname": "' .. config.hostname .. '"' + json = json .. '}\n' + return sock:send(json) +end + +function _SFIVE.task() + local sock, err = _SFIVE.connect(config.sockpath) + if not sock then + ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) + return + else + local ok, err = _SFIVE.send_init(sock) + if not ok then + ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) + sock:close() + return + end + end + + + while true do + local starttime = string.gsub(ngx.utctime(), " ", "T", 1) + + ngx.sleep(config.duration) + + -- TODO: acquire(data_lock), send(data), flush(data), release(data_lock) + local ok, err = _SFIVE.send_data(sock, starttime) + if not ok then + ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) + sock:close() + return + end + end +end + +function _SFIVE.launcher(premature) + local task = ngx.thread.spawn(_SFIVE.task) + ngx.thread.wait(task) + if premature then + return + end + local ok, err = ngx.timer.at(2, _SFIVE.launcher) + if not ok then + ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) + inst.flags:delete("running") + return + end +end + +function _SFIVE.init() + local ok, err = sfive:add("running", true) + if not ok then + if err ~= "exists" then + ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) + end + return + end + + sfive_data:flush_all() + sfive_data:flush_expired() + + ok, err = ngx.timer.at(0, _SFIVE.launcher) + if not ok then + ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) + return + end + + ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") +end + +function _SFIVE.init_worker(hostname, duration, sockpath) + config.hostname = hostname + config.duration = duration + config.sockpath = sockpath + _SFIVE.init() + ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") +end + +ngx.log(ngx.DEBUG, "SFive: loaded successfully") +return _SFIVE |