summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-18 23:37:27 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-18 23:38:27 +0200
commitd05240899cb00e0431b4c6b63d1be2efaed6eecf (patch)
tree0ca374666672dc8d0dbd2267f0fe6d67639def99 /src/daq
parentdaq: nginx-lua works now but it has possible unresolvable issues (diff)
reverted to old nginx-lua model
Diffstat (limited to 'src/daq')
-rw-r--r--src/daq/nginx-lua/s5-nginx-fetch.lua73
-rw-r--r--src/daq/nginx-lua/s5-nginx-init.lua70
-rw-r--r--src/daq/nginx-lua/s5-nginx-log.lua100
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua299
4 files changed, 243 insertions, 299 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-fetch.lua b/src/daq/nginx-lua/s5-nginx-fetch.lua
new file mode 100644
index 0000000..8821094
--- /dev/null
+++ b/src/daq/nginx-lua/s5-nginx-fetch.lua
@@ -0,0 +1,73 @@
+--
+-- sfive
+--
+-- sfive - spreadspace streaming statistics suite is a generic
+-- statistic collection tool for streaming server infrastuctures.
+-- The system collects and stores meta data like number of views
+-- and throughput from a number of streaming servers and stores
+-- it in a global data store.
+-- The data acquisition is designed to be generic and extensible in
+-- order to support different streaming software.
+-- sfive also contains tools and applications to filter and visualize
+-- live and recorded data.
+--
+--
+-- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+-- Markus Grueneis <gimpf@gimpf.org>
+--
+-- This file is part of sfive.
+--
+-- sfive is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License version 3
+-- as published by the Free Software Foundation.
+--
+-- sfive is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with sfive. If not, see <http://www.gnu.org/licenses/>.
+--
+--
+-- Install this by adding the following to your nginx.conf
+--
+-- location /sfive {
+-- allow 127.0.0.1;
+-- allow ::1;
+-- deny all;
+--
+-- content_by_lua_file '/path/to/s5-nginx-fetch.lua';
+-- }
+--
+
+local lock = locks:new("sfive_locks", { exptime = nil, timeout = 0.001 })
+local ok, err = lock:lock("fetch")
+if not ok then
+ if err == "timeout" then
+ ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
+ else
+ ngx.log(ngx.ERR, "SFive(fetch): failed to acquire fetch lock: " .. err)
+ ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
+ end
+else
+ ngx.log(ngx.INFO, "SFive(fetch): client connected")
+ local sfive_data = ngx.shared.sfive_data
+ while true do
+ logs = sfive_data:get_keys()
+ for i, k in ipairs(logs) do
+ local ok, err = ngx.say(sfive_data:get(k))
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err)
+ break
+ end
+ sfive_data:delete(k)
+ end
+ ngx.flush()
+ ngx.sleep(0.1)
+ end
+ ok, err = lock:unlock("fetch")
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(fetch): unlock error: " .. err)
+ end
+end
diff --git a/src/daq/nginx-lua/s5-nginx-init.lua b/src/daq/nginx-lua/s5-nginx-init.lua
new file mode 100644
index 0000000..7270e23
--- /dev/null
+++ b/src/daq/nginx-lua/s5-nginx-init.lua
@@ -0,0 +1,70 @@
+--
+-- sfive
+--
+-- sfive - spreadspace streaming statistics suite is a generic
+-- statistic collection tool for streaming server infrastuctures.
+-- The system collects and stores meta data like number of views
+-- and throughput from a number of streaming servers and stores
+-- it in a global data store.
+-- The data acquisition is designed to be generic and extensible in
+-- order to support different streaming software.
+-- sfive also contains tools and applications to filter and visualize
+-- live and recorded data.
+--
+--
+-- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+-- Markus Grueneis <gimpf@gimpf.org>
+--
+-- This file is part of sfive.
+--
+-- sfive is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License version 3
+-- as published by the Free Software Foundation.
+--
+-- sfive is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with sfive. If not, see <http://www.gnu.org/licenses/>.
+--
+--
+-- Install this by adding the following to your nginx.conf
+--
+-- http {
+-- lua_shared_dict sfive 64k;
+-- lua_shared_dict sfive_locks 64k;
+-- lua_shared_dict sfive_data 64m;
+-- init_by_lua_file '/path/to/s5-nginx-init.lua';
+-- }
+--
+
+ngx.log(ngx.DEBUG, "SFive: loaded")
+
+locks = require "resty.lock"
+
+local sfive = ngx.shared.sfive
+sfive:flush_all()
+sfive:flush_expired()
+local sfive_data = ngx.shared.sfive_data
+sfive_data:flush_all()
+sfive_data:flush_expired()
+
+-- try to create locks so we get an error on init and not on first usage
+local lock = locks:new("sfive_locks")
+local elapsed, err = lock:lock("test")
+if not elapsed then
+ ngx.log(ngx.ERR, "SFive(init): lock error: " .. err)
+else
+ local ok, err, force = sfive:set("log:idx", 0)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(init): creating log index counter failed: " .. err)
+ else
+ ngx.log(ngx.INFO, "SFive(init): initialized successfully!")
+ end
+ ok, err = lock:unlock("test")
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err)
+ end
+end
diff --git a/src/daq/nginx-lua/s5-nginx-log.lua b/src/daq/nginx-lua/s5-nginx-log.lua
new file mode 100644
index 0000000..43d2394
--- /dev/null
+++ b/src/daq/nginx-lua/s5-nginx-log.lua
@@ -0,0 +1,100 @@
+--
+-- sfive
+--
+-- sfive - spreadspace streaming statistics suite is a generic
+-- statistic collection tool for streaming server infrastuctures.
+-- The system collects and stores meta data like number of views
+-- and throughput from a number of streaming servers and stores
+-- it in a global data store.
+-- The data acquisition is designed to be generic and extensible in
+-- order to support different streaming software.
+-- sfive also contains tools and applications to filter and visualize
+-- live and recorded data.
+--
+--
+-- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+-- Markus Grueneis <gimpf@gimpf.org>
+--
+-- This file is part of sfive.
+--
+-- sfive is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License version 3
+-- as published by the Free Software Foundation.
+--
+-- sfive is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with sfive. If not, see <http://www.gnu.org/licenses/>.
+--
+--
+-- Install this by adding the following to your nginx.conf
+--
+-- location /path/to/hls {
+-- log_by_lua_file '/path/to/s5-nginx-log.lua';
+-- }
+--
+
+local cleanup_delay = 1200
+local log_exptime = 600
+
+local log_cleanup = function(premature)
+ local sfive = ngx.shared.sfive
+ sfive:flush_expired()
+ if premature then
+ return
+ end
+ local ok, err = ngx.timer.at(cleanup_delay, log_cleanup)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(log): failed to reenqueue log_cleanup: ", err)
+ sfive:delete("log:cleanup_running")
+ return
+ end
+end
+
+local start_cleanup_thread = function(sfive)
+ local ok, err, force = sfive:add("log:cleanup_running", 1)
+ if not ok then
+ if err ~= "exists" then
+ ngx.log(ngx.ERR, "SFive(log): failed to set cleanup_running flag: " .. err)
+ end
+ return
+ end
+ ok, err = ngx.timer.at(cleanup_delay, log_cleanup)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(log): failed to enqueue log_cleanup: ", err)
+ sfive:delete("log:cleanup_running")
+ return
+ end
+end
+
+local status = ngx.var.status
+
+if status == '200' or status == '206' then
+ local sfive = ngx.shared.sfive
+ local idx, err = sfive:incr("log:idx", 1)
+ if not idx then
+ ngx.log(ngx.ERR, "SFive(log): incrementing log index failed: " .. err)
+ else
+ local json = '{'
+ json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",'
+ json = json .. '"client": "' .. ngx.var.remote_addr .. '",'
+ json = json .. '"port": ' .. ngx.var.remote_port .. ','
+ json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",'
+ json = json .. '"uri": "' .. ngx.var.uri .. '",'
+ json = json .. '"status": ' .. status .. ','
+ json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent
+ json = json .. '}'
+
+ local sfive_data = ngx.shared.sfive_data
+ local ok, err, force = sfive_data:add(idx, json, log_exptime)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(log): adding log line (".. idx .. ") to log store failed: " .. err)
+ elseif force then
+ ngx.log(ngx.WARN, "SFive(log): adding log line has overwritten other log lines - consider increasing the log store!")
+ end
+ start_cleanup_thread(sfive)
+ end
+end
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
deleted file mode 100644
index eedb85f..0000000
--- a/src/daq/nginx-lua/s5-nginx.lua
+++ /dev/null
@@ -1,299 +0,0 @@
---
--- sfive
---
--- sfive - spreadspace streaming statistics suite is a generic
--- statistic collection tool for streaming server infrastuctures.
--- The system collects and stores meta data like number of views
--- and throughput from a number of streaming servers and stores
--- it in a global data store.
--- The data acquisition is designed to be generic and extensible in
--- order to support different streaming software.
--- sfive also contains tools and applications to filter and visualize
--- live and recorded data.
---
---
--- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
--- Markus Grueneis <gimpf@gimpf.org>
---
--- This file is part of sfive.
---
--- sfive is free software: you can redistribute it and/or modify
--- it under the terms of the GNU General Public License version 3
--- as published by the Free Software Foundation.
---
--- sfive is distributed in the hope that it will be useful,
--- but WITHOUT ANY WARRANTY; without even the implied warranty of
--- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--- GNU General Public License for more details.
---
--- You should have received a copy of the GNU General Public License
--- along with sfive. If not, see <http://www.gnu.org/licenses/>.
---
---
--- Install this by adding the following to your nginx.conf
---
--- http {
--- .....
---
--- lua_shared_dict sfive 64k;
--- lua_shared_dict sfive_locks 64k;
--- lua_shared_dict sfive_data 64m;
--- init_by_lua 's5 = require "s5-nginx.lua"';
--- init_worker_by_lua '
--- s5.init_worker("myhostname", 5, "/path/to/sfive.sock",
--- { "tag1", "tag2" },
--- { { content="A", format="1", quality="low" },
--- { content="A", format="1", quality="high" },
--- { content="B", format="1", quality="low" },
--- { content="B", format="1", quality="high" },
--- { content="A", format="2", quality="low" },
--- { content="A", format="2", quality="high" },
--- { content="B", format="2", quality="low" },
--- { content="B", format="2", quality="high" } })
--- ';
---
--- .....
---
--- server {
---
--- .....
---
--- location /path/to/fromat1/contentA-low/ {
--- log_by_lua 's5.log("A", "1", "low")';
--- }
--- location /path/to/fromat1/contentA-high/ {
--- log_by_lua 's5.log("A", "1", "high")';
--- }
--- .....
---
--- }
---
-
-locks = require "resty.lock"
-
-local _SFIVE = {}
-
------------------------------------------------
--- this will only be shared within one worker
-local config = {
- hostname = "unknown",
- duration = 5,
- sockpath = "/var/run/sfive.sock",
- tags = {},
- streamer = {}
-}
-local sfive = ngx.shared.sfive
-local sfive_data = ngx.shared.sfive_data
------------------------------------------------
-
-function _SFIVE.log(content, format, quality)
- local status = ngx.var.status
- if status == '200' or status == '206' then
- local lock = locks:new("sfive_locks")
- local elapsed, err = lock:lock("data")
- if not elapsed then
- ngx.log(ngx.ERR, "SFive(log): lock error: " .. err)
- ret = false
- else
- local remote_addr = ngx.var.remote_addr
- local bytes_sent = tonumber(ngx.var.bytes_sent)
- local key = table.concat({content, format, quality, remote_addr}, ';')
- local ok, err = sfive_data:incr(key, bytes_sent)
- if not ok then
- if err == "not found" then
- local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5)
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err)
- end
- else
- ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err)
- end
- end
- ok, err = lock:unlock("data")
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err)
- ret = false
- end
- end
- return ret
- end
-end
-
-local connect = function()
- ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'")
-
- local sock = ngx.socket.udp()
- local ok, err = sock:setpeername("unix:" .. config.sockpath)
- if not ok then
- return nil, "connect to '" .. config.sockpath .. "' failed: " .. err
- end
- return sock
-end
-
-local get_data_streamer = function(keys, content, format, quality)
- local clients = {}
- local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$'
- for i, k in ipairs(keys) do
- local m, err = ngx.re.match(k, re)
- if m then
- local bs, err = sfive_data:get(k)
- if not bs then
- ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?")
- else
- clients[m[1]] = (clients[m[1]] or 0) + bs
- end
- end
- end
-
- local cnt = 0
- local bytes_sent = 0
- json = '{ clients: ['
- for k, v in pairs(clients) do
- if cnt > 0 then json = json .. ',' end
- json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}'
- bytes_sent = bytes_sent + v
- cnt = cnt + 1
- end
- json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }'
- return json
-end
-
-local send_data = function(sock, starttime)
- local keys = sfive_data:get_keys(0)
- for i, s in ipairs(config.streamer) do
- local json = '{'
- json = json .. '"start-time": "' .. starttime .. 'Z", '
- json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", '
- json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' ..
- '"format": "' .. s.format .. '", ' ..
- '"quality": "' .. s.quality .. '" }, '
- json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality)
- json = json .. '}\n'
- while true do
- -- TODO: this should probably be done inside a seperate writer thread
- local ok, err = sock:send(json)
- if not ok then
- if err ~= "resource temporarily unavailable" then
- return nil, err
- end
- ngx.sleep(0.02)
- else
- break
- end
- end
- end
- return true
-end
-
-local send_init = function(sock)
- local json = '{'
- json = json .. '"version": 1, '
- json = json .. '"hostname": "' .. config.hostname .. '", '
- json = json .. '"tags": ['
- for i, k in ipairs(config.tags) do
- json = json .. (i==1 and '' or ',') .. '"' .. k .. '"'
- end
- json = json .. ']'
- json = json .. '}\n'
- return sock:send(json)
-end
-
-local send_updates = function(sock, starttime, lock)
- ret = true
- local elapsed, err = lock:lock("data")
- if not elapsed then
- ngx.log(ngx.ERR, "SFive(task): lock error: " .. err)
- ret = false
- else
- local ok, err = send_data(sock, starttime)
- if not ok then
- ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err)
- sock:close()
- ret = false
- else
- sfive_data:flush_all()
- sfive_data:flush_expired()
- end
-
- ok, err = lock:unlock("data")
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err)
- ret = false
- end
- end
- return ret
-end
-
-local task = function()
- local sock, err = connect(config.sockpath)
- if not sock then
- ngx.log(ngx.ERR, "SFive(task): connect(): " .. err)
- return
- else
- local ok, err = send_init(sock)
- if not ok then
- ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err)
- sock:close()
- return
- end
- end
- sfive_data:flush_all()
- sfive_data:flush_expired()
- local lock = locks:new("sfive_locks")
- while true do
- local starttime = string.gsub(ngx.utctime(), " ", "T", 1)
- ngx.sleep(config.duration)
- if not send_updates(sock, starttime, lock) then
- return
- end
- end
-end
-
-local launcher
-launcher = function(premature)
- local task = ngx.thread.spawn(task)
- ngx.thread.wait(task)
- if premature then
- return
- end
- local ok, err = ngx.timer.at(2, launcher)
- if not ok then
- ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err)
- inst.flags:delete("running")
- return
- end
-end
-
-local init = function()
- local ok, err = sfive:add("running", true)
- if not ok then
- if err ~= "exists" then
- ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err)
- end
- return
- end
-
- sfive_data:flush_all()
- sfive_data:flush_expired()
-
- ok, err = ngx.timer.at(0, launcher)
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err)
- return
- end
-
- ngx.log(ngx.INFO, "SFive(init): task initialized successfully!")
-end
-
-function _SFIVE.init_worker(hostname, duration, sockpath, tags, streamer)
- config.hostname = hostname
- config.duration = duration
- config.sockpath = sockpath
- config.tags = (tags and tags or {})
- config.streamer = (streamer and streamer or {})
- init()
- ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!")
-end
-
-ngx.log(ngx.DEBUG, "SFive: loaded successfully")
-return _SFIVE