summaryrefslogtreecommitdiff
path: root/anyrtpproxy/anyrtpproxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'anyrtpproxy/anyrtpproxy.cpp')
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp106
1 files changed, 95 insertions, 11 deletions
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index 311dc4f..e856e16 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -16,17 +16,49 @@
#define MAX_PACKET_SIZE 1500
-void* worker(void* l)
+class OpenSerHost
{
- IfListElement* interface = reinterpret_cast<IfListElement*>(l);
+public:
+ OpenSerHost() : host_("",0) {};
+
+ IfListElement getHost() {
+ Lock lock(mutex);
+ return host_;
+ }
+
+ void setHost(std::string host, u_int16_t port)
+ {
+ Lock lock(mutex);
+ if(host_.host_ != host || host_.port_ != port)
+ cLog.msg(Log::PRIO_NOTICE) << "openSer Host detected at " << host << ":" << port;
+
+ host_.host_ = host;
+ host_.port_ = port;
+ }
+
+private:
+ Mutex mutex;
+
+ IfListElement host_;
+};
+
+struct ThreadParam
+{
+ OpenSerHost& open_ser_;
+ IfListElement interface_;
+};
+
+void* sender(void* p)
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
try
{
- UDPSocket recv_sock(interface->host_, interface->port_);
- UDPSocket send_sock;
- IfList remote_host_list = gOpt.getRemoteHosts();
+ UDPSocket recv_sock(param->interface_.host_, param->interface_.port_);
+ UDPSocket send_sock(gOpt.getSendPort());
+ IfList remote_host_list(gOpt.getRemoteHosts());
- cLog.msg(Log::PRIO_NOTICE) << "worker listening on: " << interface->toString();
+ cLog.msg(Log::PRIO_NOTICE) << "sender listening on: " << param->interface_.toString();
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
while(1) {
@@ -37,6 +69,8 @@ void* worker(void* l)
u_int32_t len = recv_sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
buf.setLength(len);
+ param->open_ser_.setHost(remote_host, remote_port);
+
IfList::const_iterator it = remote_host_list.begin();
for(;it != remote_host_list.end(); it++)
send_sock.sendTo(buf.getBuf(), buf.getLength(), it->host_, it->port_);
@@ -44,12 +78,52 @@ void* worker(void* l)
}
catch(std::exception &e)
{
- cLog.msg(Log::PRIO_ERR) << "worker(" << interface->toString() << ") exiting because: " << e.what() << std::endl;
+ cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl;
}
pthread_exit(NULL);
}
+
+void* receiver(void* p)
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+
+ try
+ {
+ UDPSocket sock(gOpt.getSendPort());
+
+ cLog.msg(Log::PRIO_NOTICE) << "receiver listening for packets from: " << param->interface_.toString();
+
+ Buffer buf(u_int32_t(MAX_PACKET_SIZE));
+ while(1) {
+ string remote_host;
+ u_int16_t remote_port;
+
+ buf.setLength(MAX_PACKET_SIZE);
+ u_int32_t len = sock.recvFrom(buf.getBuf(), buf.getLength(), remote_host, remote_port);
+ buf.setLength(len);
+
+ if(remote_host != param->interface_.host_ || remote_port != param->interface_.port_)
+ continue;
+
+ IfListElement openSerHost = param->open_ser_.getHost();
+ if(openSerHost.host_ == "" || !openSerHost.port_)
+ {
+ cLog.msg(Log::PRIO_NOTICE) << "no openser host detected till now, ignoring packet";
+ continue;
+ }
+
+ sock.sendTo(buf.getBuf(), buf.getLength(), openSerHost.host_, openSerHost.port_);
+ }
+ }
+ catch(std::exception &e)
+ {
+ cLog.msg(Log::PRIO_ERR) << "sender(" << param->interface_.toString() << ") exiting because: " << e.what() << std::endl;
+ }
+ pthread_exit(NULL);
+}
+
void chrootAndDrop(string const& chrootdir, string const& username)
{
if (getuid() != 0)
@@ -122,16 +196,26 @@ int main(int argc, char* argv[])
SignalController sig;
sig.init();
-
+ OpenSerHost open_ser;
+ std::list<ThreadParam> params;
IfList listeners(gOpt.getLocalInterfaces());
IfList::iterator it = listeners.begin();
for(;it != listeners.end();++it)
{
- pthread_t workerThread;
- pthread_create(&workerThread, NULL, worker, static_cast<void*>(&(*it)));
- pthread_detach(workerThread);
+ ThreadParam param = {open_ser, *it};
+ params.push_back(param);
+ pthread_t senderThread;
+ pthread_create(&senderThread, NULL, sender, &(params.back()));
+ pthread_detach(senderThread);
}
+
+ ThreadParam param = {open_ser, gOpt.getRemoteHosts().front()};
+ params.push_back(param);
+ pthread_t receiverThread;
+ pthread_create(&receiverThread, NULL, receiver, &(params.back()));
+ pthread_detach(receiverThread);
+
int ret = sig.run();
return ret;