summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--PracticalSocket.cpp42
-rw-r--r--PracticalSocket.h25
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp71
3 files changed, 110 insertions, 28 deletions
diff --git a/PracticalSocket.cpp b/PracticalSocket.cpp
index 5d39557..6f7b51c 100644
--- a/PracticalSocket.cpp
+++ b/PracticalSocket.cpp
@@ -62,6 +62,7 @@
#include <arpa/inet.h> // For inet_addr()
#include <unistd.h> // For close()
#include <netinet/in.h> // For sockaddr_in
+ #include <poll.h>
typedef void raw_type; // Type used for raw data on this platform
#endif
@@ -250,6 +251,24 @@ int CommunicatingSocket::recv(void *buffer, int bufferLen)
return rtn;
}
+int CommunicatingSocket::recvNonBlocking(void *buffer, int bufferLen, int timeOut)
+ throw(SocketException)
+{
+ struct pollfd pfd[1];
+ pfd[0].fd = sockDesc;
+ pfd[0].events = POLLIN;
+ int rtn = poll(pfd,1,timeOut);
+ if(rtn > 0) {
+ if ((rtn = ::recv(sockDesc, (raw_type *) buffer, bufferLen, 0)) < 0) {
+ throw SocketException("non blocking receive failed", true);
+ }
+ if(!rtn) {
+ throw SocketException("connection closed by peer", false);
+ }
+ }
+ return rtn;
+}
+
string CommunicatingSocket::getForeignAddress()
throw(SocketException) {
sockaddr_in addr;
@@ -399,6 +418,29 @@ int UDPSocket::recvFrom(void *buffer, int bufferLen, string &sourceAddress,
return rtn;
}
+int UDPSocket::recvFromNonBlocking(void *buffer, int bufferLen, string &sourceAddress,
+ unsigned short &sourcePort, int timeOut) throw(SocketException) {
+ sockaddr_in clntAddr;
+ socklen_t addrLen = sizeof(clntAddr);
+ struct pollfd pfd[1];
+ pfd[0].fd = sockDesc;
+ pfd[0].events = POLLIN;
+ int rtn = poll(pfd,1,timeOut);
+ if(rtn > 0) {
+ if ((rtn = recvfrom(sockDesc, (raw_type *) buffer, bufferLen, 0,
+ (sockaddr *) &clntAddr, (socklen_t *) &addrLen)) < 0) {
+ throw SocketException("Receive failed (recvfrom())", true);
+ }
+ if(!rtn) {
+ throw SocketException("connection closed by peer", false);
+ }
+ }
+ sourceAddress = inet_ntoa(clntAddr.sin_addr);
+ sourcePort = ntohs(clntAddr.sin_port);
+
+ return rtn;
+}
+
void UDPSocket::setMulticastTTL(unsigned char multicastTTL) throw(SocketException) {
if (setsockopt(sockDesc, IPPROTO_IP, IP_MULTICAST_TTL,
(raw_type *) &multicastTTL, sizeof(multicastTTL)) < 0) {
diff --git a/PracticalSocket.h b/PracticalSocket.h
index 0192cfd..58c6424 100644
--- a/PracticalSocket.h
+++ b/PracticalSocket.h
@@ -202,6 +202,17 @@ public:
int recv(void *buffer, int bufferLen) throw(SocketException);
/**
+ * Read into the given buffer up to bufferLen bytes data from this
+ * socket. Call connect() before recvNonBlocking().
+ * @param buffer buffer to receive the data
+ * @param bufferLen maximum number of bytes to read into buffer
+ * @param timeout timout in ms
+ * @return number of bytes read, 0 for timeout, and -1 for error
+ * @exception SocketException thrown if unable to receive data
+ */
+ int recvNonBlocking(void *buffer, int bufferLen, int timeout) throw(SocketException);
+
+ /**
* Get the foreign address. Call connect() before calling recv()
* @return foreign address
* @exception SocketException thrown if unable to fetch foreign address
@@ -348,6 +359,20 @@ public:
unsigned short &sourcePort) throw(SocketException);
/**
+ * Read read up to bufferLen bytes data from this socket. The given buffer
+ * is where the data will be placed
+ * @param buffer buffer to receive data
+ * @param bufferLen maximum number of bytes to receive
+ * @param sourceAddress address of datagram source
+ * @param sourcePort port of data source
+ * @param timeout int ms
+ * @return number of bytes received and -1 for error
+ * @exception SocketException thrown if unable to receive datagram
+ */
+ int recvFromNonBlocking(void *buffer, int bufferLen, string &sourceAddress,
+ unsigned short &sourcePort, int timeout) throw(SocketException);
+
+ /**
* Set the multicast TTL
* @param multicastTTL multicast TTL
* @exception SocketException thrown if unable to set TTL
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index cf5b69d..a0f94fa 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -75,20 +75,22 @@ public:
class ListenerThreadParam
{
public:
- ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d)
+ ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c),
+ dir_(d), running_(true)
{};
UDPSocket& sock1_;
UDPSocket& sock2_;
std::string call_id_;
int dir_;
+ bool running_;
};
void* listener(void* p)
{
ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
- cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
try
{
@@ -99,16 +101,21 @@ void* listener(void* p)
buf.setLength(MAX_PACKET_SIZE);
u_int32_t len=0;
if(param->dir_ == 1)
- len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
else if(param->dir_ == 2)
- len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
else break;
- buf.setLength(len);
RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
- if(session.isDead())
+ if(session.isDead()) {
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting";
break;
+ }
+ if(!len)
+ continue;
+ buf.setLength(len);
+
if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
(param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
{
@@ -130,30 +137,30 @@ void* listener(void* p)
{
cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
}
- cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally";
-
+ param->running_ = false;
+ gCallIdQueue.push(param->call_id_);
pthread_exit(NULL);
}
class ListenerData
{
public:
- ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
+ ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2)
{};
UDPSocket* sock1_;
UDPSocket* sock2_;
- pthread_t threads1_;
- pthread_t threads2_;
- ListenerThreadParam params1_;
- ListenerThreadParam params2_;
+ pthread_t thread1_;
+ pthread_t thread2_;
+ ListenerThreadParam param1_;
+ ListenerThreadParam param2_;
};
void* listenerManager(void* dont_use_me)
{
try
{
- std::map<std::string, ListenerData> listenerMap;
+ std::map<std::string, ListenerData*> listenerMap;
while(1)
{
std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
@@ -163,27 +170,35 @@ void* listenerManager(void* dont_use_me)
if(!session.isComplete())
continue;
- std::map<std::string, ListenerData>::iterator it;
+ std::map<std::string, ListenerData*>::iterator it;
it = listenerMap.find(call_id);
if(it == listenerMap.end()) // listener Threads not existing yet
{
- cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
- << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
- << session.getLocalAddr() << ":" << session.getLocalPort2();
-
UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
- ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
- ListenerThreadParam(*sock1, *sock2, call_id, 2));
- ld.sock1_ = sock1;
- ld.sock2_ = sock2;
- pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
- pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
+ ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1),
+ ListenerThreadParam(*sock1, *sock2, call_id, 2));
+ ld->sock1_ = sock1;
+ ld->sock2_ = sock2;
+ pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_));
+ pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_));
- std::pair<std::map<std::string, ListenerData>::iterator, bool> ret;
- ret = listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
- it = ret.first;
+ std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
+ ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
+ continue;
+ }
+
+ if(!it->second->param1_.running_ && !it->second->param2_.running_)
+ {
+ cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
+ pthread_join(it->second->thread1_, NULL);
+ pthread_join(it->second->thread2_, NULL);
+ delete it->second->sock1_;
+ delete it->second->sock2_;
+ delete it->second;
+ listenerMap.erase(it);
+ gRtpSessionTable.delSession(call_id);
continue;
}
// TODO: reinit if session is changed or cleanup if it is daed