summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-11-25 15:06:31 +0000
committerChristian Pointner <equinox@anytun.org>2008-11-25 15:06:31 +0000
commit047ecc9d8570581e2d8fc044925e9e28f9a6a6df (patch)
tree8ee40d780ffd4ff0aa4311ae1d9725d89dae177a
parentanyrtpproxy: fixed Makfile (diff)
further fixes for anyrtpproxy but still alot left
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp106
-rw-r--r--src/anyrtpproxy/options.cpp33
-rw-r--r--src/anyrtpproxy/options.h17
3 files changed, 96 insertions, 60 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index 78354a2..e0519b0 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -39,17 +39,13 @@
#include "../log.h"
#include "../signalController.h"
-#include "../PracticalSocket.h"
#include "../buffer.h"
#include "connectionList.h"
#include "../rtpSessionTable.h"
#include "../syncCommand.h"
#include "../syncQueue.h"
-#include "../syncSocketHandler.h"
-#include "../syncListenSocket.h"
+#include "../syncClient.h"
-#include "../syncSocket.h"
-#include "../syncClientSocket.h"
#include "../threadUtils.hpp"
#include "commandHandler.h"
@@ -289,39 +285,43 @@ void daemonize()
umask(027);
}
-void* syncConnector(void* p )
+void syncConnector(void* p)
{
- ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
-
- SocketHandler h;
- ConnectionList cl;
- SyncClientSocket sock(h,cl);
- sock.Open( param->connto.host, param->connto.port);
- h.Add(&sock);
- while (h.GetCount())
- {
- h.Select();
- }
- pthread_exit(NULL);
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+
+ SyncClient sc ( param->connto.host, param->connto.port);
+ sc.run();
}
-void* syncListener(void* p )
+void syncListener(SyncQueue * queue)
{
- ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
- ConnectionList cl;
-
- SyncSocketHandler h(param->queue);
- SyncListenSocket<SyncSocket,ConnectionList> l(h,cl);
+ try
+ {
+ boost::asio::io_service io_service;
+ SyncTcpConnection::proto::resolver resolver(io_service);
+ SyncTcpConnection::proto::endpoint e;
+ if(gOpt.getLocalSyncAddr()!="")
+ {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ } else {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ }
- if (l.Bind(gOpt.getLocalSyncPort()))
- pthread_exit(NULL);
- Utility::ResolveLocal(); // resolve local hostname
- h.Add(&l);
- h.Select(1,0);
- while (1) {
- h.Select(1,0);
+ SyncServer server(io_service,e);
+ server.onConnect=boost::bind(syncOnConnect,_1);
+ queue->setSyncServerPtr(&server);
+ io_service.run();
+ }
+ catch (std::exception& e)
+ {
+ std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
+ cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
+ << " (" << e.what() << ")" << std::endl;
}
+
}
int main(int argc, char* argv[])
@@ -360,24 +360,38 @@ int main(int argc, char* argv[])
SyncQueue queue;
- pthread_t listenerManagerThread;
- pthread_create(&listenerManagerThread, NULL, listenerManager, &queue);
- pthread_detach(listenerManagerThread);
- pthread_t syncListenerThread;
+ boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
- ConnectToList connect_to = gOpt.getConnectTo();
- ThreadParam p( queue,*(new OptionConnectTo()));
- if ( gOpt.getLocalSyncPort())
- pthread_create(&syncListenerThread, NULL, syncListener, &p);
- std::list<pthread_t> connectThreads;
- for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
- {
- connectThreads.push_back(pthread_t());
- ThreadParam * point = new ThreadParam(queue,*it);
- pthread_create(& connectThreads.back(), NULL, syncConnector, point);
- }
+// #ifndef ANYTUN_NOSYNC
+// boost::thread * syncListenerThread;
+// if(gOpt.getLocalSyncPort() != "")
+// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
+
+// std::list<boost::thread *> connectThreads;
+// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
+// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
+// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
+// }
+// #endif
+
+
+
+// pthread_t syncListenerThread;
+
+// ConnectToList connect_to = gOpt.getConnectTo();
+// ThreadParam p( queue,*(new OptionConnectTo()));
+// if ( gOpt.getLocalSyncPort())
+// pthread_create(&syncListenerThread, NULL, syncListener, &p);
+
+// std::list<pthread_t> connectThreads;
+// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+// {
+// connectThreads.push_back(pthread_t());
+// ThreadParam * point = new ThreadParam(queue,*it);
+// pthread_create(& connectThreads.back(), NULL, syncConnector, point);
+// }
PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
diff --git a/src/anyrtpproxy/options.cpp b/src/anyrtpproxy/options.cpp
index 328c03c..e3f3850 100644
--- a/src/anyrtpproxy/options.cpp
+++ b/src/anyrtpproxy/options.cpp
@@ -50,7 +50,7 @@ Options& Options::instance()
return *inst;
}
-Options::Options() : control_interface_("0.0.0.0", 22222)
+Options::Options() : control_interface_("0.0.0.0", "22222")
{
progname_ = "anyrtpproxy";
@@ -60,7 +60,7 @@ Options::Options() : control_interface_("0.0.0.0", 22222)
daemonize_ = true;
pid_file_ = "";
local_addr_ = "";
- local_sync_port_ = 0;
+ local_sync_port_ = "";
rtp_start_port_ = 34000;
rtp_end_port_ = 35000;
no_nat_once_ = false;
@@ -170,7 +170,7 @@ bool Options::parse(int argc, char* argv[])
PARSE_SCALAR_PARAM2("-p","--port-range", rtp_start_port_, rtp_end_port_)
PARSE_CSLIST_PARAM("-M","--sync-hosts", host_port_queue)
PARSE_SCALAR_PARAM("-S","--sync-port", local_sync_port_)
-// PARSE_SCALAR_PARAM("-I","--sync-interface", local_sync_addr_)
+ PARSE_SCALAR_PARAM("-I","--sync-interface", local_sync_addr_)
else
return false;
}
@@ -205,11 +205,11 @@ void Options::printUsage()
std::cout << " [-d|--nodaemonize] don't run in background" << std::endl;
std::cout << " [-P|--write-pid] <path> write pid to this file" << std::endl;
std::cout << " [-i|--interface] <ip-address> local ip address to listen to for RTP packets" << std::endl;
- std::cout << " [-s|--control] <addr[:port]> the address/port to listen on for control commands" << std::endl;
+ std::cout << " [-s|--control] <addr>[:<port>] the address/port to listen on for control commands" << std::endl;
std::cout << " [-p|--port-range] <start> <end> port range used to relay rtp connections" << std::endl;
std::cout << " [-n|--nat] enable permantent automatic nat detection(use only with anytun)" << std::endl;
std::cout << " [-o|--no-nat-once] disable automatic nat detection for new connections" << std::endl;
-// std::cout << " [-I|--sync-interface] <ip-address> local unicast(sync) ip address to bind to" << std::endl;
+ std::cout << " [-I|--sync-interface] <ip-address> local unicast(sync) ip address to bind to" << std::endl;
std::cout << " [-S|--sync-port] <port> local unicast(sync) port to bind to" << std::endl;
std::cout << " [-M|--sync-hosts] <hostname|ip>:<port>[,<hostname|ip>:<port>[...]]"<< std::endl;
std::cout << " List of Remote Sync Hosts/Ports"<< std::endl;
@@ -226,6 +226,10 @@ void Options::printOptions()
std::cout << "pid_file='" << pid_file_ << "'" << std::endl;
std::cout << "control-interface='" << control_interface_.toString() << "'" << std::endl;
std::cout << "local_addr='" << local_addr_ << "'" << std::endl;
+ std::cout << "rtp_start_port=" << rtp_start_port_ << std::endl;
+ std::cout << "rtp_end_port=" << rtp_end_port_ << std::endl;
+ std::cout << "local_sync_addr='" << local_sync_addr_ << "'" << std::endl;
+ std::cout << "local_sync_port='" << local_sync_port_ << "'" << std::endl;
}
std::string Options::getProgname()
@@ -295,13 +299,28 @@ Options& Options::setLocalAddr(std::string l)
return *this;
}
-u_int16_t Options::getLocalSyncPort()
+std::string Options::getLocalSyncAddr()
{
+ Lock lock(mutex);
+ return local_sync_addr_;
+}
+
+Options& Options::setLocalSyncAddr(std::string l)
+{
+ Lock lock(mutex);
+ local_sync_addr_ = l;
+ return *this;
+}
+
+std::string Options::getLocalSyncPort()
+{
+ Lock lock(mutex);
return local_sync_port_;
}
-Options& Options::setLocalSyncPort(u_int16_t l)
+Options& Options::setLocalSyncPort(std::string l)
{
+ Lock lock(mutex);
local_sync_port_ = l;
return *this;
}
diff --git a/src/anyrtpproxy/options.h b/src/anyrtpproxy/options.h
index 51219df..50e16f9 100644
--- a/src/anyrtpproxy/options.h
+++ b/src/anyrtpproxy/options.h
@@ -39,7 +39,7 @@
typedef struct OptionConnectTo
{
std::string host;
- uint16_t port;
+ std::string port;
};
typedef std::list<OptionConnectTo> ConnectToList;
@@ -47,12 +47,12 @@ typedef std::list<OptionConnectTo> ConnectToList;
class Host
{
public:
- Host(std::string addr, u_int16_t port) : addr_(addr), port_(port) {}
+ Host(std::string addr, std::string port) : addr_(addr), port_(port) {}
Host(std::string addr_port)
{
std::istringstream iss(addr_port);
getline(iss, addr_, ':');
- if(!(iss >> port_)) port_ = 0;
+ if(!(iss >> port_)) port_ = "";
}
std::string toString() const
{
@@ -62,7 +62,7 @@ public:
}
std::string addr_;
- u_int16_t port_;
+ std::string port_;
};
typedef std::list<Host> HostList;
@@ -87,8 +87,10 @@ public:
Host getControlInterface();
std::string getLocalAddr();
Options& setLocalAddr(std::string l);
- u_int16_t getLocalSyncPort();
- Options& setLocalSyncPort(u_int16_t l);
+ std::string getLocalSyncAddr();
+ Options& setLocalSyncAddr(std::string l);
+ std::string getLocalSyncPort();
+ Options& setLocalSyncPort(std::string l);
u_int16_t getRtpStartPort();
Options& setRtpStartPort(u_int16_t l);
u_int16_t getRtpEndPort();
@@ -122,7 +124,8 @@ private:
std::string chroot_dir_;
std::string pid_file_;
bool daemonize_;
- u_int16_t local_sync_port_;
+ std::string local_sync_addr_;
+ std::string local_sync_port_;
std::string local_addr_;
u_int16_t rtp_start_port_;
u_int16_t rtp_end_port_;