summaryrefslogtreecommitdiff
path: root/anyrtpproxy
diff options
context:
space:
mode:
Diffstat (limited to 'anyrtpproxy')
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp71
1 files changed, 43 insertions, 28 deletions
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index cf5b69d..a0f94fa 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -75,20 +75,22 @@ public:
class ListenerThreadParam
{
public:
- ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d)
+ ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c),
+ dir_(d), running_(true)
{};
UDPSocket& sock1_;
UDPSocket& sock2_;
std::string call_id_;
int dir_;
+ bool running_;
};
void* listener(void* p)
{
ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
- cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
try
{
@@ -99,16 +101,21 @@ void* listener(void* p)
buf.setLength(MAX_PACKET_SIZE);
u_int32_t len=0;
if(param->dir_ == 1)
- len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
else if(param->dir_ == 2)
- len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
else break;
- buf.setLength(len);
RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
- if(session.isDead())
+ if(session.isDead()) {
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting";
break;
+ }
+ if(!len)
+ continue;
+ buf.setLength(len);
+
if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
(param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
{
@@ -130,30 +137,30 @@ void* listener(void* p)
{
cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
}
- cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally";
-
+ param->running_ = false;
+ gCallIdQueue.push(param->call_id_);
pthread_exit(NULL);
}
class ListenerData
{
public:
- ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
+ ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2)
{};
UDPSocket* sock1_;
UDPSocket* sock2_;
- pthread_t threads1_;
- pthread_t threads2_;
- ListenerThreadParam params1_;
- ListenerThreadParam params2_;
+ pthread_t thread1_;
+ pthread_t thread2_;
+ ListenerThreadParam param1_;
+ ListenerThreadParam param2_;
};
void* listenerManager(void* dont_use_me)
{
try
{
- std::map<std::string, ListenerData> listenerMap;
+ std::map<std::string, ListenerData*> listenerMap;
while(1)
{
std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
@@ -163,27 +170,35 @@ void* listenerManager(void* dont_use_me)
if(!session.isComplete())
continue;
- std::map<std::string, ListenerData>::iterator it;
+ std::map<std::string, ListenerData*>::iterator it;
it = listenerMap.find(call_id);
if(it == listenerMap.end()) // listener Threads not existing yet
{
- cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
- << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
- << session.getLocalAddr() << ":" << session.getLocalPort2();
-
UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
- ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
- ListenerThreadParam(*sock1, *sock2, call_id, 2));
- ld.sock1_ = sock1;
- ld.sock2_ = sock2;
- pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
- pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
+ ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1),
+ ListenerThreadParam(*sock1, *sock2, call_id, 2));
+ ld->sock1_ = sock1;
+ ld->sock2_ = sock2;
+ pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_));
+ pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_));
- std::pair<std::map<std::string, ListenerData>::iterator, bool> ret;
- ret = listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
- it = ret.first;
+ std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
+ ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
+ continue;
+ }
+
+ if(!it->second->param1_.running_ && !it->second->param2_.running_)
+ {
+ cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
+ pthread_join(it->second->thread1_, NULL);
+ pthread_join(it->second->thread2_, NULL);
+ delete it->second->sock1_;
+ delete it->second->sock2_;
+ delete it->second;
+ listenerMap.erase(it);
+ gRtpSessionTable.delSession(call_id);
continue;
}
// TODO: reinit if session is changed or cleanup if it is daed