-- -- sfive -- -- sfive - spreadspace streaming statistics suite is a generic -- statistic collection tool for streaming server infrastuctures. -- The system collects and stores meta data like number of views -- and throughput from a number of streaming servers and stores -- it in a global data store. -- The data acquisition is designed to be generic and extensible in -- order to support different streaming software. -- sfive also contains tools and applications to filter and visualize -- live and recorded data. -- -- -- Copyright (C) 2014 Christian Pointner -- Markus Grueneis -- -- This file is part of sfive. -- -- sfive is free software: you can redistribute it and/or modify -- it under the terms of the GNU General Public License version 3 -- as published by the Free Software Foundation. -- -- sfive is distributed in the hope that it will be useful, -- but WITHOUT ANY WARRANTY; without even the implied warranty of -- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -- GNU General Public License for more details. -- -- You should have received a copy of the GNU General Public License -- along with sfive. If not, see . -- -- -- Install this by adding the following to your nginx.conf -- -- http { -- ..... -- -- lua_shared_dict sfive 64k; -- lua_shared_dict sfive_locks 64k; -- lua_shared_dict sfive_data 64m; -- init_by_lua 's5 = require "s5-nginx.lua"'; -- init_worker_by_lua ' -- s5.init_worker("myhostname", 5, "/path/to/sfive.sock", -- { "tag1", "tag2" }, -- { { content="A", format="1", quality="low" }, -- { content="A", format="1", quality="high" }, -- { content="B", format="1", quality="low" }, -- { content="B", format="1", quality="high" }, -- { content="A", format="2", quality="low" }, -- { content="A", format="2", quality="high" }, -- { content="B", format="2", quality="low" }, -- { content="B", format="2", quality="high" } }) -- '; -- -- ..... -- -- server { -- -- ..... -- -- location /path/to/fromat1/contentA-low/ { -- log_by_lua 's5.log("A", "1", "low")'; -- } -- location /path/to/fromat1/contentA-high/ { -- log_by_lua 's5.log("A", "1", "high")'; -- } -- ..... -- -- } -- locks = require "resty.lock" local _SFIVE = {} ----------------------------------------------- -- this will only be shared within one worker local config = { hostname = "unknown", duration = 5, sockpath = "/var/run/sfive.sock", tags = {}, streamer = {} } local sfive = ngx.shared.sfive local sfive_data = ngx.shared.sfive_data ----------------------------------------------- function _SFIVE.log(content, format, quality) local status = ngx.var.status if status == '200' or status == '206' then -- TODO: acquire(data_lock), update(data), release(data_lock) ngx.log(ngx.ERR, "SFive(log): won't add data from " .. ngx.var.remote_addr .. " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!") -- local json = '{' -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",' -- json = json .. '"port": ' .. ngx.var.remote_port .. ',' -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' -- json = json .. '"uri": "' .. ngx.var.uri .. '",' -- json = json .. '"status": ' .. status .. ',' -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent -- json = json .. '}' end end local connect = function() ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") local sock = ngx.socket.udp() local ok, err = sock:setpeername("unix:" .. config.sockpath) if not ok then return nil, "connect to '" .. config.sockpath .. "' failed: " .. err end return sock end local get_data_streamer = function(content, format, quality) return '{ "client-count": 0, "bytes-sent": 0 }' end local send_data = function(sock, starttime) for i, s in ipairs(config.streamer) do local json = '{' json = json .. '"start-time": "' .. starttime .. 'Z", ' json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", ' json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. '"format": "' .. s.format .. '", ' .. '"quality": "' .. s.quality .. '" }, ' json = json .. '"data": ' .. get_data_streamer(s.content, s.format, s.quality) json = json .. '}\n' local ok, err = sock:send(json) if not ok then return nil, err end end return true end local send_init = function(sock) local json = '{' json = json .. '"version": 1, ' json = json .. '"hostname": "' .. config.hostname .. '", ' json = json .. '"tags": [' for i, k in ipairs(config.tags) do json = json .. (i==1 and '' or ',') .. '"' .. k .. '"' end json = json .. ']' json = json .. '}\n' return sock:send(json) end local send_updates = function(sock, starttime, lock) ret = true local elapsed, err = lock:lock("data") if not elapsed then ngx.log(ngx.ERR, "SFive(task): lock error: " .. err) ret = false else local ok, err = send_data(sock, starttime) if not ok then ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) sock:close() ret = false else sfive_data:flush_all() sfive_data:flush_expired() end ok, err = lock:unlock("data") if not ok then ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) ret = false end end return ret end local task = function() local sock, err = connect(config.sockpath) if not sock then ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) return else local ok, err = send_init(sock) if not ok then ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) sock:close() return end end sfive_data:flush_all() sfive_data:flush_expired() local lock = locks:new("sfive_locks") while true do local starttime = string.gsub(ngx.utctime(), " ", "T", 1) ngx.sleep(config.duration) if not send_updates(sock, starttime, lock) then return end end end local launcher launcher = function(premature) local task = ngx.thread.spawn(task) ngx.thread.wait(task) if premature then return end local ok, err = ngx.timer.at(2, launcher) if not ok then ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) inst.flags:delete("running") return end end local init = function() local ok, err = sfive:add("running", true) if not ok then if err ~= "exists" then ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) end return end sfive_data:flush_all() sfive_data:flush_expired() ok, err = ngx.timer.at(0, launcher) if not ok then ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) return end ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") end function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer) config.hostname = hostname config.duration = duration config.sockpath = sockpath config.tags = (tags and tags or {}) config.streamer = (streamer and streamer or {}) init() ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") end ngx.log(ngx.DEBUG, "SFive: loaded successfully") return _SFIVE