summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-03-17 18:48:16 +0000
committerChristian Pointner <equinox@anytun.org>2008-03-17 18:48:16 +0000
commitdc3e77c4284b84b71ddabf1a813b18224b775217 (patch)
tree18703ca611ac1ba8719219d11fae5e6c434c23f7
parentfirst working version (diff)
finally added callIdQueue (sorry)
threads get started @ sync now TODO: cleanup threads and session on daed
-rw-r--r--Makefile10
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp52
-rw-r--r--anyrtpproxy/callIdQueue.cpp76
-rw-r--r--anyrtpproxy/callIdQueue.h72
-rw-r--r--anyrtpproxy/commandHandler.cpp1
-rw-r--r--rtpSession.cpp21
-rw-r--r--rtpSession.h8
-rw-r--r--rtpSessionTable.cpp22
-rw-r--r--rtpSessionTable.h1
9 files changed, 215 insertions, 48 deletions
diff --git a/Makefile b/Makefile
index 28216cd..c91c3be 100644
--- a/Makefile
+++ b/Makefile
@@ -59,6 +59,7 @@ OBJS = tunDevice.o \
keyDerivation.o \
rtpSessionTable.o \
rtpSession.o \
+ anyrtpproxy/callIdQueue.o \
mpi.o \
cipherFactory.o \
authAlgoFactory.o \
@@ -97,10 +98,11 @@ ANYCTROBJS = log.o \
signalController.o \
connectionList.o \
connectionParam.o \
- rtpSessionTable.o \
- rtpSession.o \
- syncRtpCommand.o \
- PracticalSocket.o \
+ rtpSessionTable.o \
+ rtpSession.o \
+ anyrtpproxy/callIdQueue.o \
+ syncRtpCommand.o \
+ PracticalSocket.o \
anyctrOptions.o \
router.o \
routingTable.o \
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index a308308..2b6184e 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -86,9 +86,9 @@ public:
void* listener(void* p)
{
ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
-
+
cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
-
+
try
{
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
@@ -132,7 +132,7 @@ class ListenerData
public:
ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
{};
-
+
UDPSocket* sock1_;
UDPSocket* sock2_;
pthread_t threads1_;
@@ -148,28 +148,38 @@ void* listenerManager(void* dont_use_me)
std::map<std::string, ListenerData> listenerMap;
while(1)
{
- std::string call_id = gCallIdQueue.front(); // waits for semphor and returns next call_id
+ std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
gCallIdQueue.pop();
RtpSession& session = gRtpSessionTable.getSession(call_id);
+ if(!session.isComplete())
+ continue;
-
- cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
- << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
- << session.getLocalAddr() << ":" << session.getLocalPort2();
-
- UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
- UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
-
- ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
- ListenerThreadParam(*sock1, *sock2, call_id, 2));
- ld.sock1_ = sock1;
- ld.sock2_ = sock2;
- pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
- pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
-
- listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
- }
+ std::map<std::string, ListenerData>::iterator it;
+ it = listenerMap.find(call_id);
+ if(it == listenerMap.end()) // listener Threads not existing yet
+ {
+ cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
+ << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
+ << session.getLocalAddr() << ":" << session.getLocalPort2();
+
+ UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
+ UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
+
+ ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
+ ListenerThreadParam(*sock1, *sock2, call_id, 2));
+ ld.sock1_ = sock1;
+ ld.sock2_ = sock2;
+ pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
+ pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
+
+ std::pair<std::map<std::string, ListenerData>::iterator, bool> ret;
+ ret = listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
+ it = ret.first;
+ continue;
+ }
+ // TODO: reinit if session is changed or cleanup if it is daed
+ }
}
catch(std::exception &e)
{
diff --git a/anyrtpproxy/callIdQueue.cpp b/anyrtpproxy/callIdQueue.cpp
new file mode 100644
index 0000000..bc90f08
--- /dev/null
+++ b/anyrtpproxy/callIdQueue.cpp
@@ -0,0 +1,76 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methodes used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program (see the file COPYING included with this
+ * distribution); if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include "callIdQueue.h"
+
+CallIdQueue* CallIdQueue::inst = NULL;
+Mutex CallIdQueue::instMutex;
+CallIdQueue& gCallIdQueue = CallIdQueue::instance();
+
+CallIdQueue& CallIdQueue::instance()
+{
+ Lock lock(instMutex);
+ static instanceCleaner c;
+ if(!inst)
+ inst = new CallIdQueue();
+
+ return *inst;
+}
+
+CallIdQueue::CallIdQueue()
+{
+}
+
+CallIdQueue::~CallIdQueue()
+{
+ while(!callids_.empty())
+ pop();
+}
+
+std::string& CallIdQueue::front()
+{
+ sem_.down();
+ Lock lock(mutex_);
+ return callids_.front();
+}
+
+void CallIdQueue::push(std::string c)
+{
+ Lock lock(mutex_);
+ callids_.push(c);
+ sem_.up();
+}
+
+void CallIdQueue::pop()
+{
+ Lock lock(mutex_);
+ callids_.pop();
+}
+
diff --git a/anyrtpproxy/callIdQueue.h b/anyrtpproxy/callIdQueue.h
new file mode 100644
index 0000000..67e543d
--- /dev/null
+++ b/anyrtpproxy/callIdQueue.h
@@ -0,0 +1,72 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methodes used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program (see the file COPYING included with this
+ * distribution); if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef __CALLID_QUEUE_H__
+#define __CALLID_QUEUE_H__
+
+#include <queue>
+#include <string>
+
+#include "../threadUtils.hpp"
+
+class CallIdQueue
+{
+public:
+ static CallIdQueue& instance();
+
+ std::string& front();
+ void push(std::string c);
+ void pop();
+
+private:
+ CallIdQueue();
+ ~CallIdQueue();
+
+ void operator=(const CallIdQueue &src);
+ CallIdQueue(const CallIdQueue &src);
+
+ static CallIdQueue* inst;
+ static ::Mutex instMutex;
+ class instanceCleaner {
+ public: ~instanceCleaner() {
+ if(CallIdQueue::inst != 0)
+ delete CallIdQueue::inst;
+ }
+ };
+ friend class instanceCleaner;
+
+ ::Mutex mutex_;
+ Semaphore sem_;
+ std::queue<std::string> callids_;
+};
+
+extern CallIdQueue& gCallIdQueue;
+
+#endif
diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp
index 53e3e5f..71f4126 100644
--- a/anyrtpproxy/commandHandler.cpp
+++ b/anyrtpproxy/commandHandler.cpp
@@ -210,6 +210,7 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
iss >> rport;
session.setRemotePort2(rport);
session.setRemoteAddr2(addr);
+ session.isComplete(true);
SyncCommand sc(call_id);
queue_.push(sc);
diff --git a/rtpSession.cpp b/rtpSession.cpp
index a547519..3dcc14b 100644
--- a/rtpSession.cpp
+++ b/rtpSession.cpp
@@ -30,8 +30,11 @@
#include "rtpSession.h"
-RtpSession::RtpSession() : in_sync_(false), dead_(false), local_addr_(""), local_port1_(0), local_port2_(0),
- remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0)
+#include "anyrtpproxy/callIdQueue.h"
+
+RtpSession::RtpSession(const std::string& call_id) : in_sync_(false), call_id_(call_id) , dead_(false), complete_(false),
+ local_addr_("") , local_port1_(0), local_port2_(0),
+ remote_addr1_(""), remote_addr2_(""), remote_port1_(0), remote_port2_(0)
{
}
@@ -39,7 +42,7 @@ void RtpSession::reinit()
{
Lock lock(mutex_);
-// TODO: inform threads of reinit
+ gCallIdQueue.push(call_id_);
}
bool RtpSession::isDead()
@@ -54,6 +57,18 @@ bool RtpSession::isDead(bool d)
return dead_ = d;
}
+bool RtpSession::isComplete()
+{
+ Lock lock(mutex_);
+ return complete_;
+}
+
+bool RtpSession::isComplete(bool c)
+{
+ Lock lock(mutex_);
+ return complete_ = c;
+}
+
std::string RtpSession::getLocalAddr()
{
Lock lock(mutex_);
diff --git a/rtpSession.h b/rtpSession.h
index 9059c2c..c6c6542 100644
--- a/rtpSession.h
+++ b/rtpSession.h
@@ -39,11 +39,14 @@
class RtpSession
{
public:
- RtpSession();
+ RtpSession(const std::string& call_id);
bool isDead();
bool isDead(bool d);
+ bool isComplete();
+ bool isComplete(bool c);
+
std::string getLocalAddr();
RtpSession& setLocalAddr(std::string a);
u_int16_t getLocalPort1();
@@ -79,6 +82,7 @@ private:
u_int16_t old_local_port2 = local_port2_;
ar & dead_;
+ ar & complete_;
ar & local_addr_;
ar & local_port1_;
ar & local_port2_;
@@ -96,7 +100,9 @@ private:
bool in_sync_;
::Mutex mutex_;
+ const std::string& call_id_;
bool dead_;
+ bool complete_;
std::string local_addr_;
u_int16_t local_port1_, local_port2_;
std::string remote_addr1_, remote_addr2_;
diff --git a/rtpSessionTable.cpp b/rtpSessionTable.cpp
index 8b361a4..5beb3d6 100644
--- a/rtpSessionTable.cpp
+++ b/rtpSessionTable.cpp
@@ -54,21 +54,7 @@ RtpSessionTable::~RtpSessionTable()
{
}
-void RtpSessionTable::addSession(const std::string & call_id, RtpSession* ses )
-{
- Lock lock(mutex_);
-
-
- std::pair<RtpSessionMap::iterator, bool> ret = map_.insert(RtpSessionMap::value_type(call_id,ses));
- if(!ret.second)
- {
- map_.erase(ret.first);
- map_.insert(RtpSessionMap::value_type(call_id,ses));
- }
-}
-
-
-void RtpSessionTable::delSession(const std::string & call_id )
+void RtpSessionTable::delSession(const std::string & call_id)
{
Lock lock(mutex_);
@@ -93,9 +79,9 @@ RtpSession& RtpSessionTable::getOrNewSessionUnlocked(const std::string & call_id
return *(it->second);
is_new = true;
- map_.insert(RtpSessionMap::value_type(call_id, new RtpSession()));
- it = map_.find(call_id);
- return *(it->second);
+ std::pair<RtpSessionMap::iterator, bool> ret = map_.insert(RtpSessionMap::value_type(call_id, NULL));
+ ret.first->second = new RtpSession(ret.first->first);
+ return *(ret.first->second);
}
RtpSession& RtpSessionTable::getSession(const std::string & call_id)
diff --git a/rtpSessionTable.h b/rtpSessionTable.h
index 61bfdeb..dc78979 100644
--- a/rtpSessionTable.h
+++ b/rtpSessionTable.h
@@ -44,7 +44,6 @@ public:
static RtpSessionTable& instance();
RtpSessionTable();
~RtpSessionTable();
- void addSession(const std::string & call_id, RtpSession* ses);
void delSession(const std::string & call_id);
bool empty();
void clear();