From 2ff79c65051ceb468ee0a48c732a2a496a525bda Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 28 Dec 2010 03:19:57 +0000 Subject: added tcp.client and tcp.connect added tcp_connect module some cleanup and small fixes git-svn-id: https://svn.spreadspace.org/gcsd/trunk@85 ac14a137-c7f1-4531-abe0-07747231d213 --- TODO | 4 +- src/l_tcp.c | 168 +++++++++++++++++++++++++++++++++++++++----- src/main_loop.lua | 4 +- src/modules/exec.lua | 4 +- src/modules/tcp_connect.lua | 150 +++++++++++++++++++++++++++++++++++++++ src/modules/tcp_listen.lua | 6 +- 6 files changed, 309 insertions(+), 27 deletions(-) create mode 100644 src/modules/tcp_connect.lua diff --git a/TODO b/TODO index a1a1d94..85bef3e 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,4 @@ + * stop daemon when output module is supposed be removed * send response to requestor * out-of-order responses * add listener tables (for requests, responses and other messages) @@ -5,10 +6,9 @@ * finish API for command & response dispatch tables * module API etc. clean-up, finalize, freeze * add modules - - tcp_connect - udp_listen, udp_connect - unix_listen, unix_connect - inotify - * finilize tcp_listen module + * finilize tcp_listen and tcp_connect modules * exec module, add support for parameters * improve module loader (allow multiple search paths) diff --git a/src/l_tcp.c b/src/l_tcp.c index 158a8ba..c00921e 100644 --- a/src/l_tcp.c +++ b/src/l_tcp.c @@ -40,6 +40,9 @@ #include #include #include +#include +#include +#include #include "l_tcp.h" @@ -82,6 +85,29 @@ static char* tcp_endpoint_to_string(tcp_endpoint_t* e) return ret; } +static struct addrinfo* tcp_resolve_endpoint(const char* addr, const char* port, int af, int passive) +{ + struct addrinfo hints, *res; + res = NULL; + memset (&hints, 0, sizeof (hints)); + hints.ai_socktype = SOCK_STREAM; + if(passive) + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + switch(af) { + case 4: hints.ai_family = AF_INET; break; + case 6: hints.ai_family = AF_INET6; break; + default: hints.ai_family = AF_UNSPEC; break; + } + + int errcode = getaddrinfo(addr, port, &hints, &res); + if(errcode != 0 || !res) { + log_printf(ERROR, "tcp: resolver error: %s", errcode ? gai_strerror(errcode):"no address found!"); + return NULL; + } + + return res; +} + static int init_listener(tcp_endpoint_t* end) { int fd = socket(end->addr_.ss_family, SOCK_STREAM, 0); @@ -127,20 +153,8 @@ static int l_tcp_server(lua_State *L) const char* port = luaL_checkstring(L, 2); int af = luaL_optint(L, 3, 0); - struct addrinfo hints, *res; - res = NULL; - memset (&hints, 0, sizeof (hints)); - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - switch(af) { - case 4: hints.ai_family = AF_INET; break; - case 6: hints.ai_family = AF_INET6; break; - default: hints.ai_family = AF_UNSPEC; break; - } - - int errcode = getaddrinfo(addr, port, &hints, &res); - if(errcode != 0 || !res) { - log_printf(ERROR, "tcp: resolver error: %s", errcode ? gai_strerror(errcode):"no address found!"); + struct addrinfo *res = tcp_resolve_endpoint(addr, port, af, 1); + if(!res) { lua_pushnil(L); lua_pushstring(L, "error at tcp-server initialization"); return 2; @@ -199,16 +213,134 @@ static int l_tcp_accept(lua_State *L) return 2; } +static int init_client(tcp_endpoint_t remote, tcp_endpoint_t source) +{ + int fd = socket(remote.addr_.ss_family, SOCK_STREAM, 0); + if(fd < 0) { + log_printf(INFO, "error on socket(): %s", strerror(errno)); + return -1; + } + + int on = 1; + if(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on))) { + log_printf(ERROR, "error on setsockopt(): %s", strerror(errno)); + close(fd); + return -1; + } + + if(fcntl(fd, F_SETFL, O_NONBLOCK)) { + log_printf(ERROR, "error on fcntl(): %s", strerror(errno)); + close(fd); + return -1; + } + + if(source.addr_.ss_family != AF_UNSPEC) { + if(bind(fd, (struct sockaddr *)&(source.addr_), source.len_)==-1) { + log_printf(INFO, "error on bind(): %s", strerror(errno)); + close(fd); + return -1; + } + } + + return fd; +} + static int l_tcp_client(lua_State *L) { - // TODO: implement this - return 0; + const char* raddr = luaL_checkstring(L, 1); + const char* rport = luaL_checkstring(L, 2); + int af = luaL_optint(L, 3, 0); + const char* saddr = luaL_optstring(L, 4, NULL); + + tcp_endpoint_t remote, source; + memset(&remote, 0, sizeof(tcp_endpoint_t)); + memset(&source, 0, sizeof(tcp_endpoint_t)); + source.addr_.ss_family = AF_UNSPEC; + + struct addrinfo *res = tcp_resolve_endpoint(raddr, rport, af, 0); + if(!res) { + lua_pushnil(L); + lua_pushstring(L, "error at tcp-client initialization"); + return 2; + } + memcpy(&(remote.addr_), res->ai_addr, res->ai_addrlen); + remote.len_ = res->ai_addrlen; + freeaddrinfo(res); + + if(saddr) { + res = tcp_resolve_endpoint(saddr, NULL, af, 0); + if(!res) { + lua_pushnil(L); + lua_pushstring(L, "error at tcp-client initialization"); + return 2; + } + memcpy(&(source.addr_), res->ai_addr, res->ai_addrlen); + source.len_ = res->ai_addrlen; + freeaddrinfo(res); + } + + int fd = init_client(remote, source); + if(fd < 0) { + lua_pushnil(L); + lua_pushstring(L, "error at tcp-client initialization"); + return 2; + } + + lua_newtable(L); + + lua_pushliteral(L, "remote_end"); + tcp_endpoint_t* re = newtcpendudata(L); + memcpy(re, &remote, sizeof(tcp_endpoint_t)); + lua_settable(L, -3); + + if(source.addr_.ss_family != AF_UNSPEC) { + lua_pushliteral(L, "source_end"); + tcp_endpoint_t* se = newtcpendudata(L); + memcpy(se, &source, sizeof(tcp_endpoint_t)); + lua_settable(L, -3); + } + + lua_pushliteral(L, "fd"); + lua_pushinteger(L, fd); + lua_settable(L, -3); + + if(connect(fd, (struct sockaddr *)&(remote.addr_), remote.len_)==-1) { + if(errno == EINPROGRESS) { + lua_pushboolean(L, 0); + return 2; + } + + close(fd); + log_printf(INFO, "error on connect(): %s", strerror(errno)); + lua_pushnil(L); + lua_pushstring(L, "connect() failed"); + return 2; + } + + lua_pushboolean(L, 1); + return 2; } static int l_tcp_connect(lua_State *L) { - // TODO: implement this - return 0; + int fd = luaL_checkint(L, 1); + int error = 0; + int len = sizeof(error); + if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len)==-1) { + log_printf(ERROR, "error on getsockopt(): %s", strerror(errno)); + lua_pushnil(L); + lua_pushstring(L, "can't read SO_ERROR"); + return 2; + } + if(error) { + log_printf(ERROR, "error on connect(): %s", strerror(error)); + lua_pushnil(L); + lua_pushstring(L, "connect() failed"); + return 2; + } + + lua_pushboolean(L, 1); + return 1; } static int l_tcp_recv(lua_State *L) diff --git a/src/main_loop.lua b/src/main_loop.lua index f1712d9..84afa01 100644 --- a/src/main_loop.lua +++ b/src/main_loop.lua @@ -50,14 +50,14 @@ function get_readables() end function get_writeables() - local writeables = { } + local writeables = {} for _, module in ipairs(module_list.inputs) do for _, fd in ipairs(module:get_write_handles()) do table.insert(writeables, fd) end end for _, fd in ipairs(module_list.output.get_write_handles()) do - table.insert(readables, fd) + table.insert(writeables, fd) end for _, client in ipairs(client_list.clients) do for _, fd in ipairs(client:get_write_handles()) do diff --git a/src/modules/exec.lua b/src/modules/exec.lua index 1556087..46f540e 100644 --- a/src/modules/exec.lua +++ b/src/modules/exec.lua @@ -78,7 +78,7 @@ function exec:new(config, runtype) local buffer, err = rawio.read(self.fd, 100) if(buffer == nil) then log.printf(log.ERROR, inst.name .. ": connection error: %s", err) - return defines.KILL_MODULE_CLASS + return defines.KILL_MODULE end self.in_buffer = self.in_buffer .. buffer @@ -102,7 +102,7 @@ function exec:new(config, runtype) local len, err = rawio.write(self.fd, self.out_buffer) if(len == nil) then log.printf(log.ERROR, inst.name .. ": connection error: %s", err) - ret = defines.KILL_MODULE_CLASS + ret = defines.KILL_MODULE else self.out_buffer = string.sub(self.out_buffer, len+1) end diff --git a/src/modules/tcp_connect.lua b/src/modules/tcp_connect.lua new file mode 100644 index 0000000..c5171ea --- /dev/null +++ b/src/modules/tcp_connect.lua @@ -0,0 +1,150 @@ +-- +-- gcsd +-- +-- gcsd the generic command sequencer daemon can be used to serialize +-- commands sent over various paralell communication channels to a +-- single command output. It can be seen as a multiplexer for any +-- kind of communication between a single resource and various clients +-- which want to submit commands to it or query information from it. +-- gcsd is written in C and Lua. The goal is to provide an easy to +-- understand high level API based on Lua which can be used to +-- implement the business logic of the so formed multiplexer daemon. +-- +-- +-- Copyright (C) 2009-2010 Markus Grueneis +-- Christian Pointner +-- +-- This file is part of gcsd. +-- +-- gcsd is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- any later version. +-- +-- gcsd is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with gcsd. If not, see . +-- + +-- tcp_connect module class +local tcp_connect = {} +tcp_connect.properties = { type=defines.INOUT_MODULE, name="tcp-connect", max_instances=-1 } +tcp_connect.next_id = 0 + +-- create new instance of tcp_connect module class +function tcp_connect:new(config, runtype) + local inst = {} + inst.class = self + inst.config = config + inst.config.runtype = runtype + if(config.name == nil or config.name == "") then + inst.name = self.properties.name .. self.next_id + self.next_id = self.next_id + 1 + else + inst.name = config.name + end + + local handle, connected = tcp.client(config.addr, config.port, config.resolv_type, config.source) + if(not handle) then + return nil + end + inst.connected = connected + + handle.in_buffer = "" + handle.out_buffer = "" + function handle:read() + -- TODO: which size should we request?? + local buffer, err = tcp.recv(self.fd, 100) + if(buffer == nil) then + log.printf(log.ERROR, inst.name .. ": connection error: %s", err) + return defines.KILL_MODULE -- TODO: reconnect + end + if(#buffer == 0) then + log.printf(log.INFO, inst.name .. ": connection closed") + return defines.KILL_MODULE -- TODO: reconnect + end + + self.in_buffer = self.in_buffer .. buffer + if(inst.config.runtype == defines.IN_MODULE) then + self.in_buffer = command_table:dispatch(self.in_buffer) + else + self.in_buffer = response_table:dispatch(self.in_buffer) + end + + return defines.OK + end + function handle:write() + local len, err = tcp.send(self.fd, self.out_buffer) + if(len == nil) then + log.printf(log.ERROR, inst.name .. ": connection error: %s", err) + ret = defines.KILL_MODULE -- TODO: reconnect + else + self.out_buffer = string.sub(self.out_buffer, len+1) + end + if(inst.config.runtype == defines.OUT_MODULE and self.out_buffer == "") then + command_queue:command_sent() + end + return defines.OK + end + + local client = {} + client.module_instance = inst + client.name = inst.name .. "#0" + function client:process_response() end + function client:process_timeout() end + function client:get_read_handles() + return { handle } + end + function client:get_write_handles() + if(handle.out_buffer ~= "") then + return { handle } + else + return {} + end + end + function client:cleanup() + rawio.close(handle.fd) + end + handle.client_instance = client + + local connect_handle = {} + connect_handle.fd = handle.fd + function connect_handle:read() return defines.OK end + function connect_handle:write() + local connected, err = tcp.connect(handle.fd) + if(not connected) then + return defines.KILL_MODULE + end + inst.connected = true + client_list:register(client) + return defines.OK + end + connect_handle.client_instance = client + + function inst:cleanup() + client_list:unregister_by_module(self) + end + function inst:get_read_handles() + return {} + end + function inst:get_write_handles() + if(not inst.connected) then + return { connect_handle } + else + return {} + end + end + function inst:start_command(command) + handle.out_buffer = command + end + setmetatable(inst, {}) + getmetatable(inst).__gc = function() inst:cleanup() end + + return inst +end + +return tcp_connect diff --git a/src/modules/tcp_listen.lua b/src/modules/tcp_listen.lua index 9fa28ed..365e116 100644 --- a/src/modules/tcp_listen.lua +++ b/src/modules/tcp_listen.lua @@ -68,7 +68,7 @@ function tcp_listen:new(config, runtype) local client_handle = {} client_handle.fd = new_client - client_handle.client_instance = nil + client_handle.client_instance = nil client_handle.in_buffer = "" client_handle.out_buffer = "" function client_handle:read() @@ -112,10 +112,10 @@ function tcp_listen:new(config, runtype) client.name = inst.name .. "#" .. tcp.endtostring(addr) function client:process_response() end function client:process_timeout() end - function client:get_read_handles() + function client:get_read_handles() return { client_handle } end - function client:get_write_handles() + function client:get_write_handles() if(client_handle.out_buffer ~= "") then return { client_handle } else -- cgit v1.2.3