From d4dd348dd8ddaf57075b1eb394f6f4cf19db91b4 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 18 Oct 2014 06:12:01 +0200 Subject: sending empty messages works now (almost) --- src/daq/nginx-lua/s5-nginx.lua | 100 +++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua index 3d6ad8e..c38b692 100644 --- a/src/daq/nginx-lua/s5-nginx.lua +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -42,14 +42,14 @@ -- init_worker_by_lua ' -- s5.init_worker("myhostname", 5, "/path/to/sfive.sock", -- { "tag1", "tag2" }, --- { { "contentA", "format1", "low" }, --- { "contentA", "format1", "high" }, --- { "contentB", "format1", "low" }, --- { "contentB", "format1", "high" }, --- { "contentA", "format2", "low" }, --- { "contentA", "format2", "high" }, --- { "contentB", "format2", "low" }, --- { "contentB", "format2", "high" } }) +-- { { content="A", format="1", quality="low" }, +-- { content="A", format="1", quality="high" }, +-- { content="B", format="1", quality="low" }, +-- { content="B", format="1", quality="high" }, +-- { content="A", format="2", quality="low" }, +-- { content="A", format="2", quality="high" }, +-- { content="B", format="2", quality="low" }, +-- { content="B", format="2", quality="high" } }) -- '; -- -- ..... @@ -59,10 +59,10 @@ -- ..... -- -- location /path/to/fromat1/contentA-low/ { --- log_by_lua 's5.log("contentA", "format1", "low")'; +-- log_by_lua 's5.log("A", "1", "low")'; -- } -- location /path/to/fromat1/contentA-high/ { --- log_by_lua 's5.log("contentA", "format1", "high")'; +-- log_by_lua 's5.log("A", "1", "high")'; -- } -- ..... -- @@ -116,30 +116,33 @@ local connect = function() return sock end -local send_data = function(sock, starttime) - - -- logs = sfive_data:get_keys() - -- for i, k in ipairs(logs) do - -- local ok, err = ngx.say(sfive_data:get(k)) - -- if not ok then - -- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err) - -- break - -- end - -- sfive_data:delete(k) - -- end +local get_data_streamer = function(content, format, quality) + return '{ "client-count": 0, "bytes-sent": 0 }' +end - local json = '{' - json = json .. '"start-time": "' .. starttime .. 'Z",' - json = json .. '"duration-ms": "' .. config.duration * 1000 .. '"' - json = json .. '}\n' - return sock:send(json) +local send_data = function(sock, starttime) + for i, s in ipairs(config.streamer) do + local json = '{' + json = json .. '"start-time": "' .. starttime .. 'Z", ' + json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", ' + json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' .. + '"format": "' .. s.format .. '", ' .. + '"quality": "' .. s.quality .. '" }, ' + json = json .. '"data": ' .. get_data_streamer(s.content, s.format, s.quality) + json = json .. '}\n' + local ok, err = sock:send(json) + if not ok then + return nil, err + end + end + return true end local send_init = function(sock) local json = '{' - json = json .. '"version": 1' - json = json .. ',"hostname": "' .. config.hostname .. '"' - json = json .. ',"tags": [' + json = json .. '"version": 1, ' + json = json .. '"hostname": "' .. config.hostname .. '", ' + json = json .. '"tags": [' for i, k in ipairs(config.tags) do json = json .. (i==1 and '' or ',') .. '"' .. k .. '"' end @@ -148,6 +151,32 @@ local send_init = function(sock) return sock:send(json) end +local send_updates = function(sock, starttime, lock) + ret = true + local elapsed, err = lock:lock("data") + if not elapsed then + ngx.log(ngx.ERR, "SFive(task): lock error: " .. err) + ret = false + else + local ok, err = send_data(sock, starttime) + if not ok then + ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) + sock:close() + ret = false + else + sfive_data:flush_all() + sfive_data:flush_expired() + end + + ok, err = lock:unlock("data") + if not ok then + ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err) + ret = false + end + end + return ret +end + local task = function() local sock, err = connect(config.sockpath) if not sock then @@ -161,18 +190,13 @@ local task = function() return end end - - + sfive_data:flush_all() + sfive_data:flush_expired() + local lock = locks:new("sfive_locks") while true do local starttime = string.gsub(ngx.utctime(), " ", "T", 1) - ngx.sleep(config.duration) - - -- TODO: acquire(data_lock), send(data), flush(data), release(data_lock) - local ok, err = send_data(sock, starttime) - if not ok then - ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err) - sock:close() + if not send_updates(sock, starttime, lock) then return end end -- cgit v1.2.3