summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-11-25 16:58:46 +0000
committerChristian Pointner <equinox@anytun.org>2008-11-25 16:58:46 +0000
commit0020c166f8c4aba4c3d64803009e764d7a8473c7 (patch)
treefd25edd86341b1c88038557b3b96363c926dc6bf /src
parentanyrtpproxy: commandhanlder uses now boost::asio and boost::thread (diff)
anyrtpproxy ported to boost threads and boost asio still some work
Diffstat (limited to 'src')
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp183
-rw-r--r--src/anyrtpproxy/commandHandler.cpp2
-rw-r--r--src/anyrtpproxy/options.cpp2
3 files changed, 96 insertions, 91 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index bbfd412..91cb0e9 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -31,6 +31,8 @@
#include <iostream>
+#include <boost/asio.hpp>
+
#include <fcntl.h>
#include <pwd.h>
#include <grp.h>
@@ -59,55 +61,29 @@
#define MAX_PACKET_SIZE 1500
+typedef boost::asio::ip::udp rtp_proto;
-class ThreadParam
-{
-public:
- ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
- : queue(queue_),connto(connto_)
- {};
- SyncQueue & queue;
- OptionConnectTo & connto;
-};
-
-class ListenerThreadParam
+void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
{
-public:
- ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d, SyncQueue& q) : sock1_(s1), sock2_(s2), call_id_(c),
- dir_(d), running_(true), queue_(q)
- {};
-
- UDPSocket& sock1_;
- UDPSocket& sock2_;
- std::string call_id_;
- int dir_;
- bool running_;
- SyncQueue& queue_;
-};
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
-void* listener(void* p)
-{
- ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
-
- cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
-
try
{
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
- string remote_addr;
- u_int16_t remote_port;
+ rtp_proto::endpoint remote_end;
+
while(1) {
buf.setLength(MAX_PACKET_SIZE);
u_int32_t len=0;
- if(param->dir_ == 1)
- len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
- else if(param->dir_ == 2)
- len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
+ if(dir == 1)
+ len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
+ else if(dir == 2)
+ len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
else break;
- RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
+ RtpSession& session = gRtpSessionTable.getSession(call_id);
if(session.isDead()) {
- cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting";
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
break;
}
@@ -115,27 +91,23 @@ void* listener(void* p)
continue;
buf.setLength(len);
- if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
- (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
+// if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
+// (dir == 2 && remote_end != session.getRemoteEnd2()))
{
if(gOpt.getNat() ||
- (!gOpt.getNoNatOnce() && ((param->dir_ == 1 && !session.getSeen1()) ||
- (param->dir_ == 2 && !session.getSeen2()))))
+ (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
+ (dir == 2 && !session.getSeen2()))))
{
- cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") setting remote host to "
- << remote_addr << ":" << remote_port;
- if(param->dir_ == 1) {
- session.setRemotePort1(remote_port);
- session.setRemoteAddr1(remote_addr);
- }
- if(param->dir_ == 2) {
- session.setRemotePort2(remote_port);
- session.setRemoteAddr2(remote_addr);
- }
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
+ << remote_end;
+// if(dir == 1)
+// session.setRemoteEnd1(remote_end);
+// if(dir == 2)
+// session.setRemoteEnd2(remote_end);
if(!gOpt.getNat()) { // with nat enabled sync is not needed
- SyncCommand sc(param->call_id_);
- param->queue_.push(sc);
+ SyncCommand sc(call_id);
+ queue->push(sc);
}
}
else
@@ -144,37 +116,50 @@ void* listener(void* p)
session.setSeen1();
session.setSeen2();
- if(param->dir_ == 1)
- param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2());
- else if(param->dir_ == 2)
- param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1());
- else break;
+// if(dir == 1)
+// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
+// else if(dir == 2)
+// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
+// else break;
}
}
catch(std::exception &e)
{
- cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
+ cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
}
- param->running_ = false;
- gCallIdQueue.push(param->call_id_);
- pthread_exit(NULL);
+ *running = false;
+ gCallIdQueue.push(call_id);
}
class ListenerData
{
public:
- ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2)
- {};
-
- UDPSocket* sock1_;
- UDPSocket* sock2_;
- pthread_t thread1_;
- pthread_t thread2_;
- ListenerThreadParam param1_;
- ListenerThreadParam param2_;
+ ListenerData()
+ {
+ ios1_ = new boost::asio::io_service();
+ sock1_ = new rtp_proto::socket(*ios1_);
+ ios2_ = new boost::asio::io_service();
+ sock2_ = new rtp_proto::socket(*ios2_);
+ }
+ ~ListenerData()
+ {
+ if(sock1_) delete sock1_;
+ if(ios1_) delete ios1_;
+ if(sock2_) delete sock2_;
+ if(ios2_) delete ios2_;
+ }
+
+ boost::asio::io_service* ios1_;
+ boost::asio::io_service* ios2_;
+ rtp_proto::socket* sock1_;
+ rtp_proto::socket* sock2_;
+ boost::thread* thread1_;
+ boost::thread* thread2_;
+ bool running1_;
+ bool running2_;
};
-void* listenerManager(void* p)
+void listenerManager(void* p)
{
SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
@@ -193,29 +178,40 @@ void* listenerManager(void* p)
std::map<std::string, ListenerData*>::iterator it;
it = listenerMap.find(call_id);
if(it == listenerMap.end()) // listener Threads not existing yet
- {
- UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
- UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
-
- ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1, *queue_),
- ListenerThreadParam(*sock1, *sock2, call_id, 2, *queue_));
- ld->sock1_ = sock1;
- ld->sock2_ = sock2;
- pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_));
- pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_));
-
+ {
+ ListenerData* ld = new ListenerData();
+
+ rtp_proto::resolver resolver1(*(ld->ios1_));
+ rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1());
+ rtp_proto::endpoint e1 = *resolver1.resolve(query1);
+ ld->sock1_->open(e1.protocol());
+ ld->sock1_->bind(e1);
+
+ rtp_proto::resolver resolver2(*(ld->ios2_));
+ rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2());
+ rtp_proto::endpoint e2 = *resolver2.resolve(query2);
+ ld->sock2_->open(e2.protocol());
+ ld->sock2_->bind(e2);
+
+ ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_)));
+ ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_)));
+
std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
continue;
}
- if(!it->second->param1_.running_ && !it->second->param2_.running_)
+ if(!it->second->running1_ && !it->second->running2_)
{
cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
- pthread_join(it->second->thread1_, NULL);
- pthread_join(it->second->thread2_, NULL);
- delete it->second->sock1_;
- delete it->second->sock2_;
+ if(it->second->thread1_) {
+ it->second->thread1_->join();
+ delete it->second->thread1_;
+ }
+ if(it->second->thread2_) {
+ it->second->thread2_->join();
+ delete it->second->thread2_;
+ }
delete it->second;
listenerMap.erase(it);
gRtpSessionTable.delSession(call_id);
@@ -230,7 +226,6 @@ void* listenerManager(void* p)
}
}
cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
- pthread_exit(NULL);
}
void chrootAndDrop(string const& chrootdir, string const& username)
@@ -286,6 +281,16 @@ void daemonize()
umask(027);
}
+class ThreadParam
+{
+public:
+ ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
+ : queue(queue_),connto(connto_)
+ {};
+ SyncQueue & queue;
+ OptionConnectTo & connto;
+};
+
void syncConnector(void* p)
{
ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp
index 495f6e6..c95d338 100644
--- a/src/anyrtpproxy/commandHandler.cpp
+++ b/src/anyrtpproxy/commandHandler.cpp
@@ -94,7 +94,7 @@ void CommandHandler::run(void* s)
self->control_sock_.send_to(boost::asio::buffer(ret.c_str(), ret.length()), remote_end);
}
}
- catch(SocketException &e)
+ catch(std::exception& e)
{
self->running_ = false;
}
diff --git a/src/anyrtpproxy/options.cpp b/src/anyrtpproxy/options.cpp
index e3f3850..fa0879e 100644
--- a/src/anyrtpproxy/options.cpp
+++ b/src/anyrtpproxy/options.cpp
@@ -191,7 +191,7 @@ bool Options::parse(int argc, char* argv[])
bool Options::sanityCheck()
{
- if(!control_interface_.port_) control_interface_.port_ = 22222;
+ if(control_interface_.port_ == "") control_interface_.port_ = "22222";
return true;
}