summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--PracticalSocket.cpp19
-rw-r--r--PracticalSocket.h4
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp106
-rw-r--r--anyrtpproxy/options.cpp27
-rw-r--r--anyrtpproxy/options.h3
5 files changed, 145 insertions, 14 deletions
diff --git a/PracticalSocket.cpp b/PracticalSocket.cpp
index 658bc6e..5d39557 100644
--- a/PracticalSocket.cpp
+++ b/PracticalSocket.cpp
@@ -186,6 +186,14 @@ void Socket::setLocalAddressAndPort(const string &localAddress,
}
}
+void Socket::setSocketOpt(int optionName, const void* optionValue, socklen_t optionLen)
+ throw(SocketException)
+{
+ if (::setsockopt(sockDesc, SOL_SOCKET, optionName, optionValue, optionLen) < 0) {
+ throw SocketException("setSockopt failed", true);
+ }
+}
+
void Socket::cleanUp() throw(SocketException) {
#ifdef WIN32
if (WSACleanup() != 0) {
@@ -282,6 +290,8 @@ TCPSocket::TCPSocket(int newConnSD) : CommunicatingSocket(newConnSD) {
TCPServerSocket::TCPServerSocket(unsigned short localPort, int queueLen)
throw(SocketException) : Socket(SOCK_STREAM, IPPROTO_TCP) {
+ const int opt = 1;
+ setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt));
setLocalPort(localPort);
setListen(queueLen);
}
@@ -289,6 +299,8 @@ TCPServerSocket::TCPServerSocket(unsigned short localPort, int queueLen)
TCPServerSocket::TCPServerSocket(const string &localAddress,
unsigned short localPort, int queueLen)
throw(SocketException) : Socket(SOCK_STREAM, IPPROTO_TCP) {
+ const int opt = 1;
+ setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt));
setLocalAddressAndPort(localAddress, localPort);
setListen(queueLen);
}
@@ -312,17 +324,24 @@ void TCPServerSocket::setListen(int queueLen) throw(SocketException) {
UDPSocket::UDPSocket() throw(SocketException) : CommunicatingSocket(SOCK_DGRAM,
IPPROTO_UDP) {
+
+ const int opt = 1;
+ setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt));
setBroadcast();
}
UDPSocket::UDPSocket(unsigned short localPort) throw(SocketException) :
CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) {
+ const int opt = 1;
+ setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt));
setLocalPort(localPort);
setBroadcast();
}
UDPSocket::UDPSocket(const string &localAddress, unsigned short localPort)
throw(SocketException) : CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) {
+ const int opt = 1;
+ setSocketOpt(SO_REUSEADDR, &opt, sizeof(opt));
setLocalAddressAndPort(localAddress, localPort);
setBroadcast();
}
diff --git a/PracticalSocket.h b/PracticalSocket.h
index dcb39a9..0192cfd 100644
--- a/PracticalSocket.h
+++ b/PracticalSocket.h
@@ -128,6 +128,10 @@ public:
void setLocalAddressAndPort(const string &localAddress,
unsigned short localPort = 0) throw(SocketException);
+
+ void setSocketOpt(int optionName, const void* optionValue, socklen_t optionLen)
+ throw(SocketException);
+
/**
* If WinSock, unload the WinSock DLLs; otherwise do nothing. We ignore
* this in our sample client code but include it in the library for
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index 311dc4f..e856e16 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -16,17 +16,49 @@
#define MAX_PACKET_SIZE 1500
-void* worker(void* l)
+class OpenSerHost
{
- IfListElement* interface = reinterpret_cast<IfListElement*>(l);
+public:
+ OpenSerHost() : host_("",0) {};
+
+ IfListElement getHost() {
+ Lock lock(mutex);
+ return host_;
+ }
+
+ void setHost(std::string host, u_int16_t port)
+ {
+ Lock lock(mutex);
+ if(host_.host_ != host || host_.port_ != port)
+ cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port;
+
+ host_.host_ = host;
+ host_.port_ = port;
+ }
+
+private:
+ Mutex mutex;
+
+ IfListElement host_;
+};
+
+struct ThreadParam
+{
+ OpenSerHost& open_ser_;
+ IfListElement interface_;
+};
+
+void* sender(void* p)
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
try
{
- UDPSocket recv_sock(interface->host_, interface->port_);
- UDPSocket send_sock;
- IfList remote_host_list = gOpt.getRemoteHosts();
+ UDPSocket recv_sock(param->interface_.host_, param->interface_.port_);
+ UDPSocket send_sock(gOpt.getSendPort());
+ IfList remote_host_list(gOpt.getRemoteHosts());
- cLog.msg(Log::PRIO_NOTICE) << "worker listening on: " << interface->toString();
+ cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString();
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
while(1) {
@@ -37,6 +69,8 @@ void* worker(void* l)
u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
buf.setLength(len);
+ param->open_ser_.setHost(remote_host, remote_port);
+
IfList::const_iterator it = remote_host_list.begin();
for(;it != remote_host_list.end(); it++)
send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_);
@@ -44,12 +78,52 @@ void* worker(void* l)
}
catch(std::exception &e)
{
- cLog.msg(Log::PRIO_ERR) << "worker(" << interface->toString() << ") exiting because: " << e.what() << std::endl;
+ cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl;
}
pthread_exit(NULL);
}
+
+void* receiver(void* p)
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+
+ try
+ {
+ UDPSocket sock(gOpt.getSendPort());
+
+ cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString();
+
+ Buffer buf(u_int32_t(MAX_PACKET_SIZE));
+ while(1) {
+ string remote_host;
+ u_int16_t remote_port;
+
+ buf.setLength(MAX_PACKET_SIZE);
+ u_int32_t len = sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
+ buf.setLength(len);
+
+ if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_)
+ continue;
+
+ IfListElement openSerHost = param->open_ser_.getHost();
+ if(openSerHost.host_ == "" || !openSerHost.port_)
+ {
+ cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet";
+ continue;
+ }
+
+ sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_);
+ }
+ }
+ catch(std::exception &e)
+ {
+ cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl;
+ }
+ pthread_exit(NULL);
+}
+
void chrootAndDrop(string const& chrootdir, string const& username)
{
if (getuid() != 0)
@@ -122,16 +196,26 @@ int main(int argc, char* argv[])
SignalController sig;
sig.init();
-
+ OpenSerHost open_ser;
+ std::list<ThreadParam> params;
IfList listeners(gOpt.getLocalInterfaces());
IfList::iterator it = listeners.begin();
for(;it != listeners.end();++it)
{
- pthread_t workerThread;
- pthread_create(&workerThread, NULL, worker, static_cast<void*>(&(*it)));
- pthread_detach(workerThread);
+ ThreadParam param = {open_ser, *it};
+ params.push_back(param);
+ pthread_t senderThread;
+ pthread_create(&senderThread, NULL, sender, &(params.back()));
+ pthread_detach(senderThread);
}
+
+ ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()};
+ params.push_back(param);
+ pthread_t receiverThread;
+ pthread_create(&receiverThread, NULL, receiver, &(params.back()));
+ pthread_detach(receiverThread);
+
int ret = sig.run();
return ret;
diff --git a/anyrtpproxy/options.cpp b/anyrtpproxy/options.cpp
index 6665f3d..6e7f291 100644
--- a/anyrtpproxy/options.cpp
+++ b/anyrtpproxy/options.cpp
@@ -56,6 +56,7 @@ Options::Options()
username_ = "nobody";
chroot_dir_ = "/var/run";
daemonize_ = true;
+ send_port_ = 22220;
local_interfaces_.push_back(IfListElement("0.0.0.0", 22221));
remote_hosts_.push_back(IfListElement("127.0.0.1", 22222));
}
@@ -142,7 +143,7 @@ bool Options::parse(int argc, char* argv[])
PARSE_SCALAR_PARAM("-u","--user", username_)
PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_)
PARSE_INVERSE_BOOL_PARAM("-d","--nodaemonize", daemonize_)
- PARSE_SCALAR_PARAM("-c","--chroot-dir", chroot_dir_)
+ PARSE_SCALAR_PARAM("-p","--port", send_port_)
PARSE_CSLIST_PARAM("-l","--listen", local_interfaces_)
PARSE_CSLIST_PARAM("-r","--hosts", remote_hosts_)
else
@@ -168,8 +169,14 @@ bool Options::sanityCheck()
void Options::printUsage()
{
std::cout << "USAGE:" << std::endl;
- std::cout << "plain_tool [-h|--help] prints this..." << std::endl;
-// std::cout << " [-K|--key] <master key> master key to use for encryption" << std::endl;
+ std::cout << "anyrtpproxy [-h|--help] prints this..." << std::endl;
+ std::cout << " [-t|--chroot] chroot and drop priviledges" << std::endl;
+ std::cout << " [-u|--username] <username> in case of chroot run as this user" << std::endl;
+ std::cout << " [-c|--chroot-dir] <directory> directory to make a chroot to" << std::endl;
+ std::cout << " [-d|--nodaemonize] don't run in background" << std::endl;
+ std::cout << " [-p|--port] <port> use this port to send out packets" << std::endl;
+ std::cout << " [-l|--listen] <host[:port]>[,<host>[:<port> ..] a list of local interfaces to listen on" << std::endl;
+ std::cout << " [-r|--hosts] <host[:port]>[,<host>[:<port> ..] a list of remote hosts to send duplicates to" << std::endl;
}
void Options::printOptions()
@@ -180,6 +187,7 @@ void Options::printOptions()
std::cout << "username='" << username_ << "'" << std::endl;
std::cout << "chroot-dir='" << chroot_dir_ << "'" << std::endl;
std::cout << "daemonize='" << daemonize_ << "'" << std::endl;
+ std::cout << "send-port='" << send_port_ << "'" << std::endl;
std::cout << "local interfaces='";
IfList::const_iterator it=local_interfaces_.begin();
for(u_int32_t i=0; it != local_interfaces_.end(); ++it, ++i)
@@ -264,6 +272,19 @@ Options& Options::setDaemonize(bool d)
return *this;
}
+u_int16_t Options::getSendPort()
+{
+ Lock lock(mutex);
+ return send_port_;
+}
+
+Options& Options::setSendPort(u_int16_t p)
+{
+ Lock lock(mutex);
+ send_port_ = p;
+ return *this;
+}
+
IfList Options::getLocalInterfaces()
{
Lock lock(mutex);
diff --git a/anyrtpproxy/options.h b/anyrtpproxy/options.h
index 48df36a..48139f7 100644
--- a/anyrtpproxy/options.h
+++ b/anyrtpproxy/options.h
@@ -77,6 +77,8 @@ public:
Options& setChrootDir(std::string c);
bool getDaemonize();
Options& setDaemonize(bool d);
+ u_int16_t getSendPort();
+ Options& setSendPort(u_int16_t p);
IfList getLocalInterfaces();
IfList getRemoteHosts();
@@ -104,6 +106,7 @@ private:
std::string username_;
std::string chroot_dir_;
bool daemonize_;
+ u_int16_t send_port_;
IfList local_interfaces_;
IfList remote_hosts_;
};