summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua/s5-nginx.lua
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-18 06:12:01 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-18 06:12:11 +0200
commitd4dd348dd8ddaf57075b1eb394f6f4cf19db91b4 (patch)
tree3ebb1a25909b6ee0670df6cb3202efa4aff27bf3 /src/daq/nginx-lua/s5-nginx.lua
parentsetting tags works now (diff)
sending empty messages works now (almost)
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx.lua')
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua100
1 files changed, 62 insertions, 38 deletions
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
index 3d6ad8e..c38b692 100644
--- a/src/daq/nginx-lua/s5-nginx.lua
+++ b/src/daq/nginx-lua/s5-nginx.lua
@@ -42,14 +42,14 @@
-- init_worker_by_lua '
-- s5.init_worker("myhostname", 5, "/path/to/sfive.sock",
-- { "tag1", "tag2" },
--- { { "contentA", "format1", "low" },
--- { "contentA", "format1", "high" },
--- { "contentB", "format1", "low" },
--- { "contentB", "format1", "high" },
--- { "contentA", "format2", "low" },
--- { "contentA", "format2", "high" },
--- { "contentB", "format2", "low" },
--- { "contentB", "format2", "high" } })
+-- { { content="A", format="1", quality="low" },
+-- { content="A", format="1", quality="high" },
+-- { content="B", format="1", quality="low" },
+-- { content="B", format="1", quality="high" },
+-- { content="A", format="2", quality="low" },
+-- { content="A", format="2", quality="high" },
+-- { content="B", format="2", quality="low" },
+-- { content="B", format="2", quality="high" } })
-- ';
--
-- .....
@@ -59,10 +59,10 @@
-- .....
--
-- location /path/to/fromat1/contentA-low/ {
--- log_by_lua 's5.log("contentA", "format1", "low")';
+-- log_by_lua 's5.log("A", "1", "low")';
-- }
-- location /path/to/fromat1/contentA-high/ {
--- log_by_lua 's5.log("contentA", "format1", "high")';
+-- log_by_lua 's5.log("A", "1", "high")';
-- }
-- .....
--
@@ -116,30 +116,33 @@ local connect = function()
return sock
end
-local send_data = function(sock, starttime)
-
- -- logs = sfive_data:get_keys()
- -- for i, k in ipairs(logs) do
- -- local ok, err = ngx.say(sfive_data:get(k))
- -- if not ok then
- -- ngx.log(ngx.ERR, "SFive(fetch): failed to send data set: ", err)
- -- break
- -- end
- -- sfive_data:delete(k)
- -- end
+local get_data_streamer = function(content, format, quality)
+ return '{ "client-count": 0, "bytes-sent": 0 }'
+end
- local json = '{'
- json = json .. '"start-time": "' .. starttime .. 'Z",'
- json = json .. '"duration-ms": "' .. config.duration * 1000 .. '"'
- json = json .. '}\n'
- return sock:send(json)
+local send_data = function(sock, starttime)
+ for i, s in ipairs(config.streamer) do
+ local json = '{'
+ json = json .. '"start-time": "' .. starttime .. 'Z", '
+ json = json .. '"duration-ms": "' .. config.duration * 1000 .. '", '
+ json = json .. '"streamer-id": { "content-id": "' .. s.content .. '", ' ..
+ '"format": "' .. s.format .. '", ' ..
+ '"quality": "' .. s.quality .. '" }, '
+ json = json .. '"data": ' .. get_data_streamer(s.content, s.format, s.quality)
+ json = json .. '}\n'
+ local ok, err = sock:send(json)
+ if not ok then
+ return nil, err
+ end
+ end
+ return true
end
local send_init = function(sock)
local json = '{'
- json = json .. '"version": 1'
- json = json .. ',"hostname": "' .. config.hostname .. '"'
- json = json .. ',"tags": ['
+ json = json .. '"version": 1, '
+ json = json .. '"hostname": "' .. config.hostname .. '", '
+ json = json .. '"tags": ['
for i, k in ipairs(config.tags) do
json = json .. (i==1 and '' or ',') .. '"' .. k .. '"'
end
@@ -148,6 +151,32 @@ local send_init = function(sock)
return sock:send(json)
end
+local send_updates = function(sock, starttime, lock)
+ ret = true
+ local elapsed, err = lock:lock("data")
+ if not elapsed then
+ ngx.log(ngx.ERR, "SFive(task): lock error: " .. err)
+ ret = false
+ else
+ local ok, err = send_data(sock, starttime)
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err)
+ sock:close()
+ ret = false
+ else
+ sfive_data:flush_all()
+ sfive_data:flush_expired()
+ end
+
+ ok, err = lock:unlock("data")
+ if not ok then
+ ngx.log(ngx.ERR, "SFive(init): unlock error: " .. err)
+ ret = false
+ end
+ end
+ return ret
+end
+
local task = function()
local sock, err = connect(config.sockpath)
if not sock then
@@ -161,18 +190,13 @@ local task = function()
return
end
end
-
-
+ sfive_data:flush_all()
+ sfive_data:flush_expired()
+ local lock = locks:new("sfive_locks")
while true do
local starttime = string.gsub(ngx.utctime(), " ", "T", 1)
-
ngx.sleep(config.duration)
-
- -- TODO: acquire(data_lock), send(data), flush(data), release(data_lock)
- local ok, err = send_data(sock, starttime)
- if not ok then
- ngx.log(ngx.ERR, "SFive(task): sending data failed: " .. err)
- sock:close()
+ if not send_updates(sock, starttime, lock) then
return
end
end