summaryrefslogtreecommitdiff
path: root/src/anyrtpproxy/anyrtpproxy.cpp
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2008-11-25 15:06:31 +0000
committerChristian Pointner <equinox@anytun.org>2008-11-25 15:06:31 +0000
commit047ecc9d8570581e2d8fc044925e9e28f9a6a6df (patch)
tree8ee40d780ffd4ff0aa4311ae1d9725d89dae177a /src/anyrtpproxy/anyrtpproxy.cpp
parentanyrtpproxy: fixed Makfile (diff)
further fixes for anyrtpproxy but still alot left
Diffstat (limited to 'src/anyrtpproxy/anyrtpproxy.cpp')
-rw-r--r--src/anyrtpproxy/anyrtpproxy.cpp106
1 files changed, 60 insertions, 46 deletions
diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp
index 78354a2..e0519b0 100644
--- a/src/anyrtpproxy/anyrtpproxy.cpp
+++ b/src/anyrtpproxy/anyrtpproxy.cpp
@@ -39,17 +39,13 @@
#include "../log.h"
#include "../signalController.h"
-#include "../PracticalSocket.h"
#include "../buffer.h"
#include "connectionList.h"
#include "../rtpSessionTable.h"
#include "../syncCommand.h"
#include "../syncQueue.h"
-#include "../syncSocketHandler.h"
-#include "../syncListenSocket.h"
+#include "../syncClient.h"
-#include "../syncSocket.h"
-#include "../syncClientSocket.h"
#include "../threadUtils.hpp"
#include "commandHandler.h"
@@ -289,39 +285,43 @@ void daemonize()
umask(027);
}
-void* syncConnector(void* p )
+void syncConnector(void* p)
{
- ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
-
- SocketHandler h;
- ConnectionList cl;
- SyncClientSocket sock(h,cl);
- sock.Open( param->connto.host, param->connto.port);
- h.Add(&sock);
- while (h.GetCount())
- {
- h.Select();
- }
- pthread_exit(NULL);
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+
+ SyncClient sc ( param->connto.host, param->connto.port);
+ sc.run();
}
-void* syncListener(void* p )
+void syncListener(SyncQueue * queue)
{
- ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
- ConnectionList cl;
-
- SyncSocketHandler h(param->queue);
- SyncListenSocket<SyncSocket,ConnectionList> l(h,cl);
+ try
+ {
+ boost::asio::io_service io_service;
+ SyncTcpConnection::proto::resolver resolver(io_service);
+ SyncTcpConnection::proto::endpoint e;
+ if(gOpt.getLocalSyncAddr()!="")
+ {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ } else {
+ SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
+ e = *resolver.resolve(query);
+ }
- if (l.Bind(gOpt.getLocalSyncPort()))
- pthread_exit(NULL);
- Utility::ResolveLocal(); // resolve local hostname
- h.Add(&l);
- h.Select(1,0);
- while (1) {
- h.Select(1,0);
+ SyncServer server(io_service,e);
+ server.onConnect=boost::bind(syncOnConnect,_1);
+ queue->setSyncServerPtr(&server);
+ io_service.run();
+ }
+ catch (std::exception& e)
+ {
+ std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
+ cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
+ << " (" << e.what() << ")" << std::endl;
}
+
}
int main(int argc, char* argv[])
@@ -360,24 +360,38 @@ int main(int argc, char* argv[])
SyncQueue queue;
- pthread_t listenerManagerThread;
- pthread_create(&listenerManagerThread, NULL, listenerManager, &queue);
- pthread_detach(listenerManagerThread);
- pthread_t syncListenerThread;
+ boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
- ConnectToList connect_to = gOpt.getConnectTo();
- ThreadParam p( queue,*(new OptionConnectTo()));
- if ( gOpt.getLocalSyncPort())
- pthread_create(&syncListenerThread, NULL, syncListener, &p);
- std::list<pthread_t> connectThreads;
- for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
- {
- connectThreads.push_back(pthread_t());
- ThreadParam * point = new ThreadParam(queue,*it);
- pthread_create(& connectThreads.back(), NULL, syncConnector, point);
- }
+// #ifndef ANYTUN_NOSYNC
+// boost::thread * syncListenerThread;
+// if(gOpt.getLocalSyncPort() != "")
+// syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
+
+// std::list<boost::thread *> connectThreads;
+// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
+// ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
+// connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
+// }
+// #endif
+
+
+
+// pthread_t syncListenerThread;
+
+// ConnectToList connect_to = gOpt.getConnectTo();
+// ThreadParam p( queue,*(new OptionConnectTo()));
+// if ( gOpt.getLocalSyncPort())
+// pthread_create(&syncListenerThread, NULL, syncListener, &p);
+
+// std::list<pthread_t> connectThreads;
+// for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+// {
+// connectThreads.push_back(pthread_t());
+// ThreadParam * point = new ThreadParam(queue,*it);
+// pthread_create(& connectThreads.back(), NULL, syncConnector, point);
+// }
PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);