summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-03-13 17:32:00 +0000
committerChristian Pointner <equinox@anytun.org>2008-03-13 17:32:00 +0000
commitce8ae5556b90de67f23ddb974da76bee3e43b6fd (patch)
tree8d0c3582abadc4fe181336edf40a03b251dd424f
parentrenamed cpp files (diff)
anyrtpproxy can handled most commands
TODO: start threads for relaying
-rw-r--r--Makefile1
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp6
-rw-r--r--anyrtpproxy/commandHandler.cpp64
-rw-r--r--anyrtpproxy/commandHandler.h8
-rw-r--r--anyrtpproxy/options.cpp1
-rw-r--r--rtpSession.cpp128
-rw-r--r--rtpSession.h63
-rw-r--r--rtpSessionTable.cpp62
-rw-r--r--rtpSessionTable.h13
-rw-r--r--syncSocketHandler.cpp2
10 files changed, 278 insertions, 70 deletions
diff --git a/Makefile b/Makefile
index 4eabe84..28216cd 100644
--- a/Makefile
+++ b/Makefile
@@ -100,6 +100,7 @@ ANYCTROBJS = log.o \
rtpSessionTable.o \
rtpSession.o \
syncRtpCommand.o \
+ PracticalSocket.o \
anyctrOptions.o \
router.o \
routingTable.o \
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index df79563..280f99b 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -248,10 +248,6 @@ int main(int argc, char* argv[])
SyncQueue queue;
ConnectToList connect_to = gOpt.getConnectTo();
-// Example
-// gRtpSessionTable.addSession(std::string("callid"),RtpSession());
-// SyncCommand sc (std::string("callid"));
-// queue.push(sc);
ThreadParam p( queue,*(new OptionConnectTo()));
if ( gOpt.getLocalSyncPort())
pthread_create(&syncListenerThread, NULL, syncListener, &p);
@@ -264,7 +260,7 @@ int main(int argc, char* argv[])
pthread_create(& connectThreads.back(), NULL, syncConnector, point);
}
- CommandHandler cmd(gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_);
+ CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_);
int ret = sig.run();
return ret;
diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp
index cac5c4b..efb3c82 100644
--- a/anyrtpproxy/commandHandler.cpp
+++ b/anyrtpproxy/commandHandler.cpp
@@ -37,15 +37,20 @@
#include "commandHandler.h"
#include "../buffer.h"
#include "../log.h"
+#include "../syncQueue.h"
+#include "../syncCommand.h"
+#include "../rtpSessionTable.h"
#define MAX_COMMAND_LENGTH 1000
-CommandHandler::CommandHandler(u_int16_t lp) : running_(true), control_sock_(lp), local_address_("0.0.0.0"), local_port_(lp)
+CommandHandler::CommandHandler(SyncQueue& q, u_int16_t lp) : queue_(q), running_(true), control_sock_(lp),
+ local_address_("0.0.0.0"), local_port_(lp)
{
pthread_create(&thread_, NULL, run, this);
}
-CommandHandler::CommandHandler(string la, u_int16_t lp) : running_(true), control_sock_(la, lp), local_address_(la), local_port_(lp)
+CommandHandler::CommandHandler(SyncQueue& q, string la, u_int16_t lp) : queue_(q), running_(true), control_sock_(la, lp),
+ local_address_(la), local_port_(lp)
{
pthread_create(&thread_, NULL, run, this);
}
@@ -159,7 +164,26 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
std::cout << "received request[" << modifiers << "] command ('" << call_id << "','" << addr << "','" << port
<< "','" << from_tag << "','" << to_tag << "')" << std::endl;
- return RET_OK;
+ try
+ {
+ gRtpSessionTable.addSession(call_id, new RtpSession());
+ u_int16_t port = 35000; // TODO: get next available port
+ RtpSession& session = gRtpSessionTable.getSession(call_id);
+ session.setLocalPort(port);
+ session.setLocalAddr("0.0.0.0"); // TODO: read this from config;
+ session.setRemotePort1(port);
+ session.setRemoteAddr1(addr);
+ SyncCommand sc(call_id);
+ queue_.push(sc);
+
+ ostringstream oss;
+ oss << port;
+ return oss.str();
+ }
+ catch(std::exception& e)
+ {
+ return RET_ERR_UNKNOWN; // TODO: change to corret error value
+ }
}
string CommandHandler::handleResponse(string modifiers, string call_id, string addr, string port, string from_tag, string to_tag)
@@ -167,14 +191,42 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
std::cout << "received response[" << modifiers << "] command ('" << call_id << "','" << addr << "','" << port
<< "','" << from_tag << "','" << to_tag << "')" << std::endl;
- return RET_OK;
+ try
+ {
+ RtpSession& session = gRtpSessionTable.getSession(call_id);
+ u_int16_t port = session.getLocalPort();
+ session.setRemotePort2(port);
+ session.setRemoteAddr2(addr);
+ SyncCommand sc(call_id);
+ queue_.push(sc);
+
+ ostringstream oss;
+ oss << port;
+ return oss.str();
+ }
+ catch(std::exception& e)
+ {
+ return RET_ERR_UNKNOWN; // TODO: change to corret error value
+ }
}
string CommandHandler::handleDelete(string call_id, string from_tag, string to_tag)
{
std::cout << "received delete command ('" << call_id << "','" << from_tag << "','" << to_tag << "')" << std::endl;
- return RET_OK;
+ try
+ {
+ RtpSession& session = gRtpSessionTable.getSession(call_id);
+ session.isDead(true);
+ SyncCommand sc(call_id);
+ queue_.push(sc);
+
+ return RET_OK;
+ }
+ catch(std::exception& e)
+ {
+ return RET_ERR_UNKNOWN; // TODO: change to corret error value
+ }
}
string CommandHandler::handleVersion()
@@ -194,7 +246,7 @@ string CommandHandler::handleVersionF(string date_code)
string CommandHandler::handleInfo()
{
- std::cout << "received info command" << std::endl;
+ std::cout << "received info command, ignoring" << std::endl;
return RET_OK;
}
diff --git a/anyrtpproxy/commandHandler.h b/anyrtpproxy/commandHandler.h
index 41bff2d..6e75bc4 100644
--- a/anyrtpproxy/commandHandler.h
+++ b/anyrtpproxy/commandHandler.h
@@ -34,14 +34,15 @@
#include <string>
#include "../datatypes.h"
#include "../PracticalSocket.h"
+#include "../syncQueue.h"
using std::string;
class CommandHandler
{
public:
- CommandHandler(u_int16_t lp);
- CommandHandler(string la, u_int16_t lp);
+ CommandHandler(SyncQueue& q, u_int16_t lp);
+ CommandHandler(SyncQueue& q, string la, u_int16_t lp);
~CommandHandler();
bool isRunning();
@@ -54,6 +55,7 @@ public:
#define RET_OK "0"
#define RET_ERR_SYNTAX "E1"
+ #define RET_ERR_UNKNOWN "E2"
#define BASE_VERSION "20040107"
#define SUP_VERSION "20050322"
@@ -73,6 +75,8 @@ private:
string handleInfo();
pthread_t thread_;
+ SyncQueue& queue_;
+
bool running_;
UDPSocket control_sock_;
string local_address_;
diff --git a/anyrtpproxy/options.cpp b/anyrtpproxy/options.cpp
index 0c55c90..b81f348 100644
--- a/anyrtpproxy/options.cpp
+++ b/anyrtpproxy/options.cpp
@@ -57,6 +57,7 @@ Options::Options() : control_interface_("0.0.0.0", 22222)
username_ = "nobody";
chroot_dir_ = "/var/run";
daemonize_ = true;
+ local_sync_port_ = 2023;
}
Options::~Options()
diff --git a/rtpSession.cpp b/rtpSession.cpp
index 6933917..ef48538 100644
--- a/rtpSession.cpp
+++ b/rtpSession.cpp
@@ -28,12 +28,136 @@
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
+#include "PracticalSocket.h"
+
#include "rtpSession.h"
-RtpSession::RtpSession()
+RtpSession::RtpSession() : in_sync_(false), sock_(NULL), dead_(false), local_port_(0), local_addr_(""),
+ remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0)
+{
+}
+
+RtpSession::~RtpSession()
+{
+ if(sock_)
+ delete reinterpret_cast<UDPSocket*>(sock_);
+}
+
+void RtpSession::init()
+{
+ Lock lock(mutex_);
+
+ if(sock_ || !local_port_)
+ return;
+
+ sock_ = new UDPSocket(local_addr_, local_port_);
+// TODO: start threads
+}
+
+void RtpSession::reinitSock()
+{
+ if(sock_)
+ delete reinterpret_cast<UDPSocket*>(sock_);
+ sock_ = NULL;
+
+ if(!local_port_)
+ return;
+
+ sock_ = new UDPSocket(local_addr_, local_port_);
+}
+
+bool RtpSession::isDead()
+{
+ Lock lock(mutex_);
+ return dead_;
+}
+
+bool RtpSession::isDead(bool d)
+{
+ Lock Lock(mutex_);
+ return dead_ = d;
+}
+
+u_int16_t RtpSession::getLocalPort()
+{
+ Lock lock(mutex_);
+ return local_port_;
+}
+
+RtpSession& RtpSession::setLocalPort(u_int16_t p)
+{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ local_port_ = p;
+ return *this;
+}
+
+std::string RtpSession::getLocalAddr()
+{
+ Lock lock(mutex_);
+ return local_addr_;
+}
+
+RtpSession& RtpSession::setLocalAddr(std::string a)
+{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ local_addr_ = a;
+ return *this;
+}
+
+u_int16_t RtpSession::getRemotePort1()
+{
+ Lock lock(mutex_);
+ return remote_port1_;
+}
+
+RtpSession& RtpSession::setRemotePort1(u_int16_t p)
+{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ remote_port1_ = p;
+ return *this;
+}
+
+std::string RtpSession::getRemoteAddr1()
+{
+ Lock lock(mutex_);
+ return remote_addr1_;
+}
+
+RtpSession& RtpSession::setRemoteAddr1(std::string a)
+{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ remote_addr1_ = a;
+ return *this;
+}
+
+u_int16_t RtpSession::getRemotePort2()
+{
+ Lock lock(mutex_);
+ return remote_port2_;
+}
+
+RtpSession& RtpSession::setRemotePort2(u_int16_t p)
+{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ remote_port2_ = p;
+ return *this;
+}
+
+std::string RtpSession::getRemoteAddr2()
{
+ Lock lock(mutex_);
+ return remote_addr2_;
}
-RtpSession::RtpSession(const RtpSession & src)
+RtpSession& RtpSession::setRemoteAddr2(std::string a)
{
+ Lock lock(mutex_);
+ in_sync_ = false;
+ remote_addr2_ = a;
+ return *this;
}
diff --git a/rtpSession.h b/rtpSession.h
index 218843c..4a43d3f 100644
--- a/rtpSession.h
+++ b/rtpSession.h
@@ -33,24 +33,79 @@
#include "threadUtils.hpp"
+#include <iostream>
+
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
class RtpSession
{
public:
- RtpSession(const RtpSession & src);
RtpSession();
+ ~RtpSession();
+
+ void init();
+
+ bool isDead();
+ bool isDead(bool d);
+
+ u_int16_t getLocalPort();
+ RtpSession& setLocalPort(u_int16_t p);
+ std::string getLocalAddr();
+ RtpSession& setLocalAddr(std::string a);
+
+ u_int16_t getRemotePort1();
+ RtpSession& setRemotePort1(u_int16_t p);
+ std::string getRemoteAddr1();
+ RtpSession& setRemoteAddr1(std::string a);
+
+ u_int16_t getRemotePort2();
+ RtpSession& setRemotePort2(u_int16_t p);
+ std::string getRemoteAddr2();
+ RtpSession& setRemoteAddr2(std::string a);
private:
+ RtpSession(const RtpSession & src);
+
+ void reinitSock();
+
//TODO: check if this is ok
- //Mutex mutex_;
friend class boost::serialization::access;
template<class Archive>
void serialize(Archive & ar, const unsigned int version)
- {
- //Lock lock(mutex_);
+ {
+ Lock lock(mutex_);
+
+ std::cout << "seralize called: " << local_port_ << "," << local_addr_ << std::endl;
+
+
+ u_int16_t old_local_port = local_port_;
+ std::string old_local_addr = local_addr_;
+
+ ar & dead_;
+ ar & local_addr_;
+ ar & local_port_;
+ ar & remote_addr1_;
+ ar & remote_port1_;
+ ar & remote_addr2_;
+ ar & remote_port2_;
+
+ if(old_local_port != local_port_ || old_local_addr != local_addr_)
+ reinitSock();
+
+ in_sync_ = true;
}
+
+ bool in_sync_;
+ ::Mutex mutex_;
+ void* sock_;
+
+ bool dead_;
+ u_int16_t local_port_;
+ std::string local_addr_;
+ std::string remote_addr1_, remote_addr2_;
+ u_int16_t remote_port1_, remote_port2_;
};
+
#endif
diff --git a/rtpSessionTable.cpp b/rtpSessionTable.cpp
index 45f99c5..a577613 100644
--- a/rtpSessionTable.cpp
+++ b/rtpSessionTable.cpp
@@ -55,71 +55,49 @@ RtpSessionTable::~RtpSessionTable()
{
}
-void RtpSessionTable::addSession(const std::string & pref, const RtpSession & ses )
+void RtpSessionTable::addSession(const std::string & call_id, RtpSession* ses )
{
Lock lock(mutex_);
- std::pair<RtpSessionMap::iterator, bool> ret = map_.insert(RtpSessionMap::value_type(pref,ses));
+ std::pair<RtpSessionMap::iterator, bool> ret = map_.insert(RtpSessionMap::value_type(call_id,ses));
if(!ret.second)
{
map_.erase(ret.first);
- map_.insert(RtpSessionMap::value_type(pref,ses));
+ map_.insert(RtpSessionMap::value_type(call_id,ses));
}
}
-void RtpSessionTable::delSession(const std::string & pref )
+void RtpSessionTable::delSession(const std::string & call_id )
{
Lock lock(mutex_);
-
- map_.erase(map_.find(pref));
-}
-
-/*u_int16_t RtpSessionTable::getRtpSession(const std::string & addr)
-{
- Lock lock(mutex_);
- if (map_.empty())
- return 0;
- NetworkPrefix prefix(addr,32);
- //TODO Routing algorithem isnt working!!!
- RoutingMap::iterator it = map_.lower_bound(prefix);
-// it--;
- if (it!=map_.end())
- return it->second;
- it=map_.begin();
- return it->second;
-}
-*/
-RtpSession& RtpSessionTable::getOrNewSessionUnlocked(const std::string & addr)
-{
- RtpSessionMap::iterator it = map_.find(addr);
+ RtpSessionMap::iterator it = map_.find(call_id);
if(it!=map_.end())
- return it->second;
+ delete it->second;
- map_.insert(RtpSessionMap::value_type(addr, RtpSession()));
- it = map_.find(addr);
- return it->second;
+ map_.erase(it);
}
-uint16_t RtpSessionTable::getCountUnlocked()
+RtpSession& RtpSessionTable::getOrNewSessionUnlocked(const std::string & call_id)
{
- RtpSessionMap::iterator it = map_.begin();
- uint16_t routes=0;
- for (;it!=map_.end();++it)
- routes++;
- return routes;
-}
+ RtpSessionMap::iterator it = map_.find(call_id);
+ if(it!=map_.end())
+ return *(it->second);
-RtpSessionMap::iterator RtpSessionTable::getBeginUnlocked()
-{
- return map_.begin();
+ map_.insert(RtpSessionMap::value_type(call_id, new RtpSession()));
+ it = map_.find(call_id);
+ return *(it->second);
}
-RtpSessionMap::iterator RtpSessionTable::getEndUnlocked()
+RtpSession& RtpSessionTable::getSession(const std::string & call_id)
{
- return map_.end();
+ RtpSessionMap::iterator it = map_.find(call_id);
+ if(it!=map_.end())
+ return *(it->second);
+
+ throw std::runtime_error("session not found");
}
void RtpSessionTable::clear()
diff --git a/rtpSessionTable.h b/rtpSessionTable.h
index 5b14082..d4140ac 100644
--- a/rtpSessionTable.h
+++ b/rtpSessionTable.h
@@ -36,7 +36,7 @@
#include "threadUtils.hpp"
#include "datatypes.h"
#include "rtpSession.h"
-typedef std::map<std::string,RtpSession> RtpSessionMap;
+typedef std::map<std::string,RtpSession*> RtpSessionMap;
class RtpSessionTable
{
@@ -44,16 +44,13 @@ public:
static RtpSessionTable& instance();
RtpSessionTable();
~RtpSessionTable();
- void addSession(const std::string & , const RtpSession &);
- void delSession(const std::string & );
- //u_int16_t getRoute(const NetworkAddress &);
+ void addSession(const std::string & call_id, RtpSession* ses);
+ void delSession(const std::string & call_id);
bool empty();
void clear();
::Mutex& getMutex();
- RtpSession& getOrNewSessionUnlocked(const std::string & addr);
- uint16_t getCountUnlocked();
- RtpSessionMap::iterator getBeginUnlocked();
- RtpSessionMap::iterator getEndUnlocked();
+ RtpSession& getOrNewSessionUnlocked(const std::string & call_id);
+ RtpSession& getSession(const std::string & call_id);
private:
static ::Mutex instMutex;
diff --git a/syncSocketHandler.cpp b/syncSocketHandler.cpp
index 06906b8..25f4c43 100644
--- a/syncSocketHandler.cpp
+++ b/syncSocketHandler.cpp
@@ -27,7 +27,7 @@ int SyncSocketHandler::Select(long sec,long usec)
std::string sendstr = queue_.pop();
for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++)
{
- Socket *p = (*it).second;
+ ::SOCKETS_NAMESPACE::Socket *p = (*it).second;
TcpSocket *p3 = dynamic_cast<TcpSocket *>(p);
//SyncListenSocket<SyncSocket,ConnectionList> *p4 = dynamic_cast<SyncListenSocket<SyncSocket,ConnectionList> *>(p);
if (p3)