diff options
-rw-r--r-- | src/daq/nginx-lua/s5-nginx.lua | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua index 6559e95..eedb85f 100644 --- a/src/daq/nginx-lua/s5-nginx.lua +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -89,19 +89,33 @@ local sfive_data = ngx.shared.sfive_data function _SFIVE.log(content, format, quality) local status = ngx.var.status if status == '200' or status == '206' then - - -- TODO: acquire(data_lock), update(data), release(data_lock) - ngx.log(ngx.ERR, "SFive(log): won't add data from " .. ngx.var.remote_addr .. - " to (" .. content .. ", " .. format .. ", " .. quality .. "): not implemented yet!") - -- local json = '{' - -- json = json .. '"time": "' .. string.gsub(ngx.utctime(), " ", "T", 1) .. 'Z",' - -- json = json .. '"client": "' .. ngx.var.remote_addr .. '",' - -- json = json .. '"port": ' .. ngx.var.remote_port .. ',' - -- json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' - -- json = json .. '"uri": "' .. ngx.var.uri .. '",' - -- json = json .. '"status": ' .. status .. ',' - -- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent - -- json = json .. '}' + local lock = locks:new("sfive_locks") + local elapsed, err = lock:lock("data") + if not elapsed then + ngx.log(ngx.ERR, "SFive(log): lock error: " .. err) + ret = false + else + local remote_addr = ngx.var.remote_addr + local bytes_sent = tonumber(ngx.var.bytes_sent) + local key = table.concat({content, format, quality, remote_addr}, ';') + local ok, err = sfive_data:incr(key, bytes_sent) + if not ok then + if err == "not found" then + local ok , err = sfive_data:add(key, bytes_sent, config.duration + 5) + if not ok then + ngx.log(ngx.ERR, "SFive(log): adding dataset failed for '" .. key .. "': " .. err) + end + else + ngx.log(ngx.ERR, "SFive(log): updating dataset failed for '" .. key .. "': " .. err) + end + end + ok, err = lock:unlock("data") + if not ok then + ngx.log(ngx.ERR, "SFive(log): unlock error: " .. err) + ret = false + end + end + return ret end end @@ -116,11 +130,36 @@ local connect = function() return sock end -local get_data_streamer = function(content, format, quality) - return '{ "client-count": 0, "bytes-sent": 0 }' +local get_data_streamer = function(keys, content, format, quality) + local clients = {} + local re = '^' .. table.concat({content, format, quality, "([^;]+)"}, ';') .. '$' + for i, k in ipairs(keys) do + local m, err = ngx.re.match(k, re) + if m then + local bs, err = sfive_data:get(k) + if not bs then + ngx.log(ngx.ERR, "SFive(task): '" .. k .. "' not found in sfive_data?") + else + clients[m[1]] = (clients[m[1]] or 0) + bs + end + end + end + + local cnt = 0 + local bytes_sent = 0 + json = '{ clients: [' + for k, v in pairs(clients) do + if cnt > 0 then json = json .. ',' end + json = json .. '{ "ip": ' .. k .. ', "bytes-sent": ' .. v .. '}' + bytes_sent = bytes_sent + v + cnt = cnt + 1 + end + json = json .. '], "client-count": ' .. cnt .. ', "bytes-sent": ' .. bytes_sent .. ' }' + return json end local send_data = function(sock, starttime) + local keys = sfive_data:get_keys(0) for i, s in ipairs(config.streamer) do local json = '{' json = json .. '"start-time": "' .. starttime .. 'Z", ' @@ -128,7 +167,7 @@ local send_data = function(sock, starttime) json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. '"format": "' .. s.format .. '", ' .. '"quality": "' .. s.quality .. '" }, ' - json = json .. '"data": ' .. get_data_streamer(s.content, s.format, s.quality) + json = json .. '"data": ' .. get_data_streamer(keys, s.content, s.format, s.quality) json = json .. '}\n' while true do -- TODO: this should probably be done inside a seperate writer thread |