From 47f172dbcbe0cae3a6882acc46c74a755507afec Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 18 Oct 2014 04:35:51 +0200 Subject: nginx lua now makes all work by itself --- src/daq/nginx-lua/s5-nginx.lua | 203 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 src/daq/nginx-lua/s5-nginx.lua (limited to 'src/daq/nginx-lua/s5-nginx.lua') diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua new file mode 100644 index 0000000..7a3e9ed --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -0,0 +1,203 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner +-- Markus Grueneis +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see . +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- http { +-- ..... +-- +-- lua_shared_dict sfive 64k; +-- lua_shared_dict sfive_locks 64k; +-- lua_shared_dict sfive_data 64m; +-- init_by_lua 's5 = require "s5-nginx.lua"'; +-- init_worker_by_lua 's5.init_worker("myhostname", 5, "/path/to/sfive.sock")'; +-- +-- ..... +-- +-- server { +-- +-- ..... +-- +-- location /path/to/hls/content-quality/ { +-- log_by_lua 's5.log("content", "hls", "quality")'; +-- } +-- +-- ..... +-- +-- } +-- + +locks = require "resty.lock" + +local _SFIVE = {} + +----------------------------------------------- +-- this will only be shared within one worker +local config = { + hostname = "unknown", + duration = 5, + sockpath = "/var/run/sfive.sock", +} +local sfive = ngx.shared.sfive +local sfive_data = ngx.shared.sfive_data +----------------------------------------------- + +function _SFIVE.log(content, format, quality) + -- TODO: acquire(data_lock), update(data), release(data_lock) + ngx.log(ngx.WARN, "SFive(log): won't add data from " .. ngx.var.remote_addr .. + " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!") + + -- local json = '{' + -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' + -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",' + -- json = json .. '"port": ' .. ngx.var.remote_port .. ',' + -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' + -- json = json .. '"uri": "' .. ngx.var.uri .. '",' + -- json = json .. '"status": ' .. status .. ',' + -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent + -- json = json .. '}' + +end + +function _SFIVE.connect() + ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'") + + local sock = ngx.socket.udp() + local ok, err = sock:setpeername("unix:" .. config.sockpath) + if not ok then + return nil, "connect to '" .. config.sockpath .. "' failed: " .. err + end + return sock +end + +function _SFIVE.send_data(sock, starttime) + + -- logs = sfive_data:get_keys() + -- for i, k in ipairs(logs) do + -- local ok, err = ngx.say(sfive_data:get(k)) + -- if not ok then + -- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) + -- break + -- end + -- sfive_data:delete(k) + -- end + + local json = '{' + json = json .. '"start-time": "' .. starttime .. 'Z",' + json = json .. '"duration-ms": "' .. config.duration * 1000 .. '"' + json = json .. '}\n' + return sock:send(json) +end + +function _SFIVE.send_init(sock) + local json = '{' + json = json .. '"version": 1,' + json = json .. '"hostname": "' .. config.hostname .. '"' + json = json .. '}\n' + return sock:send(json) +end + +function _SFIVE.task() + local sock, err = _SFIVE.connect(config.sockpath) + if not sock then + ngx.log(ngx.ERR, "SFive(task): connect(): " .. err) + return + else + local ok, err = _SFIVE.send_init(sock) + if not ok then + ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err) + sock:close() + return + end + end + + + while true do + local starttime = string.gsub(ngx.utctime(), " ", "T", 1) + + ngx.sleep(config.duration) + + -- TODO: acquire(data_lock), send(data), flush(data), release(data_lock) + local ok, err = _SFIVE.send_data(sock, starttime) + if not ok then + ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) + sock:close() + return + end + end +end + +function _SFIVE.launcher(premature) + local task = ngx.thread.spawn(_SFIVE.task) + ngx.thread.wait(task) + if premature then + return + end + local ok, err = ngx.timer.at(2, _SFIVE.launcher) + if not ok then + ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err) + inst.flags:delete("running") + return + end +end + +function _SFIVE.init() + local ok, err = sfive:add("running", true) + if not ok then + if err ~= "exists" then + ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err) + end + return + end + + sfive_data:flush_all() + sfive_data:flush_expired() + + ok, err = ngx.timer.at(0, _SFIVE.launcher) + if not ok then + ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err) + return + end + + ngx.log(ngx.INFO, "SFive(init): task initialized successfully!") +end + +function _SFIVE.init_worker(hostname, duration, sockpath) + config.hostname = hostname + config.duration = duration + config.sockpath = sockpath + _SFIVE.init() + ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!") +end + +ngx.log(ngx.DEBUG, "SFive: loaded successfully") +return _SFIVE -- cgit v1.2.3