From 8daa5e01c9f61b7905629b5098f6ccd017fe4539 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 19 Oct 2014 00:29:51 +0200 Subject: daq: nginx lua aggregated files into one --- src/daq/nginx-lua/s5-nginx.lua | 169 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 src/daq/nginx-lua/s5-nginx.lua (limited to 'src/daq/nginx-lua/s5-nginx.lua') diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua new file mode 100644 index 0000000..475a576 --- /dev/null +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -0,0 +1,169 @@ +-- +-- sfive +-- +-- sfive - spreadspace streaming statistics suite is a generic +-- statistic collection tool for streaming server infrastuctures. +-- The system collects and stores meta data like number of views +-- and throughput from a number of streaming servers and stores +-- it in a global data store. +-- The data acquisition is designed to be generic and extensible in +-- order to support different streaming software. +-- sfive also contains tools and applications to filter and visualize +-- live and recorded data. +-- +-- +-- Copyright (C) 2014 Christian Pointner +-- Markus Grueneis +-- +-- This file is part of sfive. +-- +-- sfive is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License version 3 +-- as published by the Free Software Foundation. +-- +-- sfive is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with sfive. If not, see . +-- +-- +-- Install this by adding the following to your nginx.conf +-- +-- http { +-- ..... +-- +-- lua_shared_dict sfive 64k; +-- lua_shared_dict sfive_locks 64k; +-- lua_shared_dict sfive_data 64m; +-- init_by_lua 's5 = require "s5-nginx.lua"'; +-- init_worker_by_lua 's5.init_worker(5)'; +-- +-- ..... +-- +-- server { +-- +-- ..... +-- +-- location /path/to/hls { +-- log_by_lua 's5.log()'; +-- } +-- +-- location /sfive { +-- allow 127.0.0.1; +-- allow ::1; +-- deny all; +-- +-- content_by_lua 's5.fetch()'; +-- } +-- +-- ..... +-- +-- } +-- + +locks = require "resty.lock" + +local _SFIVE = {} + +----------------------------------------------- +-- this will only be shared within one worker +local config = { + log_exptime = 30, + log_max_index = 1000000 +} +local sfive = ngx.shared.sfive +local sfive_data = ngx.shared.sfive_data +----------------------------------------------- + +function _SFIVE.log() + local status = ngx.var.status + if status == '200' or status == '206' then + local idx, err = sfive:incr("log:idx", 1) + if not idx then + ngx.log(ngx.ERR, "SFive(log): incrementing log index failed: " .. err) + else + local json = '{' + json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' + json = json .. '"client": "' .. ngx.var.remote_addr .. '",' + json = json .. '"port": ' .. ngx.var.remote_port .. ',' + json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' + json = json .. '"uri": "' .. ngx.var.uri .. '",' + json = json .. '"status": ' .. status .. ',' + json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent + json = json .. '}' + + local ok, err, force = sfive_data:add(idx, json, config.log_exptime) + if not ok then + ngx.log(ngx.ERR, "SFive(log): adding log line (".. idx .. ") to log store failed: " .. err) + elseif force then + ngx.log(ngx.WARN, "SFive(log): adding log line has overwritten other log lines - consider increasing the log store!") + end + end + end +end + +function _SFIVE.fetch() + local lock = locks:new("sfive_locks", { exptime = nil, timeout = 0.001 }) + local ok, err = lock:lock("fetch") + if not ok then + if err == "timeout" then + ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE) + else + ngx.log(ngx.ERR, "SFive(fetch): failed to acquire fetch lock: " .. err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end + else + ngx.log(ngx.INFO, "SFive(fetch): client connected") + while true do + logs = sfive_data:get_keys() + for i, k in ipairs(logs) do + local ok, err = ngx.say(sfive_data:get(k)) + if not ok then + ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) + break + end + sfive_data:delete(k) + end + ngx.flush() + local idx, err = sfive:get("log:idx") + if not idx then + ngx.log(ngx.ERR, "SFive(fetch): failed to get log index: ", err) + else + if idx >= config.log_max_index then + local ok, err = sfive:replace("log:idx", 0) + if not ok then + ngx.log(ngx.ERR, "SFive(init): reseting log index counter failed: " .. err) + end + end + end + ngx.sleep(0.1) + end + ok, err = lock:unlock("fetch") + if not ok then + ngx.log(ngx.ERR, "SFive(fetch): unlock error: " .. err) + end + end +end + +function _SFIVE.init_worker(duration) + config.log_exptime = duration + config.log_max_index = 1000000 +end + +ngx.log(ngx.DEBUG, "SFive: loaded successfully") + +sfive:flush_all() +sfive:flush_expired() +sfive_data:flush_all() +sfive_data:flush_expired() +local ok, err = sfive:set("log:idx", 0) +if not ok then + ngx.log(ngx.ERR, "SFive(init): creating log index counter failed: " .. err) +else + ngx.log(ngx.INFO, "SFive(init): initialized successfully!") +end + +return _SFIVE -- cgit v1.2.3