From 047ecc9d8570581e2d8fc044925e9e28f9a6a6df Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 25 Nov 2008 15:06:31 +0000 Subject: further fixes for anyrtpproxy but still alot left --- src/anyrtpproxy/anyrtpproxy.cpp | 106 +++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 46 deletions(-) (limited to 'src/anyrtpproxy/anyrtpproxy.cpp') diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index 78354a2..e0519b0 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -39,17 +39,13 @@ #include "../log.h" #include "../signalController.h" -#include "../PracticalSocket.h" #include "../buffer.h" #include "connectionList.h" #include "../rtpSessionTable.h" #include "../syncCommand.h" #include "../syncQueue.h" -#include "../syncSocketHandler.h" -#include "../syncListenSocket.h" +#include "../syncClient.h" -#include "../syncSocket.h" -#include "../syncClientSocket.h" #include "../threadUtils.hpp" #include "commandHandler.h" @@ -289,39 +285,43 @@ void daemonize() umask(027); } -void* syncConnector(void* p ) +void syncConnector(void* p) { - ThreadParam* param = reinterpret_cast(p); - - SocketHandler h; - ConnectionList cl; - SyncClientSocket sock(h,cl); - sock.Open( param->connto.host, param->connto.port); - h.Add(&sock); - while (h.GetCount()) - { - h.Select(); - } - pthread_exit(NULL); + ThreadParam* param = reinterpret_cast(p); + + SyncClient sc ( param->connto.host, param->connto.port); + sc.run(); } -void* syncListener(void* p ) +void syncListener(SyncQueue * queue) { - ThreadParam* param = reinterpret_cast(p); - ConnectionList cl; - - SyncSocketHandler h(param->queue); - SyncListenSocket l(h,cl); + try + { + boost::asio::io_service io_service; + SyncTcpConnection::proto::resolver resolver(io_service); + SyncTcpConnection::proto::endpoint e; + if(gOpt.getLocalSyncAddr()!="") + { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } else { + SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort()); + e = *resolver.resolve(query); + } - if (l.Bind(gOpt.getLocalSyncPort())) - pthread_exit(NULL); - Utility::ResolveLocal(); // resolve local hostname - h.Add(&l); - h.Select(1,0); - while (1) { - h.Select(1,0); + SyncServer server(io_service,e); + server.onConnect=boost::bind(syncOnConnect,_1); + queue->setSyncServerPtr(&server); + io_service.run(); + } + catch (std::exception& e) + { + std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr(); + cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort() + << " (" << e.what() << ")" << std::endl; } + } int main(int argc, char* argv[]) @@ -360,24 +360,38 @@ int main(int argc, char* argv[]) SyncQueue queue; - pthread_t listenerManagerThread; - pthread_create(&listenerManagerThread, NULL, listenerManager, &queue); - pthread_detach(listenerManagerThread); - pthread_t syncListenerThread; + boost::thread listenerManagerThread(boost::bind(listenerManager,&queue)); - ConnectToList connect_to = gOpt.getConnectTo(); - ThreadParam p( queue,*(new OptionConnectTo())); - if ( gOpt.getLocalSyncPort()) - pthread_create(&syncListenerThread, NULL, syncListener, &p); - std::list connectThreads; - for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) - { - connectThreads.push_back(pthread_t()); - ThreadParam * point = new ThreadParam(queue,*it); - pthread_create(& connectThreads.back(), NULL, syncConnector, point); - } +// #ifndef ANYTUN_NOSYNC +// boost::thread * syncListenerThread; +// if(gOpt.getLocalSyncPort() != "") +// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue)); + +// std::list connectThreads; +// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { +// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); +// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point))); +// } +// #endif + + + +// pthread_t syncListenerThread; + +// ConnectToList connect_to = gOpt.getConnectTo(); +// ThreadParam p( queue,*(new OptionConnectTo())); +// if ( gOpt.getLocalSyncPort()) +// pthread_create(&syncListenerThread, NULL, syncListener, &p); + +// std::list connectThreads; +// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) +// { +// connectThreads.push_back(pthread_t()); +// ThreadParam * point = new ThreadParam(queue,*it); +// pthread_create(& connectThreads.back(), NULL, syncConnector, point); +// } PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort()); CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window); -- cgit v1.2.3