summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua/s5-nginx.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx.lua')
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua299
1 files changed, 0 insertions, 299 deletions
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
deleted file mode 100644
index eedb85f..0000000
--- a/src/daq/nginx-lua/s5-nginx.lua
+++ /dev/null
@@ -1,299 +0,0 @@
---
--- sfive
---
--- sfive - spreadspace streaming statistics suite is a generic
--- statistic collection tool for streaming server infrastuctures.
--- The system collects and stores meta data like number of views
--- and throughput from a number of streaming servers and stores
--- it in a global data store.
--- The data acquisition is designed to be generic and extensible in
--- order to support different streaming software.
--- sfive also contains tools and applications to filter and visualize
--- live and recorded data.
---
---
--- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
--- Markus Grueneis <gimpf@gimpf.org>
---
--- This file is part of sfive.
---
--- sfive is free software: you can redistribute it and/or modify
--- it under the terms of the GNU General Public License version 3
--- as published by the Free Software Foundation.
---
--- sfive is distributed in the hope that it will be useful,
--- but WITHOUT ANY WARRANTY; without even the implied warranty of
--- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--- GNU General Public License for more details.
---
--- You should have received a copy of the GNU General Public License
--- along with sfive. If not, see <http://www.gnu.org/licenses/>.
---
---
--- Install this by adding the following to your nginx.conf
---
--- http {
--- .....
---
--- lua_shared_dict sfive 64k;
--- lua_shared_dict sfive_locks 64k;
--- lua_shared_dict sfive_data 64m;
--- init_by_lua 's5 = require "s5-nginx.lua"';
--- init_worker_by_lua '
--- s5.init_worker("myhostname", 5, "/path/to/sfive.sock",
--- { "tag1", "tag2" },
--- { { content="A", format="1", quality="low" },
--- { content="A", format="1", quality="high" },
--- { content="B", format="1", quality="low" },
--- { content="B", format="1", quality="high" },
--- { content="A", format="2", quality="low" },
--- { content="A", format="2", quality="high" },
--- { content="B", format="2", quality="low" },
--- { content="B", format="2", quality="high" } })
--- ';
---
--- .....
---
--- server {
---
--- .....
---
--- location /path/to/fromat1/contentA-low/ {
--- log_by_lua 's5.log("A", "1", "low")';
--- }
--- location /path/to/fromat1/contentA-high/ {
--- log_by_lua 's5.log("A", "1", "high")';
--- }
--- .....
---
--- }
---
-
-locks = require "resty.lock"
-
-local _SFIVE = {}
-
------------------------------------------------
--- this will only be shared within one worker
-local config = {
- hostname = "unknown",
- duration = 5,
- sockpath = "/var/run/sfive.sock",
- tags = {},
- streamer = {}
-}
-local sfive = ngx.shared.sfive
-local sfive_data = ngx.shared.sfive_data
------------------------------------------------
-
-function _SFIVE.log(content, format, quality)
- local status = ngx.var.status
- if status == '200' or status == '206' then
- local lock = locks:new("sfive_locks")
- local elapsed, err = lock:lock("data")
- if not elapsed then
- ngx.log(ngx.ERR, "SFive(log): lock error: " .. err)
- ret = false
- else
- local remote_addr = ngx.var.remote_addr
- local bytes_sent = tonumber(ngx.var.bytes_sent)
- local key = table.concat({content, format, quality, remote_addr}, ';')
- local ok, err = sfive_data:incr(key, bytes_sent)
- if not ok then
- if err == "not found" then
- local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5)
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err)
- end
- else
- ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err)
- end
- end
- ok, err = lock:unlock("data")
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err)
- ret = false
- end
- end
- return ret
- end
-end
-
-local connect = function()
- ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'")
-
- local sock = ngx.socket.udp()
- local ok, err = sock:setpeername("unix:" .. config.sockpath)
- if not ok then
- return nil, "connect to '" .. config.sockpath .. "' failed: " .. err
- end
- return sock
-end
-
-local get_data_streamer = function(keys, content, format, quality)
- local clients = {}
- local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$'
- for i, k in ipairs(keys) do
- local m, err = ngx.re.match(k, re)
- if m then
- local bs, err = sfive_data:get(k)
- if not bs then
- ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?")
- else
- clients[m[1]] = (clients[m[1]] or 0) + bs
- end
- end
- end
-
- local cnt = 0
- local bytes_sent = 0
- json = '{ clients: ['
- for k, v in pairs(clients) do
- if cnt > 0 then json = json .. ',' end
- json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}'
- bytes_sent = bytes_sent + v
- cnt = cnt + 1
- end
- json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }'
- return json
-end
-
-local send_data = function(sock, starttime)
- local keys = sfive_data:get_keys(0)
- for i, s in ipairs(config.streamer) do
- local json = '{'
- json = json .. '"start-time": "' .. starttime .. 'Z", '
- json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", '
- json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' ..
- '"format": "' .. s.format .. '", ' ..
- '"quality": "' .. s.quality .. '" }, '
- json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality)
- json = json .. '}\n'
- while true do
- -- TODO: this should probably be done inside a seperate writer thread
- local ok, err = sock:send(json)
- if not ok then
- if err ~= "resource temporarily unavailable" then
- return nil, err
- end
- ngx.sleep(0.02)
- else
- break
- end
- end
- end
- return true
-end
-
-local send_init = function(sock)
- local json = '{'
- json = json .. '"version": 1, '
- json = json .. '"hostname": "' .. config.hostname .. '", '
- json = json .. '"tags": ['
- for i, k in ipairs(config.tags) do
- json = json .. (i==1 and '' or ',') .. '"' .. k .. '"'
- end
- json = json .. ']'
- json = json .. '}\n'
- return sock:send(json)
-end
-
-local send_updates = function(sock, starttime, lock)
- ret = true
- local elapsed, err = lock:lock("data")
- if not elapsed then
- ngx.log(ngx.ERR, "SFive(task): lock error: " .. err)
- ret = false
- else
- local ok, err = send_data(sock, starttime)
- if not ok then
- ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err)
- sock:close()
- ret = false
- else
- sfive_data:flush_all()
- sfive_data:flush_expired()
- end
-
- ok, err = lock:unlock("data")
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err)
- ret = false
- end
- end
- return ret
-end
-
-local task = function()
- local sock, err = connect(config.sockpath)
- if not sock then
- ngx.log(ngx.ERR, "SFive(task): connect(): " .. err)
- return
- else
- local ok, err = send_init(sock)
- if not ok then
- ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err)
- sock:close()
- return
- end
- end
- sfive_data:flush_all()
- sfive_data:flush_expired()
- local lock = locks:new("sfive_locks")
- while true do
- local starttime = string.gsub(ngx.utctime(), " ", "T", 1)
- ngx.sleep(config.duration)
- if not send_updates(sock, starttime, lock) then
- return
- end
- end
-end
-
-local launcher
-launcher = function(premature)
- local task = ngx.thread.spawn(task)
- ngx.thread.wait(task)
- if premature then
- return
- end
- local ok, err = ngx.timer.at(2, launcher)
- if not ok then
- ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err)
- inst.flags:delete("running")
- return
- end
-end
-
-local init = function()
- local ok, err = sfive:add("running", true)
- if not ok then
- if err ~= "exists" then
- ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err)
- end
- return
- end
-
- sfive_data:flush_all()
- sfive_data:flush_expired()
-
- ok, err = ngx.timer.at(0, launcher)
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err)
- return
- end
-
- ngx.log(ngx.INFO, "SFive(init): task initialized successfully!")
-end
-
-function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer)
- config.hostname = hostname
- config.duration = duration
- config.sockpath = sockpath
- config.tags = (tags and tags or {})
- config.streamer = (streamer and streamer or {})
- init()
- ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!")
-end
-
-ngx.log(ngx.DEBUG, "SFive: loaded successfully")
-return _SFIVE