From 63a7daa4f27a83bf49ce7ffaf95b68c4a0e15231 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 28 Feb 2008 23:18:08 +0000 Subject: anyrtpproxy finished ?? --- anyrtpproxy/anyrtpproxy.cpp | 106 +++++++++++++++++++++++++++++++++++++++----- anyrtpproxy/options.cpp | 27 +++++++++-- anyrtpproxy/options.h | 3 ++ 3 files changed, 122 insertions(+), 14 deletions(-) (limited to 'anyrtpproxy') diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 311dc4f..e856e16 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -16,17 +16,49 @@ #define MAX_PACKET_SIZE 1500 -void* worker(void* l) +class OpenSerHost { - IfListElement* interface = reinterpret_cast(l); +public: + OpenSerHost() : host_("",0) {}; + + IfListElement getHost() { + Lock lock(mutex); + return host_; + } + + void setHost(std::string host, u_int16_t port) + { + Lock lock(mutex); + if(host_.host_ != host || host_.port_ != port) + cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port; + + host_.host_ = host; + host_.port_ = port; + } + +private: + Mutex mutex; + + IfListElement host_; +}; + +struct ThreadParam +{ + OpenSerHost& open_ser_; + IfListElement interface_; +}; + +void* sender(void* p) +{ + ThreadParam* param = reinterpret_cast(p); try { - UDPSocket recv_sock(interface->host_, interface->port_); - UDPSocket send_sock; - IfList remote_host_list = gOpt.getRemoteHosts(); + UDPSocket recv_sock(param->interface_.host_, param->interface_.port_); + UDPSocket send_sock(gOpt.getSendPort()); + IfList remote_host_list(gOpt.getRemoteHosts()); - cLog.msg(Log::PRIO_NOTICE) << "worker listening on: " << interface->toString(); + cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString(); Buffer buf(u_int32_t(MAX_PACKET_SIZE)); while(1) { @@ -37,6 +69,8 @@ void* worker(void* l) u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); buf.setLength(len); + param->open_ser_.setHost(remote_host, remote_port); + IfList::const_iterator it = remote_host_list.begin(); for(;it != remote_host_list.end(); it++) send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_); @@ -44,12 +78,52 @@ void* worker(void* l) } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "worker(" << interface->toString() << ") exiting because: " << e.what() << std::endl; + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; } pthread_exit(NULL); } + +void* receiver(void* p) +{ + ThreadParam* param = reinterpret_cast(p); + + try + { + UDPSocket sock(gOpt.getSendPort()); + + cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString(); + + Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + while(1) { + string remote_host; + u_int16_t remote_port; + + buf.setLength(MAX_PACKET_SIZE); + u_int32_t len = sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); + buf.setLength(len); + + if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_) + continue; + + IfListElement openSerHost = param->open_ser_.getHost(); + if(openSerHost.host_ == "" || !openSerHost.port_) + { + cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet"; + continue; + } + + sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_); + } + } + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; + } + pthread_exit(NULL); +} + void chrootAndDrop(string const& chrootdir, string const& username) { if (getuid() != 0) @@ -122,16 +196,26 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); - + OpenSerHost open_ser; + std::list params; IfList listeners(gOpt.getLocalInterfaces()); IfList::iterator it = listeners.begin(); for(;it != listeners.end();++it) { - pthread_t workerThread; - pthread_create(&workerThread, NULL, worker, static_cast(&(*it))); - pthread_detach(workerThread); + ThreadParam param = {open_ser, *it}; + params.push_back(param); + pthread_t senderThread; + pthread_create(&senderThread, NULL, sender, &(params.back())); + pthread_detach(senderThread); } + + ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()}; + params.push_back(param); + pthread_t receiverThread; + pthread_create(&receiverThread, NULL, receiver, &(params.back())); + pthread_detach(receiverThread); + int ret = sig.run(); return ret; diff --git a/anyrtpproxy/options.cpp b/anyrtpproxy/options.cpp index 6665f3d..6e7f291 100644 --- a/anyrtpproxy/options.cpp +++ b/anyrtpproxy/options.cpp @@ -56,6 +56,7 @@ Options::Options() username_ = "nobody"; chroot_dir_ = "/var/run"; daemonize_ = true; + send_port_ = 22220; local_interfaces_.push_back(IfListElement("0.0.0.0", 22221)); remote_hosts_.push_back(IfListElement("127.0.0.1", 22222)); } @@ -142,7 +143,7 @@ bool Options::parse(int argc, char* argv[]) PARSE_SCALAR_PARAM("-u","--user", username_) PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_) PARSE_INVERSE_BOOL_PARAM("-d","--nodaemonize", daemonize_) - PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_) + PARSE_SCALAR_PARAM("-p","--port", send_port_) PARSE_CSLIST_PARAM("-l","--listen", local_interfaces_) PARSE_CSLIST_PARAM("-r","--hosts", remote_hosts_) else @@ -168,8 +169,14 @@ bool Options::sanityCheck() void Options::printUsage() { std::cout << "USAGE:" << std::endl; - std::cout << "plain_tool [-h|--help] prints this..." << std::endl; -// std::cout << " [-K|--key] master key to use for encryption" << std::endl; + std::cout << "anyrtpproxy [-h|--help] prints this..." << std::endl; + std::cout << " [-t|--chroot] chroot and drop priviledges" << std::endl; + std::cout << " [-u|--username] in case of chroot run as this user" << std::endl; + std::cout << " [-c|--chroot-dir] directory to make a chroot to" << std::endl; + std::cout << " [-d|--nodaemonize] don't run in background" << std::endl; + std::cout << " [-p|--port] use this port to send out packets" << std::endl; + std::cout << " [-l|--listen] [,[: ..] a list of local interfaces to listen on" << std::endl; + std::cout << " [-r|--hosts] [,[: ..] a list of remote hosts to send duplicates to" << std::endl; } void Options::printOptions() @@ -180,6 +187,7 @@ void Options::printOptions() std::cout << "username='" << username_ << "'" << std::endl; std::cout << "chroot-dir='" << chroot_dir_ << "'" << std::endl; std::cout << "daemonize='" << daemonize_ << "'" << std::endl; + std::cout << "send-port='" << send_port_ << "'" << std::endl; std::cout << "local interfaces='"; IfList::const_iterator it=local_interfaces_.begin(); for(u_int32_t i=0; it != local_interfaces_.end(); ++it, ++i) @@ -264,6 +272,19 @@ Options& Options::setDaemonize(bool d) return *this; } +u_int16_t Options::getSendPort() +{ + Lock lock(mutex); + return send_port_; +} + +Options& Options::setSendPort(u_int16_t p) +{ + Lock lock(mutex); + send_port_ = p; + return *this; +} + IfList Options::getLocalInterfaces() { Lock lock(mutex); diff --git a/anyrtpproxy/options.h b/anyrtpproxy/options.h index 48df36a..48139f7 100644 --- a/anyrtpproxy/options.h +++ b/anyrtpproxy/options.h @@ -77,6 +77,8 @@ public: Options& setChrootDir(std::string c); bool getDaemonize(); Options& setDaemonize(bool d); + u_int16_t getSendPort(); + Options& setSendPort(u_int16_t p); IfList getLocalInterfaces(); IfList getRemoteHosts(); @@ -104,6 +106,7 @@ private: std::string username_; std::string chroot_dir_; bool daemonize_; + u_int16_t send_port_; IfList local_interfaces_; IfList remote_hosts_; }; -- cgit v1.2.3