From ce8ae5556b90de67f23ddb974da76bee3e43b6fd Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 13 Mar 2008 17:32:00 +0000 Subject: anyrtpproxy can handled most commands TODO: start threads for relaying --- Makefile | 1 + anyrtpproxy/anyrtpproxy.cpp | 6 +- anyrtpproxy/commandHandler.cpp | 64 +++++++++++++++++++-- anyrtpproxy/commandHandler.h | 8 ++- anyrtpproxy/options.cpp | 1 + rtpSession.cpp | 128 ++++++++++++++++++++++++++++++++++++++++- rtpSession.h | 63 ++++++++++++++++++-- rtpSessionTable.cpp | 62 +++++++------------- rtpSessionTable.h | 13 ++--- syncSocketHandler.cpp | 2 +- 10 files changed, 278 insertions(+), 70 deletions(-) diff --git a/Makefile b/Makefile index 4eabe84..28216cd 100644 --- a/Makefile +++ b/Makefile @@ -100,6 +100,7 @@ ANYCTROBJS = log.o \ rtpSessionTable.o \ rtpSession.o \ syncRtpCommand.o \ + PracticalSocket.o \ anyctrOptions.o \ router.o \ routingTable.o \ diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index df79563..280f99b 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -248,10 +248,6 @@ int main(int argc, char* argv[]) SyncQueue queue; ConnectToList connect_to = gOpt.getConnectTo(); -// Example -// gRtpSessionTable.addSession(std::string("callid"),RtpSession()); -// SyncCommand sc (std::string("callid")); -// queue.push(sc); ThreadParam p( queue,*(new OptionConnectTo())); if ( gOpt.getLocalSyncPort()) pthread_create(&syncListenerThread, NULL, syncListener, &p); @@ -264,7 +260,7 @@ int main(int argc, char* argv[]) pthread_create(& connectThreads.back(), NULL, syncConnector, point); } - CommandHandler cmd(gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_); + CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_); int ret = sig.run(); return ret; diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp index cac5c4b..efb3c82 100644 --- a/anyrtpproxy/commandHandler.cpp +++ b/anyrtpproxy/commandHandler.cpp @@ -37,15 +37,20 @@ #include "commandHandler.h" #include "../buffer.h" #include "../log.h" +#include "../syncQueue.h" +#include "../syncCommand.h" +#include "../rtpSessionTable.h" #define MAX_COMMAND_LENGTH 1000 -CommandHandler::CommandHandler(u_int16_t lp) : running_(true), control_sock_(lp), local_address_("0.0.0.0"), local_port_(lp) +CommandHandler::CommandHandler(SyncQueue& q, u_int16_t lp) : queue_(q), running_(true), control_sock_(lp), + local_address_("0.0.0.0"), local_port_(lp) { pthread_create(&thread_, NULL, run, this); } -CommandHandler::CommandHandler(string la, u_int16_t lp) : running_(true), control_sock_(la, lp), local_address_(la), local_port_(lp) +CommandHandler::CommandHandler(SyncQueue& q, string la, u_int16_t lp) : queue_(q), running_(true), control_sock_(la, lp), + local_address_(la), local_port_(lp) { pthread_create(&thread_, NULL, run, this); } @@ -159,7 +164,26 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad std::cout << "received request[" << modifiers << "] command ('" << call_id << "','" << addr << "','" << port << "','" << from_tag << "','" << to_tag << "')" << std::endl; - return RET_OK; + try + { + gRtpSessionTable.addSession(call_id, new RtpSession()); + u_int16_t port = 35000; // TODO: get next available port + RtpSession& session = gRtpSessionTable.getSession(call_id); + session.setLocalPort(port); + session.setLocalAddr("0.0.0.0"); // TODO: read this from config; + session.setRemotePort1(port); + session.setRemoteAddr1(addr); + SyncCommand sc(call_id); + queue_.push(sc); + + ostringstream oss; + oss << port; + return oss.str(); + } + catch(std::exception& e) + { + return RET_ERR_UNKNOWN; // TODO: change to corret error value + } } string CommandHandler::handleResponse(string modifiers, string call_id, string addr, string port, string from_tag, string to_tag) @@ -167,14 +191,42 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a std::cout << "received response[" << modifiers << "] command ('" << call_id << "','" << addr << "','" << port << "','" << from_tag << "','" << to_tag << "')" << std::endl; - return RET_OK; + try + { + RtpSession& session = gRtpSessionTable.getSession(call_id); + u_int16_t port = session.getLocalPort(); + session.setRemotePort2(port); + session.setRemoteAddr2(addr); + SyncCommand sc(call_id); + queue_.push(sc); + + ostringstream oss; + oss << port; + return oss.str(); + } + catch(std::exception& e) + { + return RET_ERR_UNKNOWN; // TODO: change to corret error value + } } string CommandHandler::handleDelete(string call_id, string from_tag, string to_tag) { std::cout << "received delete command ('" << call_id << "','" << from_tag << "','" << to_tag << "')" << std::endl; - return RET_OK; + try + { + RtpSession& session = gRtpSessionTable.getSession(call_id); + session.isDead(true); + SyncCommand sc(call_id); + queue_.push(sc); + + return RET_OK; + } + catch(std::exception& e) + { + return RET_ERR_UNKNOWN; // TODO: change to corret error value + } } string CommandHandler::handleVersion() @@ -194,7 +246,7 @@ string CommandHandler::handleVersionF(string date_code) string CommandHandler::handleInfo() { - std::cout << "received info command" << std::endl; + std::cout << "received info command, ignoring" << std::endl; return RET_OK; } diff --git a/anyrtpproxy/commandHandler.h b/anyrtpproxy/commandHandler.h index 41bff2d..6e75bc4 100644 --- a/anyrtpproxy/commandHandler.h +++ b/anyrtpproxy/commandHandler.h @@ -34,14 +34,15 @@ #include #include "../datatypes.h" #include "../PracticalSocket.h" +#include "../syncQueue.h" using std::string; class CommandHandler { public: - CommandHandler(u_int16_t lp); - CommandHandler(string la, u_int16_t lp); + CommandHandler(SyncQueue& q, u_int16_t lp); + CommandHandler(SyncQueue& q, string la, u_int16_t lp); ~CommandHandler(); bool isRunning(); @@ -54,6 +55,7 @@ public: #define RET_OK "0" #define RET_ERR_SYNTAX "E1" + #define RET_ERR_UNKNOWN "E2" #define BASE_VERSION "20040107" #define SUP_VERSION "20050322" @@ -73,6 +75,8 @@ private: string handleInfo(); pthread_t thread_; + SyncQueue& queue_; + bool running_; UDPSocket control_sock_; string local_address_; diff --git a/anyrtpproxy/options.cpp b/anyrtpproxy/options.cpp index 0c55c90..b81f348 100644 --- a/anyrtpproxy/options.cpp +++ b/anyrtpproxy/options.cpp @@ -57,6 +57,7 @@ Options::Options() : control_interface_("0.0.0.0", 22222) username_ = "nobody"; chroot_dir_ = "/var/run"; daemonize_ = true; + local_sync_port_ = 2023; } Options::~Options() diff --git a/rtpSession.cpp b/rtpSession.cpp index 6933917..ef48538 100644 --- a/rtpSession.cpp +++ b/rtpSession.cpp @@ -28,12 +28,136 @@ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "PracticalSocket.h" + #include "rtpSession.h" -RtpSession::RtpSession() +RtpSession::RtpSession() : in_sync_(false), sock_(NULL), dead_(false), local_port_(0), local_addr_(""), + remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0) +{ +} + +RtpSession::~RtpSession() +{ + if(sock_) + delete reinterpret_cast(sock_); +} + +void RtpSession::init() +{ + Lock lock(mutex_); + + if(sock_ || !local_port_) + return; + + sock_ = new UDPSocket(local_addr_, local_port_); +// TODO: start threads +} + +void RtpSession::reinitSock() +{ + if(sock_) + delete reinterpret_cast(sock_); + sock_ = NULL; + + if(!local_port_) + return; + + sock_ = new UDPSocket(local_addr_, local_port_); +} + +bool RtpSession::isDead() +{ + Lock lock(mutex_); + return dead_; +} + +bool RtpSession::isDead(bool d) +{ + Lock Lock(mutex_); + return dead_ = d; +} + +u_int16_t RtpSession::getLocalPort() +{ + Lock lock(mutex_); + return local_port_; +} + +RtpSession& RtpSession::setLocalPort(u_int16_t p) +{ + Lock lock(mutex_); + in_sync_ = false; + local_port_ = p; + return *this; +} + +std::string RtpSession::getLocalAddr() +{ + Lock lock(mutex_); + return local_addr_; +} + +RtpSession& RtpSession::setLocalAddr(std::string a) +{ + Lock lock(mutex_); + in_sync_ = false; + local_addr_ = a; + return *this; +} + +u_int16_t RtpSession::getRemotePort1() +{ + Lock lock(mutex_); + return remote_port1_; +} + +RtpSession& RtpSession::setRemotePort1(u_int16_t p) +{ + Lock lock(mutex_); + in_sync_ = false; + remote_port1_ = p; + return *this; +} + +std::string RtpSession::getRemoteAddr1() +{ + Lock lock(mutex_); + return remote_addr1_; +} + +RtpSession& RtpSession::setRemoteAddr1(std::string a) +{ + Lock lock(mutex_); + in_sync_ = false; + remote_addr1_ = a; + return *this; +} + +u_int16_t RtpSession::getRemotePort2() +{ + Lock lock(mutex_); + return remote_port2_; +} + +RtpSession& RtpSession::setRemotePort2(u_int16_t p) +{ + Lock lock(mutex_); + in_sync_ = false; + remote_port2_ = p; + return *this; +} + +std::string RtpSession::getRemoteAddr2() { + Lock lock(mutex_); + return remote_addr2_; } -RtpSession::RtpSession(const RtpSession & src) +RtpSession& RtpSession::setRemoteAddr2(std::string a) { + Lock lock(mutex_); + in_sync_ = false; + remote_addr2_ = a; + return *this; } diff --git a/rtpSession.h b/rtpSession.h index 218843c..4a43d3f 100644 --- a/rtpSession.h +++ b/rtpSession.h @@ -33,24 +33,79 @@ #include "threadUtils.hpp" +#include + #include #include class RtpSession { public: - RtpSession(const RtpSession & src); RtpSession(); + ~RtpSession(); + + void init(); + + bool isDead(); + bool isDead(bool d); + + u_int16_t getLocalPort(); + RtpSession& setLocalPort(u_int16_t p); + std::string getLocalAddr(); + RtpSession& setLocalAddr(std::string a); + + u_int16_t getRemotePort1(); + RtpSession& setRemotePort1(u_int16_t p); + std::string getRemoteAddr1(); + RtpSession& setRemoteAddr1(std::string a); + + u_int16_t getRemotePort2(); + RtpSession& setRemotePort2(u_int16_t p); + std::string getRemoteAddr2(); + RtpSession& setRemoteAddr2(std::string a); private: + RtpSession(const RtpSession & src); + + void reinitSock(); + //TODO: check if this is ok - //Mutex mutex_; friend class boost::serialization::access; template void serialize(Archive & ar, const unsigned int version) - { - //Lock lock(mutex_); + { + Lock lock(mutex_); + + std::cout << "seralize called: " << local_port_ << "," << local_addr_ << std::endl; + + + u_int16_t old_local_port = local_port_; + std::string old_local_addr = local_addr_; + + ar & dead_; + ar & local_addr_; + ar & local_port_; + ar & remote_addr1_; + ar & remote_port1_; + ar & remote_addr2_; + ar & remote_port2_; + + if(old_local_port != local_port_ || old_local_addr != local_addr_) + reinitSock(); + + in_sync_ = true; } + + bool in_sync_; + ::Mutex mutex_; + void* sock_; + + bool dead_; + u_int16_t local_port_; + std::string local_addr_; + std::string remote_addr1_, remote_addr2_; + u_int16_t remote_port1_, remote_port2_; }; + #endif diff --git a/rtpSessionTable.cpp b/rtpSessionTable.cpp index 45f99c5..a577613 100644 --- a/rtpSessionTable.cpp +++ b/rtpSessionTable.cpp @@ -55,71 +55,49 @@ RtpSessionTable::~RtpSessionTable() { } -void RtpSessionTable::addSession(const std::string & pref, const RtpSession & ses ) +void RtpSessionTable::addSession(const std::string & call_id, RtpSession* ses ) { Lock lock(mutex_); - std::pair ret = map_.insert(RtpSessionMap::value_type(pref,ses)); + std::pair ret = map_.insert(RtpSessionMap::value_type(call_id,ses)); if(!ret.second) { map_.erase(ret.first); - map_.insert(RtpSessionMap::value_type(pref,ses)); + map_.insert(RtpSessionMap::value_type(call_id,ses)); } } -void RtpSessionTable::delSession(const std::string & pref ) +void RtpSessionTable::delSession(const std::string & call_id ) { Lock lock(mutex_); - - map_.erase(map_.find(pref)); -} - -/*u_int16_t RtpSessionTable::getRtpSession(const std::string & addr) -{ - Lock lock(mutex_); - if (map_.empty()) - return 0; - NetworkPrefix prefix(addr,32); - //TODO Routing algorithem isnt working!!! - RoutingMap::iterator it = map_.lower_bound(prefix); -// it--; - if (it!=map_.end()) - return it->second; - it=map_.begin(); - return it->second; -} -*/ -RtpSession& RtpSessionTable::getOrNewSessionUnlocked(const std::string & addr) -{ - RtpSessionMap::iterator it = map_.find(addr); + RtpSessionMap::iterator it = map_.find(call_id); if(it!=map_.end()) - return it->second; + delete it->second; - map_.insert(RtpSessionMap::value_type(addr, RtpSession())); - it = map_.find(addr); - return it->second; + map_.erase(it); } -uint16_t RtpSessionTable::getCountUnlocked() +RtpSession& RtpSessionTable::getOrNewSessionUnlocked(const std::string & call_id) { - RtpSessionMap::iterator it = map_.begin(); - uint16_t routes=0; - for (;it!=map_.end();++it) - routes++; - return routes; -} + RtpSessionMap::iterator it = map_.find(call_id); + if(it!=map_.end()) + return *(it->second); -RtpSessionMap::iterator RtpSessionTable::getBeginUnlocked() -{ - return map_.begin(); + map_.insert(RtpSessionMap::value_type(call_id, new RtpSession())); + it = map_.find(call_id); + return *(it->second); } -RtpSessionMap::iterator RtpSessionTable::getEndUnlocked() +RtpSession& RtpSessionTable::getSession(const std::string & call_id) { - return map_.end(); + RtpSessionMap::iterator it = map_.find(call_id); + if(it!=map_.end()) + return *(it->second); + + throw std::runtime_error("session not found"); } void RtpSessionTable::clear() diff --git a/rtpSessionTable.h b/rtpSessionTable.h index 5b14082..d4140ac 100644 --- a/rtpSessionTable.h +++ b/rtpSessionTable.h @@ -36,7 +36,7 @@ #include "threadUtils.hpp" #include "datatypes.h" #include "rtpSession.h" -typedef std::map RtpSessionMap; +typedef std::map RtpSessionMap; class RtpSessionTable { @@ -44,16 +44,13 @@ public: static RtpSessionTable& instance(); RtpSessionTable(); ~RtpSessionTable(); - void addSession(const std::string & , const RtpSession &); - void delSession(const std::string & ); - //u_int16_t getRoute(const NetworkAddress &); + void addSession(const std::string & call_id, RtpSession* ses); + void delSession(const std::string & call_id); bool empty(); void clear(); ::Mutex& getMutex(); - RtpSession& getOrNewSessionUnlocked(const std::string & addr); - uint16_t getCountUnlocked(); - RtpSessionMap::iterator getBeginUnlocked(); - RtpSessionMap::iterator getEndUnlocked(); + RtpSession& getOrNewSessionUnlocked(const std::string & call_id); + RtpSession& getSession(const std::string & call_id); private: static ::Mutex instMutex; diff --git a/syncSocketHandler.cpp b/syncSocketHandler.cpp index 06906b8..25f4c43 100644 --- a/syncSocketHandler.cpp +++ b/syncSocketHandler.cpp @@ -27,7 +27,7 @@ int SyncSocketHandler::Select(long sec,long usec) std::string sendstr = queue_.pop(); for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++) { - Socket *p = (*it).second; + ::SOCKETS_NAMESPACE::Socket *p = (*it).second; TcpSocket *p3 = dynamic_cast(p); //SyncListenSocket *p4 = dynamic_cast *>(p); if (p3) -- cgit v1.2.3