summaryrefslogtreecommitdiff
path: root/src/anyrtpproxy
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-11-25 17:49:34 +0000
committerChristian Pointner <equinox@anytun.org>2008-11-25 17:49:34 +0000
commit08958e48551df6ef742afaf1c868a16c00e950d9 (patch)
treea28d90898729fdfc91e8bb94ce9b881d85d7cb68 /src/anyrtpproxy
parentanyrtpproxy ported to boost threads and boost asio still some work (diff)
anyrtpproxy allmost finished
ToDo: - syncrhonisation does not work by now - fix recv non blocking issue
Diffstat (limited to 'src/anyrtpproxy')
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp54
-rw-r--r--src/anyrtpproxy/commandHandler.cpp48
2 files changed, 52 insertions, 50 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index 91cb0e9..c96b086 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -61,16 +61,14 @@
#define MAX_PACKET_SIZE 1500
-typedef boost::asio::ip::udp rtp_proto;
-
-void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
+void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
{
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
try
{
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
- rtp_proto::endpoint remote_end;
+ RtpSession::proto::endpoint remote_end;
while(1) {
buf.setLength(MAX_PACKET_SIZE);
@@ -91,8 +89,8 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
continue;
buf.setLength(len);
-// if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
-// (dir == 2 && remote_end != session.getRemoteEnd2()))
+ if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
+ (dir == 2 && remote_end != session.getRemoteEnd2()))
{
if(gOpt.getNat() ||
(!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
@@ -100,11 +98,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
{
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
<< remote_end;
-// if(dir == 1)
-// session.setRemoteEnd1(remote_end);
-// if(dir == 2)
-// session.setRemoteEnd2(remote_end);
-
+ if(dir == 1)
+ session.setRemoteEnd1(remote_end);
+ if(dir == 2)
+ session.setRemoteEnd2(remote_end);
+
if(!gOpt.getNat()) { // with nat enabled sync is not needed
SyncCommand sc(call_id);
queue->push(sc);
@@ -116,11 +114,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
session.setSeen1();
session.setSeen2();
-// if(dir == 1)
-// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
-// else if(dir == 2)
-// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
-// else break;
+ if(dir == 1)
+ sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
+ else if(dir == 2)
+ sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
+ else break;
}
}
catch(std::exception &e)
@@ -137,9 +135,9 @@ public:
ListenerData()
{
ios1_ = new boost::asio::io_service();
- sock1_ = new rtp_proto::socket(*ios1_);
+ sock1_ = new RtpSession::proto::socket(*ios1_);
ios2_ = new boost::asio::io_service();
- sock2_ = new rtp_proto::socket(*ios2_);
+ sock2_ = new RtpSession::proto::socket(*ios2_);
}
~ListenerData()
{
@@ -151,8 +149,8 @@ public:
boost::asio::io_service* ios1_;
boost::asio::io_service* ios2_;
- rtp_proto::socket* sock1_;
- rtp_proto::socket* sock2_;
+ RtpSession::proto::socket* sock1_;
+ RtpSession::proto::socket* sock2_;
boost::thread* thread1_;
boost::thread* thread2_;
bool running1_;
@@ -181,17 +179,11 @@ void listenerManager(void* p)
{
ListenerData* ld = new ListenerData();
- rtp_proto::resolver resolver1(*(ld->ios1_));
- rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1());
- rtp_proto::endpoint e1 = *resolver1.resolve(query1);
- ld->sock1_->open(e1.protocol());
- ld->sock1_->bind(e1);
-
- rtp_proto::resolver resolver2(*(ld->ios2_));
- rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2());
- rtp_proto::endpoint e2 = *resolver2.resolve(query2);
- ld->sock2_->open(e2.protocol());
- ld->sock2_->bind(e2);
+ ld->sock1_->open(session.getLocalEnd1().protocol());
+ ld->sock1_->bind(session.getLocalEnd1());
+
+ ld->sock2_->open(session.getLocalEnd2().protocol());
+ ld->sock2_->bind(session.getLocalEnd2());
ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_)));
ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_)));
diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp
index c95d338..c8604d6 100644
--- a/src/anyrtpproxy/commandHandler.cpp
+++ b/src/anyrtpproxy/commandHandler.cpp
@@ -34,6 +34,7 @@
#include <iomanip>
#include <iostream>
+#include <sstream>
#include <boost/bind.hpp>
@@ -171,6 +172,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
try
{
+ RtpSession::proto::resolver resolver(io_service_);
bool is_new;
RtpSession& session = gRtpSessionTable.getOrNewSession(call_id, is_new);
if(is_new)
@@ -183,22 +185,32 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
if( port2) port_window_.freePort(port2);
throw std::runtime_error("no free port found");
}
-
- if(gOpt.getLocalAddr() == "")
- session.setLocalAddr("0.0.0.0");
- else
- session.setLocalAddr(gOpt.getLocalAddr());
- session.setLocalPort1(port1);
- session.setLocalPort2(port2);
+ std::stringstream ps1, ps2;
+ ps1 << port1;
+ ps2 << port2;
+
+ RtpSession::proto::endpoint e1, e2;
+ if(gOpt.getLocalAddr() == "") {
+ RtpSession::proto::resolver::query query1(ps1.str());
+ e1 = *resolver.resolve(query1);
+ RtpSession::proto::resolver::query query2(ps2.str());
+ e2 = *resolver.resolve(query2);
+ }
+ else {
+ RtpSession::proto::resolver::query query1(gOpt.getLocalAddr(),ps1.str());
+ e1 = *resolver.resolve(query1);
+ RtpSession::proto::resolver::query query2(gOpt.getLocalAddr(),ps2.str());
+ e2 = *resolver.resolve(query2);
+ }
+
+ session.setLocalEnd1(e1);
+ session.setLocalEnd2(e2);
}
- istringstream iss(port);
- u_int16_t rport;
- iss >> rport;
- session.setRemotePort1(rport);
- session.setRemoteAddr1(addr);
+ RtpSession::proto::resolver::query query(addr,port);
+ session.setRemoteEnd1(*resolver.resolve(query));
ostringstream oss;
- oss << session.getLocalPort2();
+ oss << session.getLocalEnd2().port();
return oss.str();
}
catch(std::exception& e)
@@ -215,17 +227,15 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
try
{
RtpSession& session = gRtpSessionTable.getSession(call_id);
- istringstream iss(port);
- u_int16_t rport;
- iss >> rport;
- session.setRemotePort2(rport);
- session.setRemoteAddr2(addr);
+ RtpSession::proto::resolver resolver(io_service_);
+ RtpSession::proto::resolver::query query(addr,port);
+ session.setRemoteEnd2(*resolver.resolve(query));
session.isComplete(true);
SyncCommand sc(call_id);
queue_.push(sc);
ostringstream oss;
- oss << session.getLocalPort1();
+ oss << session.getLocalEnd1().port();
return oss.str();
}
catch(std::exception& e)