summaryrefslogtreecommitdiff
path: root/anyrtpproxy
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-03-17 18:48:16 +0000
committerChristian Pointner <equinox@anytun.org>2008-03-17 18:48:16 +0000
commitdc3e77c4284b84b71ddabf1a813b18224b775217 (patch)
tree18703ca611ac1ba8719219d11fae5e6c434c23f7 /anyrtpproxy
parentfirst working version (diff)
finally added callIdQueue (sorry)
threads get started @ sync now TODO: cleanup threads and session on daed
Diffstat (limited to 'anyrtpproxy')
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp52
-rw-r--r--anyrtpproxy/callIdQueue.cpp76
-rw-r--r--anyrtpproxy/callIdQueue.h72
-rw-r--r--anyrtpproxy/commandHandler.cpp1
4 files changed, 180 insertions, 21 deletions
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index a308308..2b6184e 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -86,9 +86,9 @@ public:
void* listener(void* p)
{
ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
-
+
cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
-
+
try
{
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
@@ -132,7 +132,7 @@ class ListenerData
public:
ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
{};
-
+
UDPSocket* sock1_;
UDPSocket* sock2_;
pthread_t threads1_;
@@ -148,28 +148,38 @@ void* listenerManager(void* dont_use_me)
std::map<std::string, ListenerData> listenerMap;
while(1)
{
- std::string call_id = gCallIdQueue.front(); // waits for semphor and returns next call_id
+ std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
gCallIdQueue.pop();
RtpSession& session = gRtpSessionTable.getSession(call_id);
+ if(!session.isComplete())
+ continue;
-
- cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
- << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
- << session.getLocalAddr() << ":" << session.getLocalPort2();
-
- UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
- UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
-
- ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
- ListenerThreadParam(*sock1, *sock2, call_id, 2));
- ld.sock1_ = sock1;
- ld.sock2_ = sock2;
- pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
- pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
-
- listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
- }
+ std::map<std::string, ListenerData>::iterator it;
+ it = listenerMap.find(call_id);
+ if(it == listenerMap.end()) // listener Threads not existing yet
+ {
+ cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
+ << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
+ << session.getLocalAddr() << ":" << session.getLocalPort2();
+
+ UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
+ UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
+
+ ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
+ ListenerThreadParam(*sock1, *sock2, call_id, 2));
+ ld.sock1_ = sock1;
+ ld.sock2_ = sock2;
+ pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
+ pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
+
+ std::pair<std::map<std::string, ListenerData>::iterator, bool> ret;
+ ret = listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
+ it = ret.first;
+ continue;
+ }
+ // TODO: reinit if session is changed or cleanup if it is daed
+ }
}
catch(std::exception &e)
{
diff --git a/anyrtpproxy/callIdQueue.cpp b/anyrtpproxy/callIdQueue.cpp
new file mode 100644
index 0000000..bc90f08
--- /dev/null
+++ b/anyrtpproxy/callIdQueue.cpp
@@ -0,0 +1,76 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methodes used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program (see the file COPYING included with this
+ * distribution); if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include "callIdQueue.h"
+
+CallIdQueue* CallIdQueue::inst = NULL;
+Mutex CallIdQueue::instMutex;
+CallIdQueue& gCallIdQueue = CallIdQueue::instance();
+
+CallIdQueue& CallIdQueue::instance()
+{
+ Lock lock(instMutex);
+ static instanceCleaner c;
+ if(!inst)
+ inst = new CallIdQueue();
+
+ return *inst;
+}
+
+CallIdQueue::CallIdQueue()
+{
+}
+
+CallIdQueue::~CallIdQueue()
+{
+ while(!callids_.empty())
+ pop();
+}
+
+std::string& CallIdQueue::front()
+{
+ sem_.down();
+ Lock lock(mutex_);
+ return callids_.front();
+}
+
+void CallIdQueue::push(std::string c)
+{
+ Lock lock(mutex_);
+ callids_.push(c);
+ sem_.up();
+}
+
+void CallIdQueue::pop()
+{
+ Lock lock(mutex_);
+ callids_.pop();
+}
+
diff --git a/anyrtpproxy/callIdQueue.h b/anyrtpproxy/callIdQueue.h
new file mode 100644
index 0000000..67e543d
--- /dev/null
+++ b/anyrtpproxy/callIdQueue.h
@@ -0,0 +1,72 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methodes used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program (see the file COPYING included with this
+ * distribution); if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef __CALLID_QUEUE_H__
+#define __CALLID_QUEUE_H__
+
+#include <queue>
+#include <string>
+
+#include "../threadUtils.hpp"
+
+class CallIdQueue
+{
+public:
+ static CallIdQueue& instance();
+
+ std::string& front();
+ void push(std::string c);
+ void pop();
+
+private:
+ CallIdQueue();
+ ~CallIdQueue();
+
+ void operator=(const CallIdQueue &src);
+ CallIdQueue(const CallIdQueue &src);
+
+ static CallIdQueue* inst;
+ static ::Mutex instMutex;
+ class instanceCleaner {
+ public: ~instanceCleaner() {
+ if(CallIdQueue::inst != 0)
+ delete CallIdQueue::inst;
+ }
+ };
+ friend class instanceCleaner;
+
+ ::Mutex mutex_;
+ Semaphore sem_;
+ std::queue<std::string> callids_;
+};
+
+extern CallIdQueue& gCallIdQueue;
+
+#endif
diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp
index 53e3e5f..71f4126 100644
--- a/anyrtpproxy/commandHandler.cpp
+++ b/anyrtpproxy/commandHandler.cpp
@@ -210,6 +210,7 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a
iss >> rport;
session.setRemotePort2(rport);
session.setRemoteAddr2(addr);
+ session.isComplete(true);
SyncCommand sc(call_id);
queue_.push(sc);