From cc85b6841709aa7a9912a1974689d9625258e2d6 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 18 Mar 2008 21:08:56 +0000 Subject: added nat detection to anyrtpproxy --- anyrtpproxy/anyrtpproxy.cpp | 60 +++++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index ac2670a..7b2ef81 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -75,8 +75,8 @@ public: class ListenerThreadParam { public: - ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), - dir_(d), running_(true) + ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d, SyncQueue& q) : sock1_(s1), sock2_(s2), call_id_(c), + dir_(d), running_(true), queue_(q) {}; UDPSocket& sock1_; @@ -84,6 +84,7 @@ public: std::string call_id_; int dir_; bool running_; + SyncQueue& queue_; }; void* listener(void* p) @@ -119,9 +120,31 @@ void* listener(void* p) if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) || (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2()))) { - //TODO: if weak? don't check but save the new(?) remote addr into list - continue; + if(gOpt.getNat() || + (!gOpt.getNoNatOnce() && ((param->dir_ == 1 && session.getSeen1()) || + (param->dir_ == 2 && session.getSeen2())))) + { + cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") setting remote host to" + << remote_addr << ":" << remote_port; + if(param->dir_ == 1) { + session.setRemotePort1(remote_port); + session.setRemoteAddr1(remote_addr); + } + if(param->dir_ == 2) { + session.setRemotePort2(remote_port); + session.setRemoteAddr2(remote_addr); + } + + if(!gOpt.getNat()) { // with nat enabled sync is not needed + SyncCommand sc(param->call_id_); + param->queue_.push(sc); + } + } + else + continue; } + session.setSeen1(); + session.setSeen2(); if(param->dir_ == 1) param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2()); @@ -153,12 +176,14 @@ public: ListenerThreadParam param2_; }; -void* listenerManager(void* dont_use_me) +void* listenerManager(void* p) { - try + SyncQueue* queue_ = reinterpret_cast(p); + + std::map listenerMap; + while(1) { - std::map listenerMap; - while(1) + try { std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id gCallIdQueue.pop(); @@ -174,8 +199,8 @@ void* listenerManager(void* dont_use_me) UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); - ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1), - ListenerThreadParam(*sock1, *sock2, call_id, 2)); + ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1, *queue_), + ListenerThreadParam(*sock1, *sock2, call_id, 2, *queue_)); ld->sock1_ = sock1; ld->sock2_ = sock2; pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_)); @@ -200,11 +225,13 @@ void* listenerManager(void* dont_use_me) } // TODO: reinit if session changed } + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "listenerManager restarting because: " << e.what(); + usleep(1000); + } } - catch(std::exception &e) - { - cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because: " << e.what(); - } + cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason"; pthread_exit(NULL); } @@ -315,13 +342,14 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); + SyncQueue queue; + pthread_t listenerManagerThread; - pthread_create(&listenerManagerThread, NULL, listenerManager, NULL); + pthread_create(&listenerManagerThread, NULL, listenerManager, &queue); pthread_detach(listenerManagerThread); pthread_t syncListenerThread; - SyncQueue queue; ConnectToList connect_to = gOpt.getConnectTo(); ThreadParam p( queue,*(new OptionConnectTo())); if ( gOpt.getLocalSyncPort()) -- cgit v1.2.3