From 63a7daa4f27a83bf49ce7ffaf95b68c4a0e15231 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 28 Feb 2008 23:18:08 +0000 Subject: anyrtpproxy finished ?? --- PracticalSocket.cpp | 19 ++++++++ PracticalSocket.h | 4 ++ anyrtpproxy/anyrtpproxy.cpp | 106 +++++++++++++++++++++++++++++++++++++++----- anyrtpproxy/options.cpp | 27 +++++++++-- anyrtpproxy/options.h | 3 ++ 5 files changed, 145 insertions(+), 14 deletions(-) diff --git a/PracticalSocket.cpp b/PracticalSocket.cpp index 658bc6e..5d39557 100644 --- a/PracticalSocket.cpp +++ b/PracticalSocket.cpp @@ -186,6 +186,14 @@ void Socket::setLocalAddressAndPort(const string &localAddress, } } +void Socket::setSocketOpt(int optionName, const void* optionValue, socklen_t optionLen) + throw(SocketException) +{ + if (::setsockopt(sockDesc, SOL_SOCKET, optionName, optionValue, optionLen) < 0) { + throw SocketException("setSockopt failed", true); + } +} + void Socket::cleanUp() throw(SocketException) { #ifdef WIN32 if (WSACleanup() != 0) { @@ -282,6 +290,8 @@ TCPSocket::TCPSocket(int newConnSD) : CommunicatingSocket(newConnSD) { TCPServerSocket::TCPServerSocket(unsigned short localPort, int queueLen) throw(SocketException) : Socket(SOCK_STREAM, IPPROTO_TCP) { + const int opt = 1; + setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt)); setLocalPort(localPort); setListen(queueLen); } @@ -289,6 +299,8 @@ TCPServerSocket::TCPServerSocket(unsigned short localPort, int queueLen) TCPServerSocket::TCPServerSocket(const string &localAddress, unsigned short localPort, int queueLen) throw(SocketException) : Socket(SOCK_STREAM, IPPROTO_TCP) { + const int opt = 1; + setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt)); setLocalAddressAndPort(localAddress, localPort); setListen(queueLen); } @@ -312,17 +324,24 @@ void TCPServerSocket::setListen(int queueLen) throw(SocketException) { UDPSocket::UDPSocket() throw(SocketException) : CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) { + + const int opt = 1; + setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt)); setBroadcast(); } UDPSocket::UDPSocket(unsigned short localPort) throw(SocketException) : CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) { + const int opt = 1; + setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt)); setLocalPort(localPort); setBroadcast(); } UDPSocket::UDPSocket(const string &localAddress, unsigned short localPort) throw(SocketException) : CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) { + const int opt = 1; + setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt)); setLocalAddressAndPort(localAddress, localPort); setBroadcast(); } diff --git a/PracticalSocket.h b/PracticalSocket.h index dcb39a9..0192cfd 100644 --- a/PracticalSocket.h +++ b/PracticalSocket.h @@ -128,6 +128,10 @@ public: void setLocalAddressAndPort(const string &localAddress, unsigned short localPort = 0) throw(SocketException); + + void setSocketOpt(int optionName, const void* optionValue, socklen_t optionLen) + throw(SocketException); + /** * If WinSock, unload the WinSock DLLs; otherwise do nothing. We ignore * this in our sample client code but include it in the library for diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 311dc4f..e856e16 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -16,17 +16,49 @@ #define MAX_PACKET_SIZE 1500 -void* worker(void* l) +class OpenSerHost { - IfListElement* interface = reinterpret_cast(l); +public: + OpenSerHost() : host_("",0) {}; + + IfListElement getHost() { + Lock lock(mutex); + return host_; + } + + void setHost(std::string host, u_int16_t port) + { + Lock lock(mutex); + if(host_.host_ != host || host_.port_ != port) + cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port; + + host_.host_ = host; + host_.port_ = port; + } + +private: + Mutex mutex; + + IfListElement host_; +}; + +struct ThreadParam +{ + OpenSerHost& open_ser_; + IfListElement interface_; +}; + +void* sender(void* p) +{ + ThreadParam* param = reinterpret_cast(p); try { - UDPSocket recv_sock(interface->host_, interface->port_); - UDPSocket send_sock; - IfList remote_host_list = gOpt.getRemoteHosts(); + UDPSocket recv_sock(param->interface_.host_, param->interface_.port_); + UDPSocket send_sock(gOpt.getSendPort()); + IfList remote_host_list(gOpt.getRemoteHosts()); - cLog.msg(Log::PRIO_NOTICE) << "worker listening on: " << interface->toString(); + cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString(); Buffer buf(u_int32_t(MAX_PACKET_SIZE)); while(1) { @@ -37,6 +69,8 @@ void* worker(void* l) u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); buf.setLength(len); + param->open_ser_.setHost(remote_host, remote_port); + IfList::const_iterator it = remote_host_list.begin(); for(;it != remote_host_list.end(); it++) send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_); @@ -44,12 +78,52 @@ void* worker(void* l) } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "worker(" << interface->toString() << ") exiting because: " << e.what() << std::endl; + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; } pthread_exit(NULL); } + +void* receiver(void* p) +{ + ThreadParam* param = reinterpret_cast(p); + + try + { + UDPSocket sock(gOpt.getSendPort()); + + cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString(); + + Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + while(1) { + string remote_host; + u_int16_t remote_port; + + buf.setLength(MAX_PACKET_SIZE); + u_int32_t len = sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); + buf.setLength(len); + + if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_) + continue; + + IfListElement openSerHost = param->open_ser_.getHost(); + if(openSerHost.host_ == "" || !openSerHost.port_) + { + cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet"; + continue; + } + + sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_); + } + } + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; + } + pthread_exit(NULL); +} + void chrootAndDrop(string const& chrootdir, string const& username) { if (getuid() != 0) @@ -122,16 +196,26 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); - + OpenSerHost open_ser; + std::list params; IfList listeners(gOpt.getLocalInterfaces()); IfList::iterator it = listeners.begin(); for(;it != listeners.end();++it) { - pthread_t workerThread; - pthread_create(&workerThread, NULL, worker, static_cast(&(*it))); - pthread_detach(workerThread); + ThreadParam param = {open_ser, *it}; + params.push_back(param); + pthread_t senderThread; + pthread_create(&senderThread, NULL, sender, &(params.back())); + pthread_detach(senderThread); } + + ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()}; + params.push_back(param); + pthread_t receiverThread; + pthread_create(&receiverThread, NULL, receiver, &(params.back())); + pthread_detach(receiverThread); + int ret = sig.run(); return ret; diff --git a/anyrtpproxy/options.cpp b/anyrtpproxy/options.cpp index 6665f3d..6e7f291 100644 --- a/anyrtpproxy/options.cpp +++ b/anyrtpproxy/options.cpp @@ -56,6 +56,7 @@ Options::Options() username_ = "nobody"; chroot_dir_ = "/var/run"; daemonize_ = true; + send_port_ = 22220; local_interfaces_.push_back(IfListElement("0.0.0.0", 22221)); remote_hosts_.push_back(IfListElement("127.0.0.1", 22222)); } @@ -142,7 +143,7 @@ bool Options::parse(int argc, char* argv[]) PARSE_SCALAR_PARAM("-u","--user", username_) PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_) PARSE_INVERSE_BOOL_PARAM("-d","--nodaemonize", daemonize_) - PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_) + PARSE_SCALAR_PARAM("-p","--port", send_port_) PARSE_CSLIST_PARAM("-l","--listen", local_interfaces_) PARSE_CSLIST_PARAM("-r","--hosts", remote_hosts_) else @@ -168,8 +169,14 @@ bool Options::sanityCheck() void Options::printUsage() { std::cout << "USAGE:" << std::endl; - std::cout << "plain_tool [-h|--help] prints this..." << std::endl; -// std::cout << " [-K|--key] master key to use for encryption" << std::endl; + std::cout << "anyrtpproxy [-h|--help] prints this..." << std::endl; + std::cout << " [-t|--chroot] chroot and drop priviledges" << std::endl; + std::cout << " [-u|--username] in case of chroot run as this user" << std::endl; + std::cout << " [-c|--chroot-dir] directory to make a chroot to" << std::endl; + std::cout << " [-d|--nodaemonize] don't run in background" << std::endl; + std::cout << " [-p|--port] use this port to send out packets" << std::endl; + std::cout << " [-l|--listen] [,[: ..] a list of local interfaces to listen on" << std::endl; + std::cout << " [-r|--hosts] [,[: ..] a list of remote hosts to send duplicates to" << std::endl; } void Options::printOptions() @@ -180,6 +187,7 @@ void Options::printOptions() std::cout << "username='" << username_ << "'" << std::endl; std::cout << "chroot-dir='" << chroot_dir_ << "'" << std::endl; std::cout << "daemonize='" << daemonize_ << "'" << std::endl; + std::cout << "send-port='" << send_port_ << "'" << std::endl; std::cout << "local interfaces='"; IfList::const_iterator it=local_interfaces_.begin(); for(u_int32_t i=0; it != local_interfaces_.end(); ++it, ++i) @@ -264,6 +272,19 @@ Options& Options::setDaemonize(bool d) return *this; } +u_int16_t Options::getSendPort() +{ + Lock lock(mutex); + return send_port_; +} + +Options& Options::setSendPort(u_int16_t p) +{ + Lock lock(mutex); + send_port_ = p; + return *this; +} + IfList Options::getLocalInterfaces() { Lock lock(mutex); diff --git a/anyrtpproxy/options.h b/anyrtpproxy/options.h index 48df36a..48139f7 100644 --- a/anyrtpproxy/options.h +++ b/anyrtpproxy/options.h @@ -77,6 +77,8 @@ public: Options& setChrootDir(std::string c); bool getDaemonize(); Options& setDaemonize(bool d); + u_int16_t getSendPort(); + Options& setSendPort(u_int16_t p); IfList getLocalInterfaces(); IfList getRemoteHosts(); @@ -104,6 +106,7 @@ private: std::string username_; std::string chroot_dir_; bool daemonize_; + u_int16_t send_port_; IfList local_interfaces_; IfList remote_hosts_; }; -- cgit v1.2.3