-- -- sfive -- -- sfive - spreadspace streaming statistics suite is a generic -- statistic collection tool for streaming server infrastuctures. -- The system collects and stores meta data like number of views -- and throughput from a number of streaming servers and stores -- it in a global data store. -- The data acquisition is designed to be generic and extensible in -- order to support different streaming software. -- sfive also contains tools and applications to filter and visualize -- live and recorded data. -- -- -- Copyright (C) 2014 Christian Pointner -- Markus Grueneis -- -- This file is part of sfive. -- -- sfive is free software: you can redistribute it and/or modify -- it under the terms of the GNU General Public License version 3 -- as published by the Free Software Foundation. -- -- sfive is distributed in the hope that it will be useful, -- but WITHOUT ANY WARRANTY; without even the implied warranty of -- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -- GNU General Public License for more details. -- -- You should have received a copy of the GNU General Public License -- along with sfive. If not, see . -- -- -- Install this by adding the following to your nginx.conf -- -- http { -- ..... -- -- lua_shared_dict sfive 64k; -- lua_shared_dict sfive_locks 64k; -- lua_shared_dict sfive_data 64m; -- init_by_lua 's5 = require "s5-nginx.lua"'; -- init_worker_by_lua ' -- s5.init_worker("myhostname", 5, "/path/to/sfive.sock", -- { "tag1", "tag2" }, -- { { content="A", format="1", quality="low" }, -- { content="A", format="1", quality="high" }, -- { content="B", format="1", quality="low" }, -- { content="B", format="1", quality="high" }, -- { content="A", format="2", quality="low" }, -- { content="A", format="2", quality="high" }, -- { content="B", format="2", quality="low" }, -- { content="B", format="2", quality="high" } }) -- '; -- -- ..... -- -- server { -- -- ..... -- -- location /path/to/fromat1/contentA-low/ { -- log_by_lua 's5.log("A", "1", "low")'; -- } -- location /path/to/fromat1/contentA-high/ { -- log_by_lua 's5.log("A", "1", "high")'; -- } -- ..... -- -- } -- locks = require "resty.lock" local _SFIVE = {} ----------------------------------------------- -- this will only be shared within one worker local config = { hostname = "unknown", duration = 5, sockpath = "/var/run/sfive.sock", tags = {}, streamer = {} } local sfive = ngx.shared.sfive local sfive_data = ngx.shared.sfive_data ----------------------------------------------- function _SFIVE.log(content, format, quality) local status = ngx.var.status if status == '200' or status == '206' then local lock = locks:new("sfive_locks") local elapsed, err = lock:lock("data") if not elapsed then ngx.log(ngx.ERR, "SFive(log): lock error: " .. err) ret = false else local remote_addr = ngx.var.remote_addr local bytes_sent = tonumber(ngx.var.bytes_sent) local key = table.concat({content, format, quality, remote_addr}, ';') local ok, err = sfive_data:incr(key, bytes_sent) if not ok then if err == "not found" then local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5) if not ok then ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err) end else ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err) end end ok, err = lock:unlock("data") if not ok then ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err) ret = false end end return ret end end local connect = function() ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") local sock = ngx.socket.udp() local ok, err = sock:setpeername("unix:" .. config.sockpath) if not ok then return nil, "connect to '" .. config.sockpath .. "' failed: " .. err end return sock end local get_data_streamer = function(keys, content, format, quality) local clients = {} local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$' for i, k in ipairs(keys) do local m, err = ngx.re.match(k, re) if m then local bs, err = sfive_data:get(k) if not bs then ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?") else clients[m[1]] = (clients[m[1]] or 0) + bs end end end local cnt = 0 local bytes_sent = 0 json = '{ clients: [' for k, v in pairs(clients) do if cnt > 0 then json = json .. ',' end json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}' bytes_sent = bytes_sent + v cnt = cnt + 1 end json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }' return json end local send_data = function(sock, starttime) local keys = sfive_data:get_keys(0) for i, s in ipairs(config.streamer) do local json = '{' json = json .. '"start-time": "' .. starttime .. 'Z", ' json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", ' json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. '"format": "' .. s.format .. '", ' .. '"quality": "' .. s.quality .. '" }, ' json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality) json = json .. '}\n' while true do -- TODO: this should probably be done inside a seperate writer thread local ok, err = sock:send(json) if not ok then if err ~= "resource temporarily unavailable" then return nil, err end ngx.sleep(0.02) else break end end end return true end local send_init = function(sock) local json = '{' json = json .. '"version": 1, ' json = json .. '"hostname": "' .. config.hostname .. '", ' json = json .. '"tags": [' for i, k in ipairs(config.tags) do json = json .. (i==1 and '' or ',') .. '"' .. k .. '"' end json = json .. ']' json = json .. '}\n' return sock:send(json) end local send_updates = function(sock, starttime, lock) ret = true local elapsed, err = lock:lock("data") if not elapsed then ngx.log(ngx.ERR, "SFive(task): lock error: " .. err) ret = false else local ok, err = send_data(sock, starttime) if not ok then ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) sock:close() ret = false else sfive_data:flush_all() sfive_data:flush_expired() end ok, err = lock:unlock("data") if not ok then ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) ret = false end end return ret end local task = function() local sock, err = connect(config.sockpath) if not sock then ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) return else local ok, err = send_init(sock) if not ok then ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) sock:close() return end end sfive_data:flush_all() sfive_data:flush_expired() local lock = locks:new("sfive_locks") while true do local starttime = string.gsub(ngx.utctime(), " ", "T", 1) ngx.sleep(config.duration) if not send_updates(sock, starttime, lock) then return end end end local launcher launcher = function(premature) local task = ngx.thread.spawn(task) ngx.thread.wait(task) if premature then return end local ok, err = ngx.timer.at(2, launcher) if not ok then ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) inst.flags:delete("running") return end end local init = function() local ok, err = sfive:add("running", true) if not ok then if err ~= "exists" then ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) end return end sfive_data:flush_all() sfive_data:flush_expired() ok, err = ngx.timer.at(0, launcher) if not ok then ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) return end ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") end function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer) config.hostname = hostname config.duration = duration config.sockpath = sockpath config.tags = (tags and tags or {}) config.streamer = (streamer and streamer or {}) init() ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") end ngx.log(ngx.DEBUG, "SFive: loaded successfully") return _SFIVE