From 08958e48551df6ef742afaf1c868a16c00e950d9 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 25 Nov 2008 17:49:34 +0000 Subject: anyrtpproxy allmost finished ToDo: - syncrhonisation does not work by now - fix recv non blocking issue --- src/anyrtpproxy/anyrtpproxy.cpp | 54 ++++++++++++++++---------------------- src/anyrtpproxy/commandHandler.cpp | 48 +++++++++++++++++++-------------- 2 files changed, 52 insertions(+), 50 deletions(-) (limited to 'src/anyrtpproxy') diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index 91cb0e9..c96b086 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -61,16 +61,14 @@ #define MAX_PACKET_SIZE 1500 -typedef boost::asio::ip::udp rtp_proto; - -void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running) +void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running) { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started"; try { Buffer buf(u_int32_t(MAX_PACKET_SIZE)); - rtp_proto::endpoint remote_end; + RtpSession::proto::endpoint remote_end; while(1) { buf.setLength(MAX_PACKET_SIZE); @@ -91,8 +89,8 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca continue; buf.setLength(len); -// if((dir == 1 && remote_end != session.getRemoteEnd1()) || -// (dir == 2 && remote_end != session.getRemoteEnd2())) + if((dir == 1 && remote_end != session.getRemoteEnd1()) || + (dir == 2 && remote_end != session.getRemoteEnd2())) { if(gOpt.getNat() || (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || @@ -100,11 +98,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca { cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to " << remote_end; -// if(dir == 1) -// session.setRemoteEnd1(remote_end); -// if(dir == 2) -// session.setRemoteEnd2(remote_end); - + if(dir == 1) + session.setRemoteEnd1(remote_end); + if(dir == 2) + session.setRemoteEnd2(remote_end); + if(!gOpt.getNat()) { // with nat enabled sync is not needed SyncCommand sc(call_id); queue->push(sc); @@ -116,11 +114,11 @@ void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string ca session.setSeen1(); session.setSeen2(); -// if(dir == 1) -// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); -// else if(dir == 2) -// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); -// else break; + if(dir == 1) + sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); + else if(dir == 2) + sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); + else break; } } catch(std::exception &e) @@ -137,9 +135,9 @@ public: ListenerData() { ios1_ = new boost::asio::io_service(); - sock1_ = new rtp_proto::socket(*ios1_); + sock1_ = new RtpSession::proto::socket(*ios1_); ios2_ = new boost::asio::io_service(); - sock2_ = new rtp_proto::socket(*ios2_); + sock2_ = new RtpSession::proto::socket(*ios2_); } ~ListenerData() { @@ -151,8 +149,8 @@ public: boost::asio::io_service* ios1_; boost::asio::io_service* ios2_; - rtp_proto::socket* sock1_; - rtp_proto::socket* sock2_; + RtpSession::proto::socket* sock1_; + RtpSession::proto::socket* sock2_; boost::thread* thread1_; boost::thread* thread2_; bool running1_; @@ -181,17 +179,11 @@ void listenerManager(void* p) { ListenerData* ld = new ListenerData(); - rtp_proto::resolver resolver1(*(ld->ios1_)); - rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1()); - rtp_proto::endpoint e1 = *resolver1.resolve(query1); - ld->sock1_->open(e1.protocol()); - ld->sock1_->bind(e1); - - rtp_proto::resolver resolver2(*(ld->ios2_)); - rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2()); - rtp_proto::endpoint e2 = *resolver2.resolve(query2); - ld->sock2_->open(e2.protocol()); - ld->sock2_->bind(e2); + ld->sock1_->open(session.getLocalEnd1().protocol()); + ld->sock1_->bind(session.getLocalEnd1()); + + ld->sock2_->open(session.getLocalEnd2().protocol()); + ld->sock2_->bind(session.getLocalEnd2()); ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_))); ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_))); diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp index c95d338..c8604d6 100644 --- a/src/anyrtpproxy/commandHandler.cpp +++ b/src/anyrtpproxy/commandHandler.cpp @@ -34,6 +34,7 @@ #include #include +#include #include @@ -171,6 +172,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad try { + RtpSession::proto::resolver resolver(io_service_); bool is_new; RtpSession& session = gRtpSessionTable.getOrNewSession(call_id, is_new); if(is_new) @@ -183,22 +185,32 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad if( port2) port_window_.freePort(port2); throw std::runtime_error("no free port found"); } - - if(gOpt.getLocalAddr() == "") - session.setLocalAddr("0.0.0.0"); - else - session.setLocalAddr(gOpt.getLocalAddr()); - session.setLocalPort1(port1); - session.setLocalPort2(port2); + std::stringstream ps1, ps2; + ps1 << port1; + ps2 << port2; + + RtpSession::proto::endpoint e1, e2; + if(gOpt.getLocalAddr() == "") { + RtpSession::proto::resolver::query query1(ps1.str()); + e1 = *resolver.resolve(query1); + RtpSession::proto::resolver::query query2(ps2.str()); + e2 = *resolver.resolve(query2); + } + else { + RtpSession::proto::resolver::query query1(gOpt.getLocalAddr(),ps1.str()); + e1 = *resolver.resolve(query1); + RtpSession::proto::resolver::query query2(gOpt.getLocalAddr(),ps2.str()); + e2 = *resolver.resolve(query2); + } + + session.setLocalEnd1(e1); + session.setLocalEnd2(e2); } - istringstream iss(port); - u_int16_t rport; - iss >> rport; - session.setRemotePort1(rport); - session.setRemoteAddr1(addr); + RtpSession::proto::resolver::query query(addr,port); + session.setRemoteEnd1(*resolver.resolve(query)); ostringstream oss; - oss << session.getLocalPort2(); + oss << session.getLocalEnd2().port(); return oss.str(); } catch(std::exception& e) @@ -215,17 +227,15 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a try { RtpSession& session = gRtpSessionTable.getSession(call_id); - istringstream iss(port); - u_int16_t rport; - iss >> rport; - session.setRemotePort2(rport); - session.setRemoteAddr2(addr); + RtpSession::proto::resolver resolver(io_service_); + RtpSession::proto::resolver::query query(addr,port); + session.setRemoteEnd2(*resolver.resolve(query)); session.isComplete(true); SyncCommand sc(call_id); queue_.push(sc); ostringstream oss; - oss << session.getLocalPort1(); + oss << session.getLocalEnd1().port(); return oss.str(); } catch(std::exception& e) -- cgit v1.2.3