From 047ecc9d8570581e2d8fc044925e9e28f9a6a6df Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 25 Nov 2008 15:06:31 +0000 Subject: further fixes for anyrtpproxy but still alot left --- src/anyrtpproxy/anyrtpproxy.cpp | 106 +++++++++++++++++++++++----------------- src/anyrtpproxy/options.cpp | 33 ++++++++++--- src/anyrtpproxy/options.h | 17 ++++--- 3 files changed, 96 insertions(+), 60 deletions(-) diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index 78354a2..e0519b0 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -39,17 +39,13 @@ #include "../log.h" #include "../signalController.h" -#include "../PracticalSocket.h" #include "../buffer.h" #include "connectionList.h" #include "../rtpSessionTable.h" #include "../syncCommand.h" #include "../syncQueue.h" -#include "../syncSocketHandler.h" -#include "../syncListenSocket.h" +#include "../syncClient.h" -#include "../syncSocket.h" -#include "../syncClientSocket.h" #include "../threadUtils.hpp" #include "commandHandler.h" @@ -289,39 +285,43 @@ void daemonize() umask(027); } -void* syncConnector(void* p ) +void syncConnector(void* p) { - ThreadParam* param = reinterpret_cast(p); - - SocketHandler h; - ConnectionList cl; - SyncClientSocket sock(h,cl); - sock.Open( param->connto.host, param->connto.port); - h.Add(&sock); - while (h.GetCount()) - { - h.Select(); - } - pthread_exit(NULL); + ThreadParam* param = reinterpret_cast(p); + + SyncClient sc ( param->connto.host, param->connto.port); + sc.run(); } -void* syncListener(void* p ) +void syncListener(SyncQueue * queue) { - ThreadParam* param = reinterpret_cast(p); - ConnectionList cl; - - SyncSocketHandler h(param->queue); - SyncListenSocket l(h,cl); + try + { + boost::asio::io_service io_service; + SyncTcpConnection::proto::resolver resolver(io_service); + SyncTcpConnection::proto::endpoint e; + if(gOpt.getLocalSyncAddr()!="") + { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } else { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } - if (l.Bind(gOpt.getLocalSyncPort())) - pthread_exit(NULL); - Utility::ResolveLocal(); // resolve local hostname - h.Add(&l); - h.Select(1,0); - while (1) { - h.Select(1,0); + SyncServer server(io_service,e); + server.onConnect=boost::bind(syncOnConnect,_1); + queue->setSyncServerPtr(&server); + io_service.run(); + } + catch (std::exception& e) + { + std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr(); + cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort() + << " (" << e.what() << ")" << std::endl; } + } int main(int argc, char* argv[]) @@ -360,24 +360,38 @@ int main(int argc, char* argv[]) SyncQueue queue; - pthread_t listenerManagerThread; - pthread_create(&listenerManagerThread, NULL, listenerManager, &queue); - pthread_detach(listenerManagerThread); - pthread_t syncListenerThread; + boost::thread listenerManagerThread(boost::bind(listenerManager,&queue)); - ConnectToList connect_to = gOpt.getConnectTo(); - ThreadParam p( queue,*(new OptionConnectTo())); - if ( gOpt.getLocalSyncPort()) - pthread_create(&syncListenerThread, NULL, syncListener, &p); - std::list connectThreads; - for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) - { - connectThreads.push_back(pthread_t()); - ThreadParam * point = new ThreadParam(queue,*it); - pthread_create(& connectThreads.back(), NULL, syncConnector, point); - } +// #ifndef ANYTUN_NOSYNC +// boost::thread * syncListenerThread; +// if(gOpt.getLocalSyncPort() != "") +// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue)); + +// std::list connectThreads; +// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { +// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); +// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point))); +// } +// #endif + + + +// pthread_t syncListenerThread; + +// ConnectToList connect_to = gOpt.getConnectTo(); +// ThreadParam p( queue,*(new OptionConnectTo())); +// if ( gOpt.getLocalSyncPort()) +// pthread_create(&syncListenerThread, NULL, syncListener, &p); + +// std::list connectThreads; +// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) +// { +// connectThreads.push_back(pthread_t()); +// ThreadParam * point = new ThreadParam(queue,*it); +// pthread_create(& connectThreads.back(), NULL, syncConnector, point); +// } PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort()); CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window); diff --git a/src/anyrtpproxy/options.cpp b/src/anyrtpproxy/options.cpp index 328c03c..e3f3850 100644 --- a/src/anyrtpproxy/options.cpp +++ b/src/anyrtpproxy/options.cpp @@ -50,7 +50,7 @@ Options& Options::instance() return *inst; } -Options::Options() : control_interface_("0.0.0.0", 22222) +Options::Options() : control_interface_("0.0.0.0", "22222") { progname_ = "anyrtpproxy"; @@ -60,7 +60,7 @@ Options::Options() : control_interface_("0.0.0.0", 22222) daemonize_ = true; pid_file_ = ""; local_addr_ = ""; - local_sync_port_ = 0; + local_sync_port_ = ""; rtp_start_port_ = 34000; rtp_end_port_ = 35000; no_nat_once_ = false; @@ -170,7 +170,7 @@ bool Options::parse(int argc, char* argv[]) PARSE_SCALAR_PARAM2("-p","--port-range", rtp_start_port_, rtp_end_port_) PARSE_CSLIST_PARAM("-M","--sync-hosts", host_port_queue) PARSE_SCALAR_PARAM("-S","--sync-port", local_sync_port_) -// PARSE_SCALAR_PARAM("-I","--sync-interface", local_sync_addr_) + PARSE_SCALAR_PARAM("-I","--sync-interface", local_sync_addr_) else return false; } @@ -205,11 +205,11 @@ void Options::printUsage() std::cout << " [-d|--nodaemonize] don't run in background" << std::endl; std::cout << " [-P|--write-pid] write pid to this file" << std::endl; std::cout << " [-i|--interface] local ip address to listen to for RTP packets" << std::endl; - std::cout << " [-s|--control] the address/port to listen on for control commands" << std::endl; + std::cout << " [-s|--control] [:] the address/port to listen on for control commands" << std::endl; std::cout << " [-p|--port-range] port range used to relay rtp connections" << std::endl; std::cout << " [-n|--nat] enable permantent automatic nat detection(use only with anytun)" << std::endl; std::cout << " [-o|--no-nat-once] disable automatic nat detection for new connections" << std::endl; -// std::cout << " [-I|--sync-interface] local unicast(sync) ip address to bind to" << std::endl; + std::cout << " [-I|--sync-interface] local unicast(sync) ip address to bind to" << std::endl; std::cout << " [-S|--sync-port] local unicast(sync) port to bind to" << std::endl; std::cout << " [-M|--sync-hosts] :[,:[...]]"<< std::endl; std::cout << " List of Remote Sync Hosts/Ports"<< std::endl; @@ -226,6 +226,10 @@ void Options::printOptions() std::cout << "pid_file='" << pid_file_ << "'" << std::endl; std::cout << "control-interface='" << control_interface_.toString() << "'" << std::endl; std::cout << "local_addr='" << local_addr_ << "'" << std::endl; + std::cout << "rtp_start_port=" << rtp_start_port_ << std::endl; + std::cout << "rtp_end_port=" << rtp_end_port_ << std::endl; + std::cout << "local_sync_addr='" << local_sync_addr_ << "'" << std::endl; + std::cout << "local_sync_port='" << local_sync_port_ << "'" << std::endl; } std::string Options::getProgname() @@ -295,13 +299,28 @@ Options& Options::setLocalAddr(std::string l) return *this; } -u_int16_t Options::getLocalSyncPort() +std::string Options::getLocalSyncAddr() { + Lock lock(mutex); + return local_sync_addr_; +} + +Options& Options::setLocalSyncAddr(std::string l) +{ + Lock lock(mutex); + local_sync_addr_ = l; + return *this; +} + +std::string Options::getLocalSyncPort() +{ + Lock lock(mutex); return local_sync_port_; } -Options& Options::setLocalSyncPort(u_int16_t l) +Options& Options::setLocalSyncPort(std::string l) { + Lock lock(mutex); local_sync_port_ = l; return *this; } diff --git a/src/anyrtpproxy/options.h b/src/anyrtpproxy/options.h index 51219df..50e16f9 100644 --- a/src/anyrtpproxy/options.h +++ b/src/anyrtpproxy/options.h @@ -39,7 +39,7 @@ typedef struct OptionConnectTo { std::string host; - uint16_t port; + std::string port; }; typedef std::list ConnectToList; @@ -47,12 +47,12 @@ typedef std::list ConnectToList; class Host { public: - Host(std::string addr, u_int16_t port) : addr_(addr), port_(port) {} + Host(std::string addr, std::string port) : addr_(addr), port_(port) {} Host(std::string addr_port) { std::istringstream iss(addr_port); getline(iss, addr_, ':'); - if(!(iss >> port_)) port_ = 0; + if(!(iss >> port_)) port_ = ""; } std::string toString() const { @@ -62,7 +62,7 @@ public: } std::string addr_; - u_int16_t port_; + std::string port_; }; typedef std::list HostList; @@ -87,8 +87,10 @@ public: Host getControlInterface(); std::string getLocalAddr(); Options& setLocalAddr(std::string l); - u_int16_t getLocalSyncPort(); - Options& setLocalSyncPort(u_int16_t l); + std::string getLocalSyncAddr(); + Options& setLocalSyncAddr(std::string l); + std::string getLocalSyncPort(); + Options& setLocalSyncPort(std::string l); u_int16_t getRtpStartPort(); Options& setRtpStartPort(u_int16_t l); u_int16_t getRtpEndPort(); @@ -122,7 +124,8 @@ private: std::string chroot_dir_; std::string pid_file_; bool daemonize_; - u_int16_t local_sync_port_; + std::string local_sync_addr_; + std::string local_sync_port_; std::string local_addr_; u_int16_t rtp_start_port_; u_int16_t rtp_end_port_; -- cgit v1.2.3