summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--anyrtpproxy/Makefile4
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp205
-rw-r--r--anyrtpproxy/commandHandler.cpp7
-rw-r--r--rtpSession.cpp7
-rw-r--r--rtpSession.h2
-rw-r--r--rtpSessionTable.cpp10
-rw-r--r--rtpSessionTable.h2
-rw-r--r--syncSocket.cpp14
8 files changed, 159 insertions, 92 deletions
diff --git a/anyrtpproxy/Makefile b/anyrtpproxy/Makefile
index 6b0e4a1..7e3475f 100644
--- a/anyrtpproxy/Makefile
+++ b/anyrtpproxy/Makefile
@@ -33,6 +33,7 @@ OBJS = anyrtpproxy.o \
../networkPrefix.o \
../Sockets/libSockets.a \
commandHandler.o \
+ callIdQueue.o \
options.o
EXECUTABLE = anyrtpproxy
@@ -51,6 +52,9 @@ connectionList.o: connectionList.cpp connectionList.h
commandHandler.o: commandHandler.cpp commandHandler.h
$(C++) $(CCFLAGS) $< -c
+callIdQueue.o: callIdQueue.cpp callIdQueue.h
+ $(C++) $(CCFLAGS) $< -c
+
anyrtpproxy.o: anyrtpproxy.cpp
$(C++) $(CCFLAGS) $< -c
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index 280f99b..a308308 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -1,3 +1,33 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methodes used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program (see the file COPYING included with this
+ * distribution); if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
#include <iostream>
#include <fcntl.h>
@@ -22,6 +52,7 @@
#include "../threadUtils.hpp"
#include "commandHandler.h"
+#include "callIdQueue.h"
#include "options.h"
#include <map>
@@ -40,99 +71,112 @@ public:
OptionConnectTo & connto;
};
-
-class ControlHost : public Host
+class ListenerThreadParam
{
public:
- ControlHost() : Host("",0) {};
- bool operator<(const ControlHost& cmp_to)
+ ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d)
+ {};
+
+ UDPSocket& sock1_;
+ UDPSocket& sock2_;
+ std::string call_id_;
+ int dir_;
+};
+
+void* listener(void* p)
+{
+ ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
+
+ cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
+
+ try
{
- return port_ < cmp_to.port_;
+ Buffer buf(u_int32_t(MAX_PACKET_SIZE));
+ string remote_addr;
+ u_int16_t remote_port;
+ while(1) {
+ buf.setLength(MAX_PACKET_SIZE);
+ u_int32_t len;
+ if(param->dir_ == 1)
+ len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ else if(param->dir_ == 2)
+ len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
+ buf.setLength(len);
+
+ RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
+ if(session.isDead())
+ break;
+
+ //TODO: if weak? don't check but save the new(?) remote addr into list
+ if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
+ (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
+ continue;
+
+ if(param->dir_ == 1)
+ param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2());
+ else if(param->dir_ == 2)
+ param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1());
+ }
}
-};
+ catch(std::exception &e)
+ {
+ cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
+ }
+ cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally";
-class ControlHostMap
+ pthread_exit(NULL);
+}
+
+class ListenerData
{
public:
-
-
-private:
- ::Mutex mutex;
-
- std::map<ControlHost, std::pair<UDPSocket*, pthread_t> > control_hosts_;
+ ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
+ {};
+
+ UDPSocket* sock1_;
+ UDPSocket* sock2_;
+ pthread_t threads1_;
+ pthread_t threads2_;
+ ListenerThreadParam params1_;
+ ListenerThreadParam params2_;
};
-void* sender(void* dont_use_me)
+void* listenerManager(void* dont_use_me)
{
-// try
-// {
-// HostList remote_host_list(gOpt.getRemoteHosts());
-// UDPSocket control_sock(gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_);
-
-// Buffer buf(u_int32_t(MAX_PACKET_SIZE));
-// string remote_host;
-// u_int16_t remote_port;
-// while(1) {
-// buf.setLength(MAX_PACKET_SIZE);
-// u_int32_t len = control_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
-// buf.setLength(len);
-
-//TODO????//TODO????//TODO????//TODO????//TODO????//TODO????//TODO???? control.setHost(remote_host, remote_port);
-
-// SenderThreadParam receiverParam = {control_host, control_sock, sock, gOpt.getRemoteHosts().front()};
-// pthread_t receiverThread;
-// pthread_create(&receiverThread, NULL, receiver, &receiverParam);
-// pthread_detach(receiverThread);
+ try
+ {
+ std::map<std::string, ListenerData> listenerMap;
+ while(1)
+ {
+ std::string call_id = gCallIdQueue.front(); // waits for semphor and returns next call_id
+ gCallIdQueue.pop();
-
-// HostList::const_iterator it = remote_host_list.begin();
-// for(;it != remote_host_list.end(); it++)
-// param->sock_.sendTo(buf.getBuf(), buf.getLength(), it->addr_, it->port_);
-// }
-// }
-// catch(std::exception &e)
-// {
-// cLog.msg(Log::PRIO_ERR) << "sender exiting because: " << e.what() << std::endl;
-// }
- pthread_exit(NULL);
-}
+ RtpSession& session = gRtpSessionTable.getSession(call_id);
+ cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
+ << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
+ << session.getLocalAddr() << ":" << session.getLocalPort2();
-void* receiver(void* p)
-{
-// SenderThreadParam* param = reinterpret_cast<SenderThreadParam*>(p);
-
-// try
-// {
-// Buffer buf(u_int32_t(MAX_PACKET_SIZE));
-// string remote_host;
-// u_int16_t remote_port;
-
-// while(1) {
-// buf.setLength(MAX_PACKET_SIZE);
-// u_int32_t len = param->sock_.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
-// buf.setLength(len);
-
-// if(remote_host != param->first_receiver_.addr_ || remote_port != param->first_receiver_.port_)
-// continue;
+ UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
+ UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
+
+ ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
+ ListenerThreadParam(*sock1, *sock2, call_id, 2));
+ ld.sock1_ = sock1;
+ ld.sock2_ = sock2;
+ pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
+ pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
-// Host control_host = param->control_.getHost();
-// if(control_host.addr_ == "" || !control_host.port_)
-// {
-// cLog.msg(Log::PRIO_NOTICE) << "no control host detected till now, ignoring packet";
-// continue;
-// }
-
-// param->control_sock_.sendTo(buf.getBuf(), buf.getLength(), control_host.addr_, control_host.port_);
-// }
-// }
-// catch(std::exception &e)
-// {
-// cLog.msg(Log::PRIO_ERR) << "receiver exiting because: " << e.what() << std::endl;
-// }
+ listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
+ }
+ }
+ catch(std::exception &e)
+ {
+ cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because: " << e.what();
+ }
pthread_exit(NULL);
-}
+}
void chrootAndDrop(string const& chrootdir, string const& username)
{
@@ -241,9 +285,10 @@ int main(int argc, char* argv[])
SignalController sig;
sig.init();
-// pthread_t senderThread;
-// pthread_create(&senderThread, NULL, sender, NULL);
-// pthread_detach(senderThread);
+ pthread_t listenerManagerThread;
+ pthread_create(&listenerManagerThread, NULL, listenerManager, NULL);
+ pthread_detach(listenerManagerThread);
+
pthread_t syncListenerThread;
SyncQueue queue;
diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp
index 58d9974..53e3e5f 100644
--- a/anyrtpproxy/commandHandler.cpp
+++ b/anyrtpproxy/commandHandler.cpp
@@ -40,6 +40,7 @@
#include "../syncQueue.h"
#include "../syncCommand.h"
#include "../rtpSessionTable.h"
+#include "callIdQueue.h"
#define MAX_COMMAND_LENGTH 1000
@@ -187,7 +188,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad
queue_.push(sc);
ostringstream oss;
- oss << session.getLocalPort1();
+ oss << session.getLocalPort2();
return oss.str();
}
catch(std::exception& e)
@@ -212,10 +213,10 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
SyncCommand sc(call_id);
queue_.push(sc);
- session.init();
+ gCallIdQueue.push(call_id);
ostringstream oss;
- oss << session.getLocalPort2();
+ oss << session.getLocalPort1();
return oss.str();
}
catch(std::exception& e)
diff --git a/rtpSession.cpp b/rtpSession.cpp
index 403f9b2..a547519 100644
--- a/rtpSession.cpp
+++ b/rtpSession.cpp
@@ -35,13 +35,6 @@ RtpSession::RtpSession() : in_sync_(false), dead_(false), local_addr_(""), local
{
}
-void RtpSession::init()
-{
- Lock lock(mutex_);
-
-// TODO: start threads
-}
-
void RtpSession::reinit()
{
Lock lock(mutex_);
diff --git a/rtpSession.h b/rtpSession.h
index 0a21e5e..9059c2c 100644
--- a/rtpSession.h
+++ b/rtpSession.h
@@ -41,8 +41,6 @@ class RtpSession
public:
RtpSession();
- void init();
-
bool isDead();
bool isDead(bool d);
diff --git a/rtpSessionTable.cpp b/rtpSessionTable.cpp
index 1556752..8b361a4 100644
--- a/rtpSessionTable.cpp
+++ b/rtpSessionTable.cpp
@@ -107,6 +107,16 @@ RtpSession& RtpSessionTable::getSession(const std::string & call_id)
throw std::runtime_error("session not found");
}
+RtpSessionMap::iterator RtpSessionTable::getBeginUnlocked()
+{
+ return map_.begin();
+}
+
+RtpSessionMap::iterator RtpSessionTable::getEndUnlocked()
+{
+ return map_.end();
+}
+
void RtpSessionTable::clear()
{
Lock lock(mutex_);
diff --git a/rtpSessionTable.h b/rtpSessionTable.h
index 19aec6f..61bfdeb 100644
--- a/rtpSessionTable.h
+++ b/rtpSessionTable.h
@@ -49,6 +49,8 @@ public:
bool empty();
void clear();
::Mutex& getMutex();
+ RtpSessionMap::iterator getBeginUnlocked();
+ RtpSessionMap::iterator getEndUnlocked();
RtpSession& getOrNewSession(const std::string & call_id, bool& isnew);
RtpSession& getOrNewSessionUnlocked(const std::string & call_id, bool& isnew);
RtpSession& getSession(const std::string & call_id);
diff --git a/syncSocket.cpp b/syncSocket.cpp
index 6681ba3..96da70c 100644
--- a/syncSocket.cpp
+++ b/syncSocket.cpp
@@ -2,6 +2,7 @@
#include <iostream>
#include <string>
#include "routingTable.h"
+#include "rtpSessionTable.h"
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
@@ -54,6 +55,19 @@ void SyncSocket::OnAccept()
Send(lengthout.str());
Send(sout.str());
}
+ //TODO Locking here
+ RtpSessionMap::iterator rit = gRtpSessionTable.getBeginUnlocked();
+ for (;rit!=gRtpSessionTable.getEndUnlocked();++rit)
+ {
+ std::ostringstream sout;
+ boost::archive::text_oarchive oa(sout);
+ const SyncCommand scom(rit->first);
+ oa << scom;
+ std::stringstream lengthout;
+ lengthout << std::setw(5) << std::setfill('0') << sout.str().size()<< ' ';
+ Send(lengthout.str());
+ Send(sout.str());
+ }
}
//void StatusSocket::InitSSLServer()