summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-11-25 17:49:34 +0000
committerChristian Pointner <equinox@anytun.org>2008-11-25 17:49:34 +0000
commit08958e48551df6ef742afaf1c868a16c00e950d9 (patch)
treea28d90898729fdfc91e8bb94ce9b881d85d7cb68
parentanyrtpproxy ported to boost threads and boost asio still some work (diff)
anyrtpproxy allmost finished
ToDo: - syncrhonisation does not work by now - fix recv non blocking issue
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp54
-rw-r--r--src/anyrtpproxy/commandHandler.cpp48
-rw-r--r--src/rtpSession.cpp75
-rw-r--r--src/rtpSession.h68
4 files changed, 110 insertions, 135 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index 91cb0e9..c96b086 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -61,16 +61,14 @@
#define MAX_PACKET_SIZE 1500
-typedef boost::asio::ip::udp rtp_proto;
-
-void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
+void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
{
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
try
{
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
- rtp_proto::endpoint remote_end;
+ RtpSession::proto::endpoint remote_end;
while(1) {
buf.setLength(MAX_PACKET_SIZE);
@@ -91,8 +89,8 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
continue;
buf.setLength(len);
-// if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
-// (dir == 2 && remote_end != session.getRemoteEnd2()))
+ if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
+ (dir == 2 && remote_end != session.getRemoteEnd2()))
{
if(gOpt.getNat() ||
(!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
@@ -100,11 +98,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
{
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
<< remote_end;
-// if(dir == 1)
-// session.setRemoteEnd1(remote_end);
-// if(dir == 2)
-// session.setRemoteEnd2(remote_end);
-
+ if(dir == 1)
+ session.setRemoteEnd1(remote_end);
+ if(dir == 2)
+ session.setRemoteEnd2(remote_end);
+
if(!gOpt.getNat()) { // with nat enabled sync is not needed
SyncCommand sc(call_id);
queue->push(sc);
@@ -116,11 +114,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca
session.setSeen1();
session.setSeen2();
-// if(dir == 1)
-// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
-// else if(dir == 2)
-// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
-// else break;
+ if(dir == 1)
+ sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
+ else if(dir == 2)
+ sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
+ else break;
}
}
catch(std::exception &e)
@@ -137,9 +135,9 @@ public:
ListenerData()
{
ios1_ = new boost::asio::io_service();
- sock1_ = new rtp_proto::socket(*ios1_);
+ sock1_ = new RtpSession::proto::socket(*ios1_);
ios2_ = new boost::asio::io_service();
- sock2_ = new rtp_proto::socket(*ios2_);
+ sock2_ = new RtpSession::proto::socket(*ios2_);
}
~ListenerData()
{
@@ -151,8 +149,8 @@ public:
boost::asio::io_service* ios1_;
boost::asio::io_service* ios2_;
- rtp_proto::socket* sock1_;
- rtp_proto::socket* sock2_;
+ RtpSession::proto::socket* sock1_;
+ RtpSession::proto::socket* sock2_;
boost::thread* thread1_;
boost::thread* thread2_;
bool running1_;
@@ -181,17 +179,11 @@ void listenerManager(void* p)
{
ListenerData* ld = new ListenerData();
- rtp_proto::resolver resolver1(*(ld->ios1_));
- rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1());
- rtp_proto::endpoint e1 = *resolver1.resolve(query1);
- ld->sock1_->open(e1.protocol());
- ld->sock1_->bind(e1);
-
- rtp_proto::resolver resolver2(*(ld->ios2_));
- rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2());
- rtp_proto::endpoint e2 = *resolver2.resolve(query2);
- ld->sock2_->open(e2.protocol());
- ld->sock2_->bind(e2);
+ ld->sock1_->open(session.getLocalEnd1().protocol());
+ ld->sock1_->bind(session.getLocalEnd1());
+
+ ld->sock2_->open(session.getLocalEnd2().protocol());
+ ld->sock2_->bind(session.getLocalEnd2());
ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_)));
ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_)));
diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp
index c95d338..c8604d6 100644
--- a/src/anyrtpproxy/commandHandler.cpp
+++ b/src/anyrtpproxy/commandHandler.cpp
@@ -34,6 +34,7 @@
#include <iomanip>
#include <iostream>
+#include <sstream>
#include <boost/bind.hpp>
@@ -171,6 +172,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
try
{
+ RtpSession::proto::resolver resolver(io_service_);
bool is_new;
RtpSession& session = gRtpSessionTable.getOrNewSession(call_id, is_new);
if(is_new)
@@ -183,22 +185,32 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
if( port2) port_window_.freePort(port2);
throw std::runtime_error("no free port found");
}
-
- if(gOpt.getLocalAddr() == "")
- session.setLocalAddr("0.0.0.0");
- else
- session.setLocalAddr(gOpt.getLocalAddr());
- session.setLocalPort1(port1);
- session.setLocalPort2(port2);
+ std::stringstream ps1, ps2;
+ ps1 << port1;
+ ps2 << port2;
+
+ RtpSession::proto::endpoint e1, e2;
+ if(gOpt.getLocalAddr() == "") {
+ RtpSession::proto::resolver::query query1(ps1.str());
+ e1 = *resolver.resolve(query1);
+ RtpSession::proto::resolver::query query2(ps2.str());
+ e2 = *resolver.resolve(query2);
+ }
+ else {
+ RtpSession::proto::resolver::query query1(gOpt.getLocalAddr(),ps1.str());
+ e1 = *resolver.resolve(query1);
+ RtpSession::proto::resolver::query query2(gOpt.getLocalAddr(),ps2.str());
+ e2 = *resolver.resolve(query2);
+ }
+
+ session.setLocalEnd1(e1);
+ session.setLocalEnd2(e2);
}
- istringstream iss(port);
- u_int16_t rport;
- iss >> rport;
- session.setRemotePort1(rport);
- session.setRemoteAddr1(addr);
+ RtpSession::proto::resolver::query query(addr,port);
+ session.setRemoteEnd1(*resolver.resolve(query));
ostringstream oss;
- oss << session.getLocalPort2();
+ oss << session.getLocalEnd2().port();
return oss.str();
}
catch(std::exception& e)
@@ -215,17 +227,15 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
try
{
RtpSession& session = gRtpSessionTable.getSession(call_id);
- istringstream iss(port);
- u_int16_t rport;
- iss >> rport;
- session.setRemotePort2(rport);
- session.setRemoteAddr2(addr);
+ RtpSession::proto::resolver resolver(io_service_);
+ RtpSession::proto::resolver::query query(addr,port);
+ session.setRemoteEnd2(*resolver.resolve(query));
session.isComplete(true);
SyncCommand sc(call_id);
queue_.push(sc);
ostringstream oss;
- oss << session.getLocalPort1();
+ oss << session.getLocalEnd1().port();
return oss.str();
}
catch(std::exception& e)
diff --git a/src/rtpSession.cpp b/src/rtpSession.cpp
index f87a216..b3e688b 100644
--- a/src/rtpSession.cpp
+++ b/src/rtpSession.cpp
@@ -34,8 +34,7 @@
#include "anyrtpproxy/callIdQueue.h"
RtpSession::RtpSession(const std::string& call_id) : in_sync_(false), call_id_(call_id) , dead_(false), complete_(false),
- local_addr_("") , local_port1_(0), local_port2_(0),
- remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0),seen1_(false), seen2_(false)
+ seen1_(false), seen2_(false)
{
}
@@ -68,20 +67,6 @@ bool RtpSession::isComplete(bool c)
return complete_ = c;
}
-std::string RtpSession::getLocalAddr()
-{
- Lock lock(mutex_);
- return local_addr_;
-}
-
-RtpSession& RtpSession::setLocalAddr(std::string a)
-{
- Lock lock(mutex_);
- in_sync_ = false;
- local_addr_ = a;
- return *this;
-}
-
bool RtpSession::getSeen1()
{
Lock lock(mutex_);
@@ -110,86 +95,60 @@ RtpSession& RtpSession::setSeen2()
return *this;
}
-u_int16_t RtpSession::getLocalPort1()
-{
- Lock lock(mutex_);
- return local_port1_;
-}
-
-RtpSession& RtpSession::setLocalPort1(u_int16_t p)
-{
- Lock lock(mutex_);
- in_sync_ = false;
- local_port1_ = p;
- return *this;
-}
-
-u_int16_t RtpSession::getLocalPort2()
+RtpSession::proto::endpoint RtpSession::getLocalEnd1()
{
Lock lock(mutex_);
- return local_port2_;
+ return local_end1_;
}
-RtpSession& RtpSession::setLocalPort2(u_int16_t p)
+RtpSession& RtpSession::setLocalEnd1(RtpSession::proto::endpoint e)
{
Lock lock(mutex_);
in_sync_ = false;
- local_port2_ = p;
+ local_end1_ = e;
return *this;
}
-u_int16_t RtpSession::getRemotePort1()
+RtpSession::proto::endpoint RtpSession::getLocalEnd2()
{
Lock lock(mutex_);
- return remote_port1_;
+ return local_end2_;
}
-RtpSession& RtpSession::setRemotePort1(u_int16_t p)
+RtpSession& RtpSession::setLocalEnd2(RtpSession::proto::endpoint e)
{
Lock lock(mutex_);
in_sync_ = false;
- remote_port1_ = p;
+ local_end2_ = e;
return *this;
}
-std::string RtpSession::getRemoteAddr1()
+RtpSession::proto::endpoint RtpSession::getRemoteEnd1()
{
Lock lock(mutex_);
- return remote_addr1_;
+ return remote_end1_;
}
-RtpSession& RtpSession::setRemoteAddr1(std::string a)
+RtpSession& RtpSession::setRemoteEnd1(RtpSession::proto::endpoint e)
{
Lock lock(mutex_);
in_sync_ = false;
- remote_addr1_ = a;
+ remote_end1_ = e;
return *this;
}
-u_int16_t RtpSession::getRemotePort2()
+RtpSession::proto::endpoint RtpSession::getRemoteEnd2()
{
Lock lock(mutex_);
- return remote_port2_;
+ return remote_end2_;
}
-RtpSession& RtpSession::setRemotePort2(u_int16_t p)
+RtpSession& RtpSession::setRemoteEnd2(RtpSession::proto::endpoint e)
{
Lock lock(mutex_);
in_sync_ = false;
- remote_port2_ = p;
+ remote_end2_ = e;
return *this;
}
-std::string RtpSession::getRemoteAddr2()
-{
- Lock lock(mutex_);
- return remote_addr2_;
-}
-RtpSession& RtpSession::setRemoteAddr2(std::string a)
-{
- Lock lock(mutex_);
- in_sync_ = false;
- remote_addr2_ = a;
- return *this;
-}
diff --git a/src/rtpSession.h b/src/rtpSession.h
index 5749ecd..1aec46f 100644
--- a/src/rtpSession.h
+++ b/src/rtpSession.h
@@ -32,6 +32,8 @@
#ifndef _RTPSESSION_H_
#define _RTPSESSION_H_
+#include <boost/asio.hpp>
+
#include "threadUtils.hpp"
#include <boost/archive/text_oarchive.hpp>
@@ -40,6 +42,8 @@
class RtpSession
{
public:
+ typedef boost::asio::ip::udp proto;
+
RtpSession(const std::string& call_id);
bool isDead();
@@ -48,23 +52,15 @@ public:
bool isComplete();
bool isComplete(bool c);
- std::string getLocalAddr();
- RtpSession& setLocalAddr(std::string a);
- u_int16_t getLocalPort1();
- RtpSession& setLocalPort1(u_int16_t p);
- u_int16_t getLocalPort2();
- RtpSession& setLocalPort2(u_int16_t p);
-
-
- u_int16_t getRemotePort1();
- RtpSession& setRemotePort1(u_int16_t p);
- std::string getRemoteAddr1();
- RtpSession& setRemoteAddr1(std::string a);
+ proto::endpoint getLocalEnd1();
+ RtpSession& setLocalEnd1(proto::endpoint e);
+ proto::endpoint getLocalEnd2();
+ RtpSession& setLocalEnd2(proto::endpoint e);
- u_int16_t getRemotePort2();
- RtpSession& setRemotePort2(u_int16_t p);
- std::string getRemoteAddr2();
- RtpSession& setRemoteAddr2(std::string a);
+ proto::endpoint getRemoteEnd1();
+ RtpSession& setRemoteEnd1(proto::endpoint e);
+ proto::endpoint getRemoteEnd2();
+ RtpSession& setRemoteEnd2(proto::endpoint e);
RtpSession& setSeen1();
bool getSeen1();
@@ -84,18 +80,38 @@ private:
{
Lock lock(mutex_);
+ // address of local_end1 and local_end2 are always equal
+ std::string local_addr(local_end1_.address().to_string());
+ u_int16_t local_port1 = local_end1_.port();
+ u_int16_t local_port2 = local_end2_.port();
+
+ std::string remote_addr1(remote_end1_.address().to_string());
+ u_int16_t remote_port1 = remote_end1_.port();
+ std::string remote_addr2(remote_end2_.address().to_string());
+ u_int16_t remote_port2 = remote_end2_.port();
+
ar & dead_;
ar & complete_;
- ar & local_addr_;
- ar & local_port1_;
- ar & local_port2_;
- ar & remote_addr1_;
- ar & remote_port1_;
- ar & remote_addr2_;
- ar & remote_port2_;
+ ar & local_addr;
+ ar & local_port1;
+ ar & local_port2;
+ ar & remote_addr1;
+ ar & remote_port1;
+ ar & remote_addr2;
+ ar & remote_port2;
ar & seen1_;
ar & seen2_;
+ proto::endpoint local_end1(boost::asio::ip::address::from_string(local_addr), local_port1);
+ local_end1_ = local_end1;
+ proto::endpoint local_end2(boost::asio::ip::address::from_string(local_addr), local_port2);
+ local_end2_ = local_end2;
+
+ proto::endpoint remote_end1(boost::asio::ip::address::from_string(remote_addr1), remote_port1);
+ remote_end1_ = remote_end1;
+ proto::endpoint remote_end2(boost::asio::ip::address::from_string(remote_addr2), remote_port2);
+ remote_end2_ = remote_end2;
+
if(complete_ && !dead_)
reinit();
@@ -108,10 +124,8 @@ private:
const std::string& call_id_;
bool dead_;
bool complete_;
- std::string local_addr_;
- u_int16_t local_port1_, local_port2_;
- std::string remote_addr1_, remote_addr2_;
- u_int16_t remote_port1_, remote_port2_;
+ proto::endpoint local_end1_, local_end2_;
+ proto::endpoint remote_end1_, remote_end2_;
bool seen1_,seen2_; //has at least 1 packet been recieved?
};