From 8efe1bd45ddab5e60c756c9c11506dfe469e8563 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 14 Mar 2008 19:33:40 +0000 Subject: first working version --- anyrtpproxy/Makefile | 4 + anyrtpproxy/anyrtpproxy.cpp | 205 +++++++++++++++++++++++++---------------- anyrtpproxy/commandHandler.cpp | 7 +- 3 files changed, 133 insertions(+), 83 deletions(-) (limited to 'anyrtpproxy') diff --git a/anyrtpproxy/Makefile b/anyrtpproxy/Makefile index 6b0e4a1..7e3475f 100644 --- a/anyrtpproxy/Makefile +++ b/anyrtpproxy/Makefile @@ -33,6 +33,7 @@ OBJS = anyrtpproxy.o \ ../networkPrefix.o \ ../Sockets/libSockets.a \ commandHandler.o \ + callIdQueue.o \ options.o EXECUTABLE = anyrtpproxy @@ -51,6 +52,9 @@ connectionList.o: connectionList.cpp connectionList.h commandHandler.o: commandHandler.cpp commandHandler.h $(C++) $(CCFLAGS) $< -c +callIdQueue.o: callIdQueue.cpp callIdQueue.h + $(C++) $(CCFLAGS) $< -c + anyrtpproxy.o: anyrtpproxy.cpp $(C++) $(CCFLAGS) $< -c diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 280f99b..a308308 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -1,3 +1,33 @@ +/* + * anytun + * + * The secure anycast tunneling protocol (satp) defines a protocol used + * for communication between any combination of unicast and anycast + * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel + * mode and allows tunneling of every ETHER TYPE protocol (e.g. + * ethernet, ip, arp ...). satp directly includes cryptography and + * message authentication based on the methodes used by SRTP. It is + * intended to deliver a generic, scaleable and secure solution for + * tunneling and relaying of packets of any protocol. + * + * + * Copyright (C) 2007 anytun.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (see the file COPYING included with this + * distribution); if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + #include #include @@ -22,6 +52,7 @@ #include "../threadUtils.hpp" #include "commandHandler.h" +#include "callIdQueue.h" #include "options.h" #include @@ -40,99 +71,112 @@ public: OptionConnectTo & connto; }; - -class ControlHost : public Host +class ListenerThreadParam { public: - ControlHost() : Host("",0) {}; - bool operator<(const ControlHost& cmp_to) + ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d) + {}; + + UDPSocket& sock1_; + UDPSocket& sock2_; + std::string call_id_; + int dir_; +}; + +void* listener(void* p) +{ + ListenerThreadParam* param = reinterpret_cast(p); + + cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; + + try { - return port_ < cmp_to.port_; + Buffer buf(u_int32_t(MAX_PACKET_SIZE)); + string remote_addr; + u_int16_t remote_port; + while(1) { + buf.setLength(MAX_PACKET_SIZE); + u_int32_t len; + if(param->dir_ == 1) + len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + else if(param->dir_ == 2) + len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + buf.setLength(len); + + RtpSession& session = gRtpSessionTable.getSession(param->call_id_); + if(session.isDead()) + break; + + //TODO: if weak? don't check but save the new(?) remote addr into list + if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) || + (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2()))) + continue; + + if(param->dir_ == 1) + param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2()); + else if(param->dir_ == 2) + param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1()); + } } -}; + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what(); + } + cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally"; -class ControlHostMap + pthread_exit(NULL); +} + +class ListenerData { public: - - -private: - ::Mutex mutex; - - std::map > control_hosts_; + ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2) + {}; + + UDPSocket* sock1_; + UDPSocket* sock2_; + pthread_t threads1_; + pthread_t threads2_; + ListenerThreadParam params1_; + ListenerThreadParam params2_; }; -void* sender(void* dont_use_me) +void* listenerManager(void* dont_use_me) { -// try -// { -// HostList remote_host_list(gOpt.getRemoteHosts()); -// UDPSocket control_sock(gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_); - -// Buffer buf(u_int32_t(MAX_PACKET_SIZE)); -// string remote_host; -// u_int16_t remote_port; -// while(1) { -// buf.setLength(MAX_PACKET_SIZE); -// u_int32_t len = control_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); -// buf.setLength(len); - -//TODO????//TODO????//TODO????//TODO????//TODO????//TODO????//TODO???? control.setHost(remote_host, remote_port); - -// SenderThreadParam receiverParam = {control_host, control_sock, sock, gOpt.getRemoteHosts().front()}; -// pthread_t receiverThread; -// pthread_create(&receiverThread, NULL, receiver, &receiverParam); -// pthread_detach(receiverThread); + try + { + std::map listenerMap; + while(1) + { + std::string call_id = gCallIdQueue.front(); // waits for semphor and returns next call_id + gCallIdQueue.pop(); - -// HostList::const_iterator it = remote_host_list.begin(); -// for(;it != remote_host_list.end(); it++) -// param->sock_.sendTo(buf.getBuf(), buf.getLength(), it->addr_, it->port_); -// } -// } -// catch(std::exception &e) -// { -// cLog.msg(Log::PRIO_ERR) << "sender exiting because: " << e.what() << std::endl; -// } - pthread_exit(NULL); -} + RtpSession& session = gRtpSessionTable.getSession(call_id); + cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: " + << session.getLocalAddr() << ":" << session.getLocalPort1() << " " + << session.getLocalAddr() << ":" << session.getLocalPort2(); -void* receiver(void* p) -{ -// SenderThreadParam* param = reinterpret_cast(p); - -// try -// { -// Buffer buf(u_int32_t(MAX_PACKET_SIZE)); -// string remote_host; -// u_int16_t remote_port; - -// while(1) { -// buf.setLength(MAX_PACKET_SIZE); -// u_int32_t len = param->sock_.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port); -// buf.setLength(len); - -// if(remote_host != param->first_receiver_.addr_ || remote_port != param->first_receiver_.port_) -// continue; + UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); + UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); + + ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1), + ListenerThreadParam(*sock1, *sock2, call_id, 2)); + ld.sock1_ = sock1; + ld.sock2_ = sock2; + pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_)); + pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_)); -// Host control_host = param->control_.getHost(); -// if(control_host.addr_ == "" || !control_host.port_) -// { -// cLog.msg(Log::PRIO_NOTICE) << "no control host detected till now, ignoring packet"; -// continue; -// } - -// param->control_sock_.sendTo(buf.getBuf(), buf.getLength(), control_host.addr_, control_host.port_); -// } -// } -// catch(std::exception &e) -// { -// cLog.msg(Log::PRIO_ERR) << "receiver exiting because: " << e.what() << std::endl; -// } + listenerMap.insert(std::map::value_type(call_id, ld)); + } + } + catch(std::exception &e) + { + cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because: " << e.what(); + } pthread_exit(NULL); -} +} void chrootAndDrop(string const& chrootdir, string const& username) { @@ -241,9 +285,10 @@ int main(int argc, char* argv[]) SignalController sig; sig.init(); -// pthread_t senderThread; -// pthread_create(&senderThread, NULL, sender, NULL); -// pthread_detach(senderThread); + pthread_t listenerManagerThread; + pthread_create(&listenerManagerThread, NULL, listenerManager, NULL); + pthread_detach(listenerManagerThread); + pthread_t syncListenerThread; SyncQueue queue; diff --git a/anyrtpproxy/commandHandler.cpp b/anyrtpproxy/commandHandler.cpp index 58d9974..53e3e5f 100644 --- a/anyrtpproxy/commandHandler.cpp +++ b/anyrtpproxy/commandHandler.cpp @@ -40,6 +40,7 @@ #include "../syncQueue.h" #include "../syncCommand.h" #include "../rtpSessionTable.h" +#include "callIdQueue.h" #define MAX_COMMAND_LENGTH 1000 @@ -187,7 +188,7 @@ string CommandHandler::handleRequest(string modifiers, string call_id, string ad queue_.push(sc); ostringstream oss; - oss << session.getLocalPort1(); + oss << session.getLocalPort2(); return oss.str(); } catch(std::exception& e) @@ -212,10 +213,10 @@ string CommandHandler::handleResponse(string modifiers, string call_id, string a SyncCommand sc(call_id); queue_.push(sc); - session.init(); + gCallIdQueue.push(call_id); ostringstream oss; - oss << session.getLocalPort2(); + oss << session.getLocalPort1(); return oss.str(); } catch(std::exception& e) -- cgit v1.2.3