From dc3e77c4284b84b71ddabf1a813b18224b775217 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 17 Mar 2008 18:48:16 +0000 Subject: finally added callIdQueue (sorry) threads get started @ sync now TODO: cleanup threads and session on daed --- anyrtpproxy/anyrtpproxy.cpp | 52 +++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 21 deletions(-) (limited to 'anyrtpproxy/anyrtpproxy.cpp') diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index a308308..2b6184e 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -86,9 +86,9 @@ public: void* listener(void* p) { ListenerThreadParam* param = reinterpret_cast(p); - + cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; - + try { Buffer buf(u_int32_t(MAX_PACKET_SIZE)); @@ -132,7 +132,7 @@ class ListenerData public: ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2) {}; - + UDPSocket* sock1_; UDPSocket* sock2_; pthread_t threads1_; @@ -148,28 +148,38 @@ void* listenerManager(void* dont_use_me) std::map listenerMap; while(1) { - std::string call_id = gCallIdQueue.front(); // waits for semphor and returns next call_id + std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id gCallIdQueue.pop(); RtpSession& session = gRtpSessionTable.getSession(call_id); + if(!session.isComplete()) + continue; - - cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: " - << session.getLocalAddr() << ":" << session.getLocalPort1() << " " - << session.getLocalAddr() << ":" << session.getLocalPort2(); - - UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); - UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); - - ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1), - ListenerThreadParam(*sock1, *sock2, call_id, 2)); - ld.sock1_ = sock1; - ld.sock2_ = sock2; - pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_)); - pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_)); - - listenerMap.insert(std::map::value_type(call_id, ld)); - } + std::map::iterator it; + it = listenerMap.find(call_id); + if(it == listenerMap.end()) // listener Threads not existing yet + { + cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: " + << session.getLocalAddr() << ":" << session.getLocalPort1() << " " + << session.getLocalAddr() << ":" << session.getLocalPort2(); + + UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); + UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); + + ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1), + ListenerThreadParam(*sock1, *sock2, call_id, 2)); + ld.sock1_ = sock1; + ld.sock2_ = sock2; + pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_)); + pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_)); + + std::pair::iterator, bool> ret; + ret = listenerMap.insert(std::map::value_type(call_id, ld)); + it = ret.first; + continue; + } + // TODO: reinit if session is changed or cleanup if it is daed + } } catch(std::exception &e) { -- cgit v1.2.3