From 08958e48551df6ef742afaf1c868a16c00e950d9 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 25 Nov 2008 17:49:34 +0000 Subject: anyrtpproxy allmost finished ToDo: - syncrhonisation does not work by now - fix recv non blocking issue --- src/anyrtpproxy/anyrtpproxy.cpp | 54 ++++++++++++--------------- src/anyrtpproxy/commandHandler.cpp | 48 ++++++++++++++---------- src/rtpSession.cpp | 75 +++++++++----------------------------- src/rtpSession.h | 68 ++++++++++++++++++++-------------- 4 files changed, 110 insertions(+), 135 deletions(-) diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index 91cb0e9..c96b086 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -61,16 +61,14 @@ #define MAX_PACKET_SIZE 1500 -typedef boost::asio::ip::udp rtp_proto; - -void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running) +void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running) { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started"; try { Buffer buf(u_int32_t(MAX_PACKET_SIZE)); - rtp_proto::endpoint remote_end; + RtpSession::proto::endpoint remote_end; while(1) { buf.setLength(MAX_PACKET_SIZE); @@ -91,8 +89,8 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca continue; buf.setLength(len); -// if((dir == 1 && remote_end != session.getRemoteEnd1()) || -// (dir == 2 && remote_end != session.getRemoteEnd2())) + if((dir == 1 && remote_end != session.getRemoteEnd1()) || + (dir == 2 && remote_end != session.getRemoteEnd2())) { if(gOpt.getNat() || (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || @@ -100,11 +98,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to " << remote_end; -// if(dir == 1) -// session.setRemoteEnd1(remote_end); -// if(dir == 2) -// session.setRemoteEnd2(remote_end); - + if(dir == 1) + session.setRemoteEnd1(remote_end); + if(dir == 2) + session.setRemoteEnd2(remote_end); + if(!gOpt.getNat()) { // with nat enabled sync is not needed SyncCommand sc(call_id); queue->push(sc); @@ -116,11 +114,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca session.setSeen1(); session.setSeen2(); -// if(dir == 1) -// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); -// else if(dir == 2) -// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); -// else break; + if(dir == 1) + sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); + else if(dir == 2) + sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); + else break; } } catch(std::exception &e) @@ -137,9 +135,9 @@ public: ListenerData() { ios1_ = new boost::asio::io_service(); - sock1_ = new rtp_proto::socket(*ios1_); + sock1_ = new RtpSession::proto::socket(*ios1_); ios2_ = new boost::asio::io_service(); - sock2_ = new rtp_proto::socket(*ios2_); + sock2_ = new RtpSession::proto::socket(*ios2_); } ~ListenerData() { @@ -151,8 +149,8 @@ public: boost::asio::io_service* ios1_; boost::asio::io_service* ios2_; - rtp_proto::socket* sock1_; - rtp_proto::socket* sock2_; + RtpSession::proto::socket* sock1_; + RtpSession::proto::socket* sock2_; boost::thread* thread1_; boost::thread* thread2_; bool running1_; @@ -181,17 +179,11 @@ void listenerManager(void* p) { ListenerData* ld = new ListenerData(); - rtp_proto::resolver resolver1(*(ld->ios1_)); - rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1()); - rtp_proto::endpoint e1 = *resolver1.resolve(query1); - ld->sock1_->open(e1.protocol()); - ld->sock1_->bind(e1); - - rtp_proto::resolver resolver2(*(ld->ios2_)); - rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2()); - rtp_proto::endpoint e2 = *resolver2.resolve(query2); - ld->sock2_->open(e2.protocol()); - ld->sock2_->bind(e2); + ld->sock1_->open(session.getLocalEnd1().protocol()); + ld->sock1_->bind(session.getLocalEnd1()); + + ld->sock2_->open(session.getLocalEnd2().protocol()); + ld->sock2_->bind(session.getLocalEnd2()); ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_))); ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_))); diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp index c95d338..c8604d6 100644 --- a/src/anyrtpproxy/commandHandler.cpp +++ b/src/anyrtpproxy/commandHandler.cpp @@ -34,6 +34,7 @@ #include #include +#include #include @@ -171,6 +172,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad try { + RtpSession::proto::resolver resolver(io_service_); bool is_new; RtpSession& session = gRtpSessionTable.getOrNewSession(call_id, is_new); if(is_new) @@ -183,22 +185,32 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad if( port2) port_window_.freePort(port2); throw std::runtime_error("no free port found"); } - - if(gOpt.getLocalAddr() == "") - session.setLocalAddr("0.0.0.0"); - else - session.setLocalAddr(gOpt.getLocalAddr()); - session.setLocalPort1(port1); - session.setLocalPort2(port2); + std::stringstream ps1, ps2; + ps1 << port1; + ps2 << port2; + + RtpSession::proto::endpoint e1, e2; + if(gOpt.getLocalAddr() == "") { + RtpSession::proto::resolver::query query1(ps1.str()); + e1 = *resolver.resolve(query1); + RtpSession::proto::resolver::query query2(ps2.str()); + e2 = *resolver.resolve(query2); + } + else { + RtpSession::proto::resolver::query query1(gOpt.getLocalAddr(),ps1.str()); + e1 = *resolver.resolve(query1); + RtpSession::proto::resolver::query query2(gOpt.getLocalAddr(),ps2.str()); + e2 = *resolver.resolve(query2); + } + + session.setLocalEnd1(e1); + session.setLocalEnd2(e2); } - istringstream iss(port); - u_int16_t rport; - iss >> rport; - session.setRemotePort1(rport); - session.setRemoteAddr1(addr); + RtpSession::proto::resolver::query query(addr,port); + session.setRemoteEnd1(*resolver.resolve(query)); ostringstream oss; - oss << session.getLocalPort2(); + oss << session.getLocalEnd2().port(); return oss.str(); } catch(std::exception& e) @@ -215,17 +227,15 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a try { RtpSession& session = gRtpSessionTable.getSession(call_id); - istringstream iss(port); - u_int16_t rport; - iss >> rport; - session.setRemotePort2(rport); - session.setRemoteAddr2(addr); + RtpSession::proto::resolver resolver(io_service_); + RtpSession::proto::resolver::query query(addr,port); + session.setRemoteEnd2(*resolver.resolve(query)); session.isComplete(true); SyncCommand sc(call_id); queue_.push(sc); ostringstream oss; - oss << session.getLocalPort1(); + oss << session.getLocalEnd1().port(); return oss.str(); } catch(std::exception& e) diff --git a/src/rtpSession.cpp b/src/rtpSession.cpp index f87a216..b3e688b 100644 --- a/src/rtpSession.cpp +++ b/src/rtpSession.cpp @@ -34,8 +34,7 @@ #include "anyrtpproxy/callIdQueue.h" RtpSession::RtpSession(const std::string& call_id) : in_sync_(false), call_id_(call_id) , dead_(false), complete_(false), - local_addr_("") , local_port1_(0), local_port2_(0), - remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0),seen1_(false), seen2_(false) + seen1_(false), seen2_(false) { } @@ -68,20 +67,6 @@ bool RtpSession::isComplete(bool c) return complete_ = c; } -std::string RtpSession::getLocalAddr() -{ - Lock lock(mutex_); - return local_addr_; -} - -RtpSession& RtpSession::setLocalAddr(std::string a) -{ - Lock lock(mutex_); - in_sync_ = false; - local_addr_ = a; - return *this; -} - bool RtpSession::getSeen1() { Lock lock(mutex_); @@ -110,86 +95,60 @@ RtpSession& RtpSession::setSeen2() return *this; } -u_int16_t RtpSession::getLocalPort1() -{ - Lock lock(mutex_); - return local_port1_; -} - -RtpSession& RtpSession::setLocalPort1(u_int16_t p) -{ - Lock lock(mutex_); - in_sync_ = false; - local_port1_ = p; - return *this; -} - -u_int16_t RtpSession::getLocalPort2() +RtpSession::proto::endpoint RtpSession::getLocalEnd1() { Lock lock(mutex_); - return local_port2_; + return local_end1_; } -RtpSession& RtpSession::setLocalPort2(u_int16_t p) +RtpSession& RtpSession::setLocalEnd1(RtpSession::proto::endpoint e) { Lock lock(mutex_); in_sync_ = false; - local_port2_ = p; + local_end1_ = e; return *this; } -u_int16_t RtpSession::getRemotePort1() +RtpSession::proto::endpoint RtpSession::getLocalEnd2() { Lock lock(mutex_); - return remote_port1_; + return local_end2_; } -RtpSession& RtpSession::setRemotePort1(u_int16_t p) +RtpSession& RtpSession::setLocalEnd2(RtpSession::proto::endpoint e) { Lock lock(mutex_); in_sync_ = false; - remote_port1_ = p; + local_end2_ = e; return *this; } -std::string RtpSession::getRemoteAddr1() +RtpSession::proto::endpoint RtpSession::getRemoteEnd1() { Lock lock(mutex_); - return remote_addr1_; + return remote_end1_; } -RtpSession& RtpSession::setRemoteAddr1(std::string a) +RtpSession& RtpSession::setRemoteEnd1(RtpSession::proto::endpoint e) { Lock lock(mutex_); in_sync_ = false; - remote_addr1_ = a; + remote_end1_ = e; return *this; } -u_int16_t RtpSession::getRemotePort2() +RtpSession::proto::endpoint RtpSession::getRemoteEnd2() { Lock lock(mutex_); - return remote_port2_; + return remote_end2_; } -RtpSession& RtpSession::setRemotePort2(u_int16_t p) +RtpSession& RtpSession::setRemoteEnd2(RtpSession::proto::endpoint e) { Lock lock(mutex_); in_sync_ = false; - remote_port2_ = p; + remote_end2_ = e; return *this; } -std::string RtpSession::getRemoteAddr2() -{ - Lock lock(mutex_); - return remote_addr2_; -} -RtpSession& RtpSession::setRemoteAddr2(std::string a) -{ - Lock lock(mutex_); - in_sync_ = false; - remote_addr2_ = a; - return *this; -} diff --git a/src/rtpSession.h b/src/rtpSession.h index 5749ecd..1aec46f 100644 --- a/src/rtpSession.h +++ b/src/rtpSession.h @@ -32,6 +32,8 @@ #ifndef _RTPSESSION_H_ #define _RTPSESSION_H_ +#include + #include "threadUtils.hpp" #include @@ -40,6 +42,8 @@ class RtpSession { public: + typedef boost::asio::ip::udp proto; + RtpSession(const std::string& call_id); bool isDead(); @@ -48,23 +52,15 @@ public: bool isComplete(); bool isComplete(bool c); - std::string getLocalAddr(); - RtpSession& setLocalAddr(std::string a); - u_int16_t getLocalPort1(); - RtpSession& setLocalPort1(u_int16_t p); - u_int16_t getLocalPort2(); - RtpSession& setLocalPort2(u_int16_t p); - - - u_int16_t getRemotePort1(); - RtpSession& setRemotePort1(u_int16_t p); - std::string getRemoteAddr1(); - RtpSession& setRemoteAddr1(std::string a); + proto::endpoint getLocalEnd1(); + RtpSession& setLocalEnd1(proto::endpoint e); + proto::endpoint getLocalEnd2(); + RtpSession& setLocalEnd2(proto::endpoint e); - u_int16_t getRemotePort2(); - RtpSession& setRemotePort2(u_int16_t p); - std::string getRemoteAddr2(); - RtpSession& setRemoteAddr2(std::string a); + proto::endpoint getRemoteEnd1(); + RtpSession& setRemoteEnd1(proto::endpoint e); + proto::endpoint getRemoteEnd2(); + RtpSession& setRemoteEnd2(proto::endpoint e); RtpSession& setSeen1(); bool getSeen1(); @@ -84,18 +80,38 @@ private: { Lock lock(mutex_); + // address of local_end1 and local_end2 are always equal + std::string local_addr(local_end1_.address().to_string()); + u_int16_t local_port1 = local_end1_.port(); + u_int16_t local_port2 = local_end2_.port(); + + std::string remote_addr1(remote_end1_.address().to_string()); + u_int16_t remote_port1 = remote_end1_.port(); + std::string remote_addr2(remote_end2_.address().to_string()); + u_int16_t remote_port2 = remote_end2_.port(); + ar & dead_; ar & complete_; - ar & local_addr_; - ar & local_port1_; - ar & local_port2_; - ar & remote_addr1_; - ar & remote_port1_; - ar & remote_addr2_; - ar & remote_port2_; + ar & local_addr; + ar & local_port1; + ar & local_port2; + ar & remote_addr1; + ar & remote_port1; + ar & remote_addr2; + ar & remote_port2; ar & seen1_; ar & seen2_; + proto::endpoint local_end1(boost::asio::ip::address::from_string(local_addr), local_port1); + local_end1_ = local_end1; + proto::endpoint local_end2(boost::asio::ip::address::from_string(local_addr), local_port2); + local_end2_ = local_end2; + + proto::endpoint remote_end1(boost::asio::ip::address::from_string(remote_addr1), remote_port1); + remote_end1_ = remote_end1; + proto::endpoint remote_end2(boost::asio::ip::address::from_string(remote_addr2), remote_port2); + remote_end2_ = remote_end2; + if(complete_ && !dead_) reinit(); @@ -108,10 +124,8 @@ private: const std::string& call_id_; bool dead_; bool complete_; - std::string local_addr_; - u_int16_t local_port1_, local_port2_; - std::string remote_addr1_, remote_addr2_; - u_int16_t remote_port1_, remote_port2_; + proto::endpoint local_end1_, local_end2_; + proto::endpoint remote_end1_, remote_end2_; bool seen1_,seen2_; //has at least 1 packet been recieved? }; -- cgit v1.2.3