summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua/s5-nginx.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx.lua')
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua71
1 files changed, 55 insertions, 16 deletions
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
index 6559e95..eedb85f 100644
--- a/src/daq/nginx-lua/s5-nginx.lua
+++ b/src/daq/nginx-lua/s5-nginx.lua
@@ -89,19 +89,33 @@ local sfive_data = ngx.shared.sfive_data
function _SFIVE.log(content, format, quality)
local status = ngx.var.status
if status == '200' or status == '206' then
-
- -- TODO: acquire(data_lock), update(data), release(data_lock)
- ngx.log(ngx.ERR, "SFive(log): won't add data from " .. ngx.var.remote_addr ..
- " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!")
- -- local json = '{'
- -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",'
- -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",'
- -- json = json .. '"port": ' .. ngx.var.remote_port .. ','
- -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",'
- -- json = json .. '"uri": "' .. ngx.var.uri .. '",'
- -- json = json .. '"status": ' .. status .. ','
- -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent
- -- json = json .. '}'
+ local lock = locks:new("sfive_locks")
+ local elapsed, err = lock:lock("data")
+ if not elapsed then
+ ngx.log(ngx.ERR, "SFive(log): lock error: " .. err)
+ ret = false
+ else
+ local remote_addr = ngx.var.remote_addr
+ local bytes_sent = tonumber(ngx.var.bytes_sent)
+ local key = table.concat({content, format, quality, remote_addr}, ';')
+ local ok, err = sfive_data:incr(key, bytes_sent)
+ if not ok then
+ if err == "not found" then
+ local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err)
+ end
+ else
+ ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err)
+ end
+ end
+ ok, err = lock:unlock("data")
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err)
+ ret = false
+ end
+ end
+ return ret
end
end
@@ -116,11 +130,36 @@ local connect = function()
return sock
end
-local get_data_streamer = function(content, format, quality)
- return '{ "client-count": 0, "bytes-sent": 0 }'
+local get_data_streamer = function(keys, content, format, quality)
+ local clients = {}
+ local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$'
+ for i, k in ipairs(keys) do
+ local m, err = ngx.re.match(k, re)
+ if m then
+ local bs, err = sfive_data:get(k)
+ if not bs then
+ ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?")
+ else
+ clients[m[1]] = (clients[m[1]] or 0) + bs
+ end
+ end
+ end
+
+ local cnt = 0
+ local bytes_sent = 0
+ json = '{ clients: ['
+ for k, v in pairs(clients) do
+ if cnt > 0 then json = json .. ',' end
+ json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}'
+ bytes_sent = bytes_sent + v
+ cnt = cnt + 1
+ end
+ json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }'
+ return json
end
local send_data = function(sock, starttime)
+ local keys = sfive_data:get_keys(0)
for i, s in ipairs(config.streamer) do
local json = '{'
json = json .. '"start-time": "' .. starttime .. 'Z", '
@@ -128,7 +167,7 @@ local send_data = function(sock, starttime)
json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' ..
'"format": "' .. s.format .. '", ' ..
'"quality": "' .. s.quality .. '" }, '
- json = json .. '"data": ' .. get_data_streamer(s.content, s.format, s.quality)
+ json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality)
json = json .. '}\n'
while true do
-- TODO: this should probably be done inside a seperate writer thread