summaryrefslogtreecommitdiff
path: root/src/anyrtpproxy/anyrtpproxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/anyrtpproxy/anyrtpproxy.cpp')
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp238
1 files changed, 112 insertions, 126 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index 7c5514c..22d7494 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -11,7 +11,7 @@
* tunneling and relaying of packets of any protocol.
*
*
- * Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl,
+ * Copyright (C) 2007-2009 Othmar Gsenger, Erwin Nindl,
* Christian Pointner <satp@wirdorange.org>
*
* This file is part of Anytun.
@@ -66,64 +66,62 @@ void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2
{
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
- try
- {
+ try {
Buffer buf(u_int32_t(MAX_PACKET_SIZE));
RtpSession::proto::endpoint remote_end;
while(1) {
buf.setLength(MAX_PACKET_SIZE);
u_int32_t len=0;
- if(dir == 1)
- len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
- else if(dir == 2)
- len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
- else break;
+ if(dir == 1) {
+ len = 0; //sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
+ } else if(dir == 2) {
+ len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
+ } else { break; }
RtpSession& session = gRtpSessionTable.getSession(call_id);
if(session.isDead()) {
- cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
+ cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
break;
}
- if(!len)
+ if(!len) {
continue;
+ }
buf.setLength(len);
-
- if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
- (dir == 2 && remote_end != session.getRemoteEnd2()))
- {
+
+ if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
+ (dir == 2 && remote_end != session.getRemoteEnd2())) {
if(gOpt.getNat() ||
- (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
- (dir == 2 && !session.getSeen2()))))
- {
+ (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
+ (dir == 2 && !session.getSeen2())))) {
cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
<< remote_end;
- if(dir == 1)
+ if(dir == 1) {
session.setRemoteEnd1(remote_end);
- if(dir == 2)
+ }
+ if(dir == 2) {
session.setRemoteEnd2(remote_end);
-
+ }
+
if(!gOpt.getNat()) { // with nat enabled sync is not needed
SyncRtpCommand sc(call_id);
queue->push(sc);
}
- }
- else
+ } else {
continue;
- }
+ }
+ }
session.setSeen1();
session.setSeen2();
- if(dir == 1)
+ if(dir == 1) {
sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
- else if(dir == 2)
+ } else if(dir == 2) {
sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
- else break;
- }
- }
- catch(std::exception &e)
- {
+ } else { break; }
+ }
+ } catch(std::exception& e) {
cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
}
*running = false;
@@ -150,21 +148,19 @@ void listenerManager(void* p)
SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
std::map<std::string, ListenerData*> listenerMap;
- while(1)
- {
- try
- {
+ while(1) {
+ try {
std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
gCallIdQueue.pop();
RtpSession& session = gRtpSessionTable.getSession(call_id);
- if(!session.isComplete())
+ if(!session.isComplete()) {
continue;
+ }
std::map<std::string, ListenerData*>::iterator it;
it = listenerMap.find(call_id);
- if(it == listenerMap.end()) // listener Threads not existing yet
- {
+ if(it == listenerMap.end()) { // listener Threads not existing yet
ListenerData* ld = new ListenerData();
ld->sock1_.open(session.getLocalEnd1().protocol());
@@ -181,8 +177,7 @@ void listenerManager(void* p)
continue;
}
- if(!it->second->running1_ && !it->second->running2_)
- {
+ if(!it->second->running1_ && !it->second->running2_) {
cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
if(it->second->thread1_) {
it->second->thread1_->join();
@@ -197,10 +192,8 @@ void listenerManager(void* p)
gRtpSessionTable.delSession(call_id);
continue;
}
- // TODO: reinit if session changed
- }
- catch(std::exception &e)
- {
+ // TODO: reinit if session changed
+ } catch(std::exception& e) {
cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
usleep(500); // in case of an hard error don't block cpu (this is ugly)
}
@@ -210,33 +203,28 @@ void listenerManager(void* p)
void chrootAndDrop(string const& chrootdir, string const& username)
{
- if (getuid() != 0)
- {
- std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
- exit(-1);
- }
-
- struct passwd *pw = getpwnam(username.c_str());
- if(pw) {
- if(chroot(chrootdir.c_str()))
- {
+ if(getuid() != 0) {
+ std::cerr << "this program has to be run as root in order to run in a chroot" << std::endl;
+ exit(-1);
+ }
+
+ struct passwd* pw = getpwnam(username.c_str());
+ if(pw) {
+ if(chroot(chrootdir.c_str())) {
std::cerr << "can't chroot to " << chrootdir << std::endl;
exit(-1);
- }
+ }
std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
chdir("/");
- if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid))
- {
- std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
- exit(-1);
- }
+ if(initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid)) {
+ std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
+ exit(-1);
+ }
std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
- }
- else
- {
+ } else {
std::cerr << "unknown user " << username << std::endl;
exit(-1);
- }
+ }
}
void daemonize()
@@ -244,65 +232,62 @@ void daemonize()
pid_t pid;
pid = fork();
- if(pid) exit(0);
+ if(pid) { exit(0); }
setsid();
pid = fork();
- if(pid) exit(0);
-
-// std::cout << "running in background now..." << std::endl;
+ if(pid) { exit(0); }
+
+ // std::cout << "running in background now..." << std::endl;
int fd;
-// for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
- for (fd=0;fd<=2;fd++) // close all file descriptors
+ // for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
+ for(fd=0; fd<=2; fd++) { // close all file descriptors
close(fd);
+ }
fd=open("/dev/null",O_RDWR); // stdin
dup(fd); // stdout
dup(fd); // stderr
- umask(027);
+ umask(027);
}
class ThreadParam
{
public:
- ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
+ ThreadParam(SyncQueue& queue_,OptionConnectTo& connto_)
: queue(queue_),connto(connto_)
- {};
- SyncQueue & queue;
- OptionConnectTo & connto;
+ {};
+ SyncQueue& queue;
+ OptionConnectTo& connto;
};
void syncConnector(void* p)
{
- ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
- SyncClient sc ( param->connto.host, param->connto.port);
- sc.run();
+ SyncClient sc(param->connto.host, param->connto.port);
+ sc.run();
}
-void syncListener(SyncQueue * queue)
+void syncListener(SyncQueue* queue)
{
- try
- {
+ try {
boost::asio::io_service io_service;
- SyncTcpConnection::proto::resolver resolver(io_service);
- SyncTcpConnection::proto::endpoint e;
- if(gOpt.getLocalSyncAddr()!="")
- {
- SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
- e = *resolver.resolve(query);
- } else {
- SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
- e = *resolver.resolve(query);
- }
+ SyncTcpConnection::proto::resolver resolver(io_service);
+ SyncTcpConnection::proto::endpoint e;
+ if(gOpt.getLocalSyncAddr()!="") {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ } else {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ }
SyncServer server(io_service,e);
- server.onConnect=boost::bind(syncOnConnect,_1);
- queue->setSyncServerPtr(&server);
+ server.onConnect=boost::bind(syncOnConnect,_1);
+ queue->setSyncServerPtr(&server);
io_service.run();
- }
- catch (std::exception& e)
- {
+ } catch(std::exception& e) {
std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
<< " (" << e.what() << ")" << std::endl;
@@ -312,9 +297,8 @@ void syncListener(SyncQueue * queue)
int main(int argc, char* argv[])
{
-// std::cout << "anyrtpproxy" << std::endl;
- if(!gOpt.parse(argc, argv))
- {
+ // std::cout << "anyrtpproxy" << std::endl;
+ if(!gOpt.parse(argc, argv)) {
gOpt.printUsage();
exit(-1);
}
@@ -330,17 +314,19 @@ int main(int argc, char* argv[])
}
}
- if(gOpt.getChroot())
+ if(gOpt.getChroot()) {
chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
- if(gOpt.getDaemonize())
+ }
+ if(gOpt.getDaemonize()) {
daemonize();
+ }
if(pidFile.is_open()) {
pid_t pid = getpid();
pidFile << pid;
pidFile.close();
}
-
+
SignalController sig;
sig.init();
@@ -350,38 +336,38 @@ int main(int argc, char* argv[])
boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
-// #ifndef ANYTUN_NOSYNC
-// boost::thread * syncListenerThread;
-// if(gOpt.getLocalSyncPort() != "")
-// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
-
-// std::list<boost::thread *> connectThreads;
-// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
-// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
-// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
-// }
-// #endif
+ // #ifndef ANYTUN_NOSYNC
+ // boost::thread * syncListenerThread;
+ // if(gOpt.getLocalSyncPort() != "")
+ // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
+
+ // std::list<boost::thread *> connectThreads;
+ // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
+ // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
+ // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
+ // }
+ // #endif
-// pthread_t syncListenerThread;
+ // pthread_t syncListenerThread;
-// ConnectToList connect_to = gOpt.getConnectTo();
-// ThreadParam p( queue,*(new OptionConnectTo()));
-// if ( gOpt.getLocalSyncPort())
-// pthread_create(&syncListenerThread, NULL, syncListener, &p);
+ // ConnectToList connect_to = gOpt.getConnectTo();
+ // ThreadParam p( queue,*(new OptionConnectTo()));
+ // if ( gOpt.getLocalSyncPort())
+ // pthread_create(&syncListenerThread, NULL, syncListener, &p);
-// std::list<pthread_t> connectThreads;
-// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
-// {
-// connectThreads.push_back(pthread_t());
-// ThreadParam * point = new ThreadParam(queue,*it);
-// pthread_create(& connectThreads.back(), NULL, syncConnector, point);
-// }
+ // std::list<pthread_t> connectThreads;
+ // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+ // {
+ // connectThreads.push_back(pthread_t());
+ // ThreadParam * point = new ThreadParam(queue,*it);
+ // pthread_create(& connectThreads.back(), NULL, syncConnector, point);
+ // }
- PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
+ PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
-
+
int ret = sig.run();
return ret;
}