diff options
Diffstat (limited to 'anyrtpproxy/anyrtpproxy.cpp')
-rw-r--r-- | anyrtpproxy/anyrtpproxy.cpp | 121 |
1 files changed, 50 insertions, 71 deletions
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index e7be929..41cb643 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -16,44 +16,38 @@ #define MAX_PACKET_SIZE 1500 -class OpenSerHost +class ControlHost { public: - OpenSerHost() : host_("",0) {}; + ControlHost() : host_("",0) {}; - IfListElement getHost() { + Host getHost() { Lock lock(mutex); return host_; } - u_int16_t getLocalPort() { - Lock lock(mutex); - return local_port_; - } - - void setHost(std::string host, u_int16_t port, u_int16_t local_port) + void setHost(std::string addr, u_int16_t port) { Lock lock(mutex); - if(host_.host_ != host || host_.port_ != port) - cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port - << " received at local port " << local_port; + if(host_.addr_ != addr || host_.port_ != port) + cLog.msg(Log::PRIO_NOTICE) << "control Host detected at " << addr << ":" << port; - host_.host_ = host; + host_.addr_ = addr; host_.port_ = port; - local_port_ = local_port; } private: Mutex mutex; - IfListElement host_; - u_int16_t local_port_; + Host host_; }; struct ThreadParam { - OpenSerHost& open_ser_; - IfListElement interface_; + ControlHost& control_; + UDPSocket& control_sock_; + UDPSocket& sock_; + Host first_receiver_; }; void* sender(void* p) @@ -62,31 +56,26 @@ void* sender(void* p) try { - UDPSocket recv_sock(param->interface_.host_, param->interface_.port_); - UDPSocket send_sock(gOpt.getSendPort()); - IfList remote_host_list(gOpt.getRemoteHosts()); - - cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString(); + HostList remote_host_list(gOpt.getRemoteHosts()); Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + string remote_host; + u_int16_t remote_port; while(1) { - string remote_host; - u_int16_t remote_port; - buf.setLength(MAX_PACKET_SIZE); - u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); + u_int32_t len = param->control_sock_.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); buf.setLength(len); - param->open_ser_.setHost(remote_host, remote_port, param->interface_.port_); + param->control_.setHost(remote_host, remote_port); - IfList::const_iterator it = remote_host_list.begin(); + HostList::const_iterator it = remote_host_list.begin(); for(;it != remote_host_list.end(); it++) - send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_); + param->sock_.sendTo(buf.getBuf(), buf.getLength(), it->addr_, it->port_); } } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; + cLog.msg(Log::PRIO_ERR) << "sender exiting because: " << e.what() << std::endl; } pthread_exit(NULL); } @@ -99,42 +88,31 @@ void* receiver(void* p) try { - UDPSocket recv_sock(gOpt.getSendPort()); - UDPSocket send_sock; - u_int16_t local_port = 0; - - cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString(); - Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + string remote_host; + u_int16_t remote_port; + while(1) { - string remote_host; - u_int16_t remote_port; - buf.setLength(MAX_PACKET_SIZE); - u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); + u_int32_t len = param->sock_.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); buf.setLength(len); - if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_) + if(remote_host != param->first_receiver_.addr_ || remote_port != param->first_receiver_.port_) continue; - IfListElement openSerHost = param->open_ser_.getHost(); - if(openSerHost.host_ == "" || !openSerHost.port_) + Host control_host = param->control_.getHost(); + if(control_host.addr_ == "" || !control_host.port_) { - cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet"; + cLog.msg(Log::PRIO_NOTICE) << "no control host detected till now, ignoring packet"; continue; } - if(local_port != param->open_ser_.getLocalPort()) - { - local_port = param->open_ser_.getLocalPort(); - send_sock.setLocalPort(local_port); - } - send_sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_); + param->control_sock_.sendTo(buf.getBuf(), buf.getLength(), control_host.addr_, control_host.port_); } } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl; + cLog.msg(Log::PRIO_ERR) << "receiver exiting because: " << e.what() << std::endl; } pthread_exit(NULL); } @@ -212,27 +190,28 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); - OpenSerHost open_ser; - std::list<ThreadParam> params; - IfList listeners(gOpt.getLocalInterfaces()); - IfList::iterator it = listeners.begin(); - for(;it != listeners.end();++it) - { - ThreadParam param = {open_ser, *it}; - params.push_back(param); + try { + ControlHost control_host; + UDPSocket control_sock(gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_); + UDPSocket sock(gOpt.getSendPort()); + + ThreadParam senderParam = {control_host, control_sock, sock, gOpt.getRemoteHosts().front()}; pthread_t senderThread; - pthread_create(&senderThread, NULL, sender, &(params.back())); + pthread_create(&senderThread, NULL, sender, &senderParam); pthread_detach(senderThread); - } - - ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()}; - params.push_back(param); - pthread_t receiverThread; - pthread_create(&receiverThread, NULL, receiver, &(params.back())); - pthread_detach(receiverThread); + + ThreadParam receiverParam = {control_host, control_sock, sock, gOpt.getRemoteHosts().front()}; + pthread_t receiverThread; + pthread_create(&receiverThread, NULL, receiver, &receiverParam); + pthread_detach(receiverThread); - int ret = sig.run(); - - return ret; + int ret = sig.run(); + return ret; + } + catch(std::exception& e) + { + cLog.msg(Log::PRIO_ERR) << "an error occurred: " << e.what(); + return -1; + } } |