diff options
Diffstat (limited to 'anyrtpproxy/anyrtpproxy.cpp')
-rw-r--r-- | anyrtpproxy/anyrtpproxy.cpp | 77 |
1 files changed, 76 insertions, 1 deletions
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 3b0c488..da03ff4 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -10,6 +10,17 @@ #include "../signalController.h" #include "../PracticalSocket.h" #include "../buffer.h" +#include "../connectionList.h" +#include "../rtpSessionTable.h" +#include "../syncCommand.h" +#include "../syncQueue.h" +#include "../syncSocketHandler.h" +#include "../syncListenSocket.h" + +#include "../syncSocket.h" +#include "../syncClientSocket.h" +#include "../threadUtils.hpp" + #include "options.h" #include <map> @@ -17,6 +28,18 @@ #define MAX_PACKET_SIZE 1500 + +class ThreadParam +{ +public: + ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_) + : queue(queue_),connto(connto_) + {}; + SyncQueue & queue; + OptionConnectTo & connto; +}; + + class ControlHost : public Host { public: @@ -33,7 +56,7 @@ public: private: - Mutex mutex; + ::Mutex mutex; std::map<ControlHost, std::pair<UDPSocket*, pthread_t> > control_hosts_; }; @@ -162,6 +185,40 @@ void daemonize() umask(027); } +void* syncConnector(void* p ) +{ + ThreadParam* param = reinterpret_cast<ThreadParam*>(p); + + SocketHandler h; + ConnectionList cl; + SyncClientSocket sock(h,cl); + sock.Open( param->connto.host, param->connto.port); + h.Add(&sock); + while (h.GetCount()) + { + h.Select(); + } + pthread_exit(NULL); +} + +void* syncListener(void* p ) +{ + ThreadParam* param = reinterpret_cast<ThreadParam*>(p); + ConnectionList cl; + + SyncSocketHandler h(param->queue); + SyncListenSocket<SyncSocket,ConnectionList> l(h,cl); + + if (l.Bind(gOpt.getLocalSyncPort())) + pthread_exit(NULL); + + Utility::ResolveLocal(); // resolve local hostname + h.Add(&l); + h.Select(1,0); + while (1) { + h.Select(1,0); + } +} int main(int argc, char* argv[]) { @@ -186,6 +243,24 @@ int main(int argc, char* argv[]) pthread_t senderThread; pthread_create(&senderThread, NULL, sender, NULL); pthread_detach(senderThread); + pthread_t syncListenerThread; + + SyncQueue queue; +// Example +// gRtpSessionTable.addSession(std::string("callid"),RtpSession()); +// SyncCommand sc (std::string("callid")); +// queue.push(sc); + ThreadParam p( queue,*(new OptionConnectTo())) + if ( gOpt.getLocalSyncPort()) + pthread_create(&syncListenerThread, NULL, syncListener, &p); + + std::list<pthread_t> connectThreads; + for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) + { + connectThreads.push_back(pthread_t()); + ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); + pthread_create(& connectThreads.back(), NULL, syncConnector, point); + } int ret = sig.run(); return ret; |