summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-18 04:35:51 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-18 04:35:51 +0200
commit47f172dbcbe0cae3a6882acc46c74a755507afec (patch)
tree8f98c98b9ff8579181f35d61c4884f337a041b67 /src/daq
parentmoved to resty-lock for fetch (diff)
nginx lua now makes all work by itself
Diffstat (limited to 'src/daq')
-rw-r--r--src/daq/nginx-lua/s5-nginx-fetch.lua73
-rw-r--r--src/daq/nginx-lua/s5-nginx-init.lua70
-rw-r--r--src/daq/nginx-lua/s5-nginx-log.lua100
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua203
4 files changed, 203 insertions, 243 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-fetch.lua b/src/daq/nginx-lua/s5-nginx-fetch.lua
deleted file mode 100644
index 8821094..0000000
--- a/src/daq/nginx-lua/s5-nginx-fetch.lua
+++ /dev/null
@@ -1,73 +0,0 @@
---
--- sfive
---
--- sfive - spreadspace streaming statistics suite is a generic
--- statistic collection tool for streaming server infrastuctures.
--- The system collects and stores meta data like number of views
--- and throughput from a number of streaming servers and stores
--- it in a global data store.
--- The data acquisition is designed to be generic and extensible in
--- order to support different streaming software.
--- sfive also contains tools and applications to filter and visualize
--- live and recorded data.
---
---
--- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
--- Markus Grueneis <gimpf@gimpf.org>
---
--- This file is part of sfive.
---
--- sfive is free software: you can redistribute it and/or modify
--- it under the terms of the GNU General Public License version 3
--- as published by the Free Software Foundation.
---
--- sfive is distributed in the hope that it will be useful,
--- but WITHOUT ANY WARRANTY; without even the implied warranty of
--- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--- GNU General Public License for more details.
---
--- You should have received a copy of the GNU General Public License
--- along with sfive. If not, see <http://www.gnu.org/licenses/>.
---
---
--- Install this by adding the following to your nginx.conf
---
--- location /sfive {
--- allow 127.0.0.1;
--- allow ::1;
--- deny all;
---
--- content_by_lua_file '/path/to/s5-nginx-fetch.lua';
--- }
---
-
-local lock = locks:new("sfive_locks", { exptime = nil, timeout = 0.001 })
-local ok, err = lock:lock("fetch")
-if not ok then
- if err == "timeout" then
- ngx.exit(ngx.HTTP_SERVICE_UNAVAILABLE)
- else
- ngx.log(ngx.ERR, "SFive(fetch): failed to acquire fetch lock: " .. err)
- ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
- end
-else
- ngx.log(ngx.INFO, "SFive(fetch): client connected")
- local sfive_data = ngx.shared.sfive_data
- while true do
- logs = sfive_data:get_keys()
- for i, k in ipairs(logs) do
- local ok, err = ngx.say(sfive_data:get(k))
- if not ok then
- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err)
- break
- end
- sfive_data:delete(k)
- end
- ngx.flush()
- ngx.sleep(0.1)
- end
- ok, err = lock:unlock("fetch")
- if not ok then
- ngx.log(ngx.ERR, "SFive(fetch): unlock error: " .. err)
- end
-end
diff --git a/src/daq/nginx-lua/s5-nginx-init.lua b/src/daq/nginx-lua/s5-nginx-init.lua
deleted file mode 100644
index 7270e23..0000000
--- a/src/daq/nginx-lua/s5-nginx-init.lua
+++ /dev/null
@@ -1,70 +0,0 @@
---
--- sfive
---
--- sfive - spreadspace streaming statistics suite is a generic
--- statistic collection tool for streaming server infrastuctures.
--- The system collects and stores meta data like number of views
--- and throughput from a number of streaming servers and stores
--- it in a global data store.
--- The data acquisition is designed to be generic and extensible in
--- order to support different streaming software.
--- sfive also contains tools and applications to filter and visualize
--- live and recorded data.
---
---
--- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
--- Markus Grueneis <gimpf@gimpf.org>
---
--- This file is part of sfive.
---
--- sfive is free software: you can redistribute it and/or modify
--- it under the terms of the GNU General Public License version 3
--- as published by the Free Software Foundation.
---
--- sfive is distributed in the hope that it will be useful,
--- but WITHOUT ANY WARRANTY; without even the implied warranty of
--- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--- GNU General Public License for more details.
---
--- You should have received a copy of the GNU General Public License
--- along with sfive. If not, see <http://www.gnu.org/licenses/>.
---
---
--- Install this by adding the following to your nginx.conf
---
--- http {
--- lua_shared_dict sfive 64k;
--- lua_shared_dict sfive_locks 64k;
--- lua_shared_dict sfive_data 64m;
--- init_by_lua_file '/path/to/s5-nginx-init.lua';
--- }
---
-
-ngx.log(ngx.DEBUG, "SFive: loaded")
-
-locks = require "resty.lock"
-
-local sfive = ngx.shared.sfive
-sfive:flush_all()
-sfive:flush_expired()
-local sfive_data = ngx.shared.sfive_data
-sfive_data:flush_all()
-sfive_data:flush_expired()
-
--- try to create locks so we get an error on init and not on first usage
-local lock = locks:new("sfive_locks")
-local elapsed, err = lock:lock("test")
-if not elapsed then
- ngx.log(ngx.ERR, "SFive(init): lock error: " .. err)
-else
- local ok, err, force = sfive:set("log:idx", 0)
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): creating log index counter failed: " .. err)
- else
- ngx.log(ngx.INFO, "SFive(init): initialized successfully!")
- end
- ok, err = lock:unlock("test")
- if not ok then
- ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err)
- end
-end
diff --git a/src/daq/nginx-lua/s5-nginx-log.lua b/src/daq/nginx-lua/s5-nginx-log.lua
deleted file mode 100644
index 43d2394..0000000
--- a/src/daq/nginx-lua/s5-nginx-log.lua
+++ /dev/null
@@ -1,100 +0,0 @@
---
--- sfive
---
--- sfive - spreadspace streaming statistics suite is a generic
--- statistic collection tool for streaming server infrastuctures.
--- The system collects and stores meta data like number of views
--- and throughput from a number of streaming servers and stores
--- it in a global data store.
--- The data acquisition is designed to be generic and extensible in
--- order to support different streaming software.
--- sfive also contains tools and applications to filter and visualize
--- live and recorded data.
---
---
--- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
--- Markus Grueneis <gimpf@gimpf.org>
---
--- This file is part of sfive.
---
--- sfive is free software: you can redistribute it and/or modify
--- it under the terms of the GNU General Public License version 3
--- as published by the Free Software Foundation.
---
--- sfive is distributed in the hope that it will be useful,
--- but WITHOUT ANY WARRANTY; without even the implied warranty of
--- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
--- GNU General Public License for more details.
---
--- You should have received a copy of the GNU General Public License
--- along with sfive. If not, see <http://www.gnu.org/licenses/>.
---
---
--- Install this by adding the following to your nginx.conf
---
--- location /path/to/hls {
--- log_by_lua_file '/path/to/s5-nginx-log.lua';
--- }
---
-
-local cleanup_delay = 1200
-local log_exptime = 600
-
-local log_cleanup = function(premature)
- local sfive = ngx.shared.sfive
- sfive:flush_expired()
- if premature then
- return
- end
- local ok, err = ngx.timer.at(cleanup_delay, log_cleanup)
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): failed to reenqueue log_cleanup: ", err)
- sfive:delete("log:cleanup_running")
- return
- end
-end
-
-local start_cleanup_thread = function(sfive)
- local ok, err, force = sfive:add("log:cleanup_running", 1)
- if not ok then
- if err ~= "exists" then
- ngx.log(ngx.ERR, "SFive(log): failed to set cleanup_running flag: " .. err)
- end
- return
- end
- ok, err = ngx.timer.at(cleanup_delay, log_cleanup)
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): failed to enqueue log_cleanup: ", err)
- sfive:delete("log:cleanup_running")
- return
- end
-end
-
-local status = ngx.var.status
-
-if status == '200' or status == '206' then
- local sfive = ngx.shared.sfive
- local idx, err = sfive:incr("log:idx", 1)
- if not idx then
- ngx.log(ngx.ERR, "SFive(log): incrementing log index failed: " .. err)
- else
- local json = '{'
- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",'
- json = json .. '"client": "' .. ngx.var.remote_addr .. '",'
- json = json .. '"port": ' .. ngx.var.remote_port .. ','
- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",'
- json = json .. '"uri": "' .. ngx.var.uri .. '",'
- json = json .. '"status": ' .. status .. ','
- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent
- json = json .. '}'
-
- local sfive_data = ngx.shared.sfive_data
- local ok, err, force = sfive_data:add(idx, json, log_exptime)
- if not ok then
- ngx.log(ngx.ERR, "SFive(log): adding log line (".. idx .. ") to log store failed: " .. err)
- elseif force then
- ngx.log(ngx.WARN, "SFive(log): adding log line has overwritten other log lines - consider increasing the log store!")
- end
- start_cleanup_thread(sfive)
- end
-end
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
new file mode 100644
index 0000000..7a3e9ed
--- /dev/null
+++ b/src/daq/nginx-lua/s5-nginx.lua
@@ -0,0 +1,203 @@
+--
+-- sfive
+--
+-- sfive - spreadspace streaming statistics suite is a generic
+-- statistic collection tool for streaming server infrastuctures.
+-- The system collects and stores meta data like number of views
+-- and throughput from a number of streaming servers and stores
+-- it in a global data store.
+-- The data acquisition is designed to be generic and extensible in
+-- order to support different streaming software.
+-- sfive also contains tools and applications to filter and visualize
+-- live and recorded data.
+--
+--
+-- Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+-- Markus Grueneis <gimpf@gimpf.org>
+--
+-- This file is part of sfive.
+--
+-- sfive is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License version 3
+-- as published by the Free Software Foundation.
+--
+-- sfive is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with sfive. If not, see <http://www.gnu.org/licenses/>.
+--
+--
+-- Install this by adding the following to your nginx.conf
+--
+-- http {
+-- .....
+--
+-- lua_shared_dict sfive 64k;
+-- lua_shared_dict sfive_locks 64k;
+-- lua_shared_dict sfive_data 64m;
+-- init_by_lua 's5 = require "s5-nginx.lua"';
+-- init_worker_by_lua 's5.init_worker("myhostname", 5, "/path/to/sfive.sock")';
+--
+-- .....
+--
+-- server {
+--
+-- .....
+--
+-- location /path/to/hls/content-quality/ {
+-- log_by_lua 's5.log("content", "hls", "quality")';
+-- }
+--
+-- .....
+--
+-- }
+--
+
+locks = require "resty.lock"
+
+local _SFIVE = {}
+
+-----------------------------------------------
+-- this will only be shared within one worker
+local config = {
+ hostname = "unknown",
+ duration = 5,
+ sockpath = "/var/run/sfive.sock",
+}
+local sfive = ngx.shared.sfive
+local sfive_data = ngx.shared.sfive_data
+-----------------------------------------------
+
+function _SFIVE.log(content, format, quality)
+ -- TODO: acquire(data_lock), update(data), release(data_lock)
+ ngx.log(ngx.WARN, "SFive(log): won't add data from " .. ngx.var.remote_addr ..
+ " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!")
+
+ -- local json = '{'
+ -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",'
+ -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",'
+ -- json = json .. '"port": ' .. ngx.var.remote_port .. ','
+ -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",'
+ -- json = json .. '"uri": "' .. ngx.var.uri .. '",'
+ -- json = json .. '"status": ' .. status .. ','
+ -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent
+ -- json = json .. '}'
+
+end
+
+function _SFIVE.connect()
+ ngx.log(ngx.INFO, "SFive(connect): trying to connect to '" .. config.sockpath .."'")
+
+ local sock = ngx.socket.udp()
+ local ok, err = sock:setpeername("unix:" .. config.sockpath)
+ if not ok then
+ return nil, "connect to '" .. config.sockpath .. "' failed: " .. err
+ end
+ return sock
+end
+
+function _SFIVE.send_data(sock, starttime)
+
+ -- logs = sfive_data:get_keys()
+ -- for i, k in ipairs(logs) do
+ -- local ok, err = ngx.say(sfive_data:get(k))
+ -- if not ok then
+ -- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err)
+ -- break
+ -- end
+ -- sfive_data:delete(k)
+ -- end
+
+ local json = '{'
+ json = json .. '"start-time": "' .. starttime .. 'Z",'
+ json = json .. '"duration-ms": "' .. config.duration * 1000 .. '"'
+ json = json .. '}\n'
+ return sock:send(json)
+end
+
+function _SFIVE.send_init(sock)
+ local json = '{'
+ json = json .. '"version": 1,'
+ json = json .. '"hostname": "' .. config.hostname .. '"'
+ json = json .. '}\n'
+ return sock:send(json)
+end
+
+function _SFIVE.task()
+ local sock, err = _SFIVE.connect(config.sockpath)
+ if not sock then
+ ngx.log(ngx.ERR, "SFive(task): connect(): " .. err)
+ return
+ else
+ local ok, err = _SFIVE.send_init(sock)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(task): sending init failed: " .. err)
+ sock:close()
+ return
+ end
+ end
+
+
+ while true do
+ local starttime = string.gsub(ngx.utctime(), " ", "T", 1)
+
+ ngx.sleep(config.duration)
+
+ -- TODO: acquire(data_lock), send(data), flush(data), release(data_lock)
+ local ok, err = _SFIVE.send_data(sock, starttime)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err)
+ sock:close()
+ return
+ end
+ end
+end
+
+function _SFIVE.launcher(premature)
+ local task = ngx.thread.spawn(_SFIVE.task)
+ ngx.thread.wait(task)
+ if premature then
+ return
+ end
+ local ok, err = ngx.timer.at(2, _SFIVE.launcher)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(launcher): failed to reenqueue myself: ", err)
+ inst.flags:delete("running")
+ return
+ end
+end
+
+function _SFIVE.init()
+ local ok, err = sfive:add("running", true)
+ if not ok then
+ if err ~= "exists" then
+ ngx.log(ngx.ERR, "SFive(init): failed to set running flag: " .. err)
+ end
+ return
+ end
+
+ sfive_data:flush_all()
+ sfive_data:flush_expired()
+
+ ok, err = ngx.timer.at(0, _SFIVE.launcher)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(init): failed to enqueue sfive launcher: ", err)
+ return
+ end
+
+ ngx.log(ngx.INFO, "SFive(init): task initialized successfully!")
+end
+
+function _SFIVE.init_worker(hostname, duration, sockpath)
+ config.hostname = hostname
+ config.duration = duration
+ config.sockpath = sockpath
+ _SFIVE.init()
+ ngx.log(ngx.INFO, "SFive(init): worker initialized successfully!")
+end
+
+ngx.log(ngx.DEBUG, "SFive: loaded successfully")
+return _SFIVE