From 877044dc40ee5482cc56444ba9bcd55e50a8745d Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 18 Mar 2008 18:45:46 +0000 Subject: anyrtpproxy cleanup works now --- anyrtpproxy/anyrtpproxy.cpp | 71 +++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 28 deletions(-) (limited to 'anyrtpproxy/anyrtpproxy.cpp') diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index cf5b69d..a0f94fa 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -75,20 +75,22 @@ public: class ListenerThreadParam { public: - ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d) + ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), + dir_(d), running_(true) {}; UDPSocket& sock1_; UDPSocket& sock2_; std::string call_id_; int dir_; + bool running_; }; void* listener(void* p) { ListenerThreadParam* param = reinterpret_cast(p); - cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; + cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; try { @@ -99,16 +101,21 @@ void* listener(void* p) buf.setLength(MAX_PACKET_SIZE); u_int32_t len=0; if(param->dir_ == 1) - len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); else if(param->dir_ == 2) - len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); else break; - buf.setLength(len); RtpSession& session = gRtpSessionTable.getSession(param->call_id_); - if(session.isDead()) + if(session.isDead()) { + cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting"; break; + } + if(!len) + continue; + buf.setLength(len); + if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) || (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2()))) { @@ -130,30 +137,30 @@ void* listener(void* p) { cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what(); } - cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally"; - + param->running_ = false; + gCallIdQueue.push(param->call_id_); pthread_exit(NULL); } class ListenerData { public: - ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2) + ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2) {}; UDPSocket* sock1_; UDPSocket* sock2_; - pthread_t threads1_; - pthread_t threads2_; - ListenerThreadParam params1_; - ListenerThreadParam params2_; + pthread_t thread1_; + pthread_t thread2_; + ListenerThreadParam param1_; + ListenerThreadParam param2_; }; void* listenerManager(void* dont_use_me) { try { - std::map listenerMap; + std::map listenerMap; while(1) { std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id @@ -163,27 +170,35 @@ void* listenerManager(void* dont_use_me) if(!session.isComplete()) continue; - std::map::iterator it; + std::map::iterator it; it = listenerMap.find(call_id); if(it == listenerMap.end()) // listener Threads not existing yet { - cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: " - << session.getLocalAddr() << ":" << session.getLocalPort1() << " " - << session.getLocalAddr() << ":" << session.getLocalPort2(); - UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); - ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1), - ListenerThreadParam(*sock1, *sock2, call_id, 2)); - ld.sock1_ = sock1; - ld.sock2_ = sock2; - pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_)); - pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_)); + ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1), + ListenerThreadParam(*sock1, *sock2, call_id, 2)); + ld->sock1_ = sock1; + ld->sock2_ = sock2; + pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_)); + pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_)); - std::pair::iterator, bool> ret; - ret = listenerMap.insert(std::map::value_type(call_id, ld)); - it = ret.first; + std::pair::iterator, bool> ret; + ret = listenerMap.insert(std::map::value_type(call_id, ld)); + continue; + } + + if(!it->second->param1_.running_ && !it->second->param2_.running_) + { + cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up"; + pthread_join(it->second->thread1_, NULL); + pthread_join(it->second->thread2_, NULL); + delete it->second->sock1_; + delete it->second->sock2_; + delete it->second; + listenerMap.erase(it); + gRtpSessionTable.delSession(call_id); continue; } // TODO: reinit if session is changed or cleanup if it is daed -- cgit v1.2.3