From 63a7daa4f27a83bf49ce7ffaf95b68c4a0e15231 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 28 Feb 2008 23:18:08 +0000 Subject: anyrtpproxy finished ?? --- anyrtpproxy/anyrtpproxy.cpp | 106 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 11 deletions(-) (limited to 'anyrtpproxy/anyrtpproxy.cpp') diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 311dc4f..e856e16 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -16,17 +16,49 @@ #define MAX_PACKET_SIZE 1500 -void* worker(void* l) +class OpenSerHost { - IfListElement* interface = reinterpret_cast(l); +public: + OpenSerHost() : host_("",0) {}; + + IfListElement getHost() { + Lock lock(mutex); + return host_; + } + + void setHost(std::string host, u_int16_t port) + { + Lock lock(mutex); + if(host_.host_ != host || host_.port_ != port) + cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port; + + host_.host_ = host; + host_.port_ = port; + } + +private: + Mutex mutex; + + IfListElement host_; +}; + +struct ThreadParam +{ + OpenSerHost& open_ser_; + IfListElement interface_; +}; + +void* sender(void* p) +{ + ThreadParam* param = reinterpret_cast(p); try { - UDPSocket recv_sock(interface->host_, interface->port_); - UDPSocket send_sock; - IfList remote_host_list = gOpt.getRemoteHosts(); + UDPSocket recv_sock(param->interface_.host_, param->interface_.port_); + UDPSocket send_sock(gOpt.getSendPort()); + IfList remote_host_list(gOpt.getRemoteHosts()); - cLog.msg(Log::PRIO_NOTICE) << "worker listening on: " << interface->toString(); + cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString(); Buffer buf(u_int32_t(MAX_PACKET_SIZE)); while(1) { @@ -37,6 +69,8 @@ void* worker(void* l) u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); buf.setLength(len); + param->open_ser_.setHost(remote_host, remote_port); + IfList::const_iterator it = remote_host_list.begin(); for(;it != remote_host_list.end(); it++) send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_); @@ -44,12 +78,52 @@ void* worker(void* l) } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "worker(" << interface->toString() << ") exiting because: " << e.what() << std::endl; + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; } pthread_exit(NULL); } + +void* receiver(void* p) +{ + ThreadParam* param = reinterpret_cast(p); + + try + { + UDPSocket sock(gOpt.getSendPort()); + + cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString(); + + Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + while(1) { + string remote_host; + u_int16_t remote_port; + + buf.setLength(MAX_PACKET_SIZE); + u_int32_t len = sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); + buf.setLength(len); + + if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_) + continue; + + IfListElement openSerHost = param->open_ser_.getHost(); + if(openSerHost.host_ == "" || !openSerHost.port_) + { + cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet"; + continue; + } + + sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_); + } + } + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; + } + pthread_exit(NULL); +} + void chrootAndDrop(string const& chrootdir, string const& username) { if (getuid() != 0) @@ -122,16 +196,26 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); - + OpenSerHost open_ser; + std::list params; IfList listeners(gOpt.getLocalInterfaces()); IfList::iterator it = listeners.begin(); for(;it != listeners.end();++it) { - pthread_t workerThread; - pthread_create(&workerThread, NULL, worker, static_cast(&(*it))); - pthread_detach(workerThread); + ThreadParam param = {open_ser, *it}; + params.push_back(param); + pthread_t senderThread; + pthread_create(&senderThread, NULL, sender, &(params.back())); + pthread_detach(senderThread); } + + ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()}; + params.push_back(param); + pthread_t receiverThread; + pthread_create(&receiverThread, NULL, receiver, &(params.back())); + pthread_detach(receiverThread); + int ret = sig.run(); return ret; -- cgit v1.2.3