From b83f4bc9e7c6aab20dfdff69fdbcbb2681646791 Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Sat, 8 Mar 2008 19:01:52 +0000 Subject: anyrtpproxy sync test --- anyrtpproxy/Makefile | 4 +++ anyrtpproxy/anyrtpproxy.cpp | 77 ++++++++++++++++++++++++++++++++++++++++++++- anyrtpproxy/options.h | 12 +++++-- 3 files changed, 90 insertions(+), 3 deletions(-) (limited to 'anyrtpproxy') diff --git a/anyrtpproxy/Makefile b/anyrtpproxy/Makefile index f4486b1..354099b 100644 --- a/anyrtpproxy/Makefile +++ b/anyrtpproxy/Makefile @@ -1,7 +1,11 @@ C = gcc CFLAGS = -g -Wall +CFLAGS += -DSOCKETS_NAMESPACE=sockets +CFLAGS += -DSOCKETS_NAMESPACE_STR='"sockets"' C++ = g++ CCFLAGS = -g -Wall +CCFLAGS += -DSOCKETS_NAMESPACE=sockets +CCFLAGS += -DSOCKETS_NAMESPACE_STR='"sockets"' LD = g++ LDFLAGS = -g -Wall -O2 -lpthread diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index 3b0c488..da03ff4 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -10,6 +10,17 @@ #include "../signalController.h" #include "../PracticalSocket.h" #include "../buffer.h" +#include "../connectionList.h" +#include "../rtpSessionTable.h" +#include "../syncCommand.h" +#include "../syncQueue.h" +#include "../syncSocketHandler.h" +#include "../syncListenSocket.h" + +#include "../syncSocket.h" +#include "../syncClientSocket.h" +#include "../threadUtils.hpp" + #include "options.h" #include @@ -17,6 +28,18 @@ #define MAX_PACKET_SIZE 1500 + +class ThreadParam +{ +public: + ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_) + : queue(queue_),connto(connto_) + {}; + SyncQueue & queue; + OptionConnectTo & connto; +}; + + class ControlHost : public Host { public: @@ -33,7 +56,7 @@ public: private: - Mutex mutex; + ::Mutex mutex; std::map > control_hosts_; }; @@ -162,6 +185,40 @@ void daemonize() umask(027); } +void* syncConnector(void* p ) +{ + ThreadParam* param = reinterpret_cast(p); + + SocketHandler h; + ConnectionList cl; + SyncClientSocket sock(h,cl); + sock.Open( param->connto.host, param->connto.port); + h.Add(&sock); + while (h.GetCount()) + { + h.Select(); + } + pthread_exit(NULL); +} + +void* syncListener(void* p ) +{ + ThreadParam* param = reinterpret_cast(p); + ConnectionList cl; + + SyncSocketHandler h(param->queue); + SyncListenSocket l(h,cl); + + if (l.Bind(gOpt.getLocalSyncPort())) + pthread_exit(NULL); + + Utility::ResolveLocal(); // resolve local hostname + h.Add(&l); + h.Select(1,0); + while (1) { + h.Select(1,0); + } +} int main(int argc, char* argv[]) { @@ -186,6 +243,24 @@ int main(int argc, char* argv[]) pthread_t senderThread; pthread_create(&senderThread, NULL, sender, NULL); pthread_detach(senderThread); + pthread_t syncListenerThread; + + SyncQueue queue; +// Example +// gRtpSessionTable.addSession(std::string("callid"),RtpSession()); +// SyncCommand sc (std::string("callid")); +// queue.push(sc); + ThreadParam p( queue,*(new OptionConnectTo())) + if ( gOpt.getLocalSyncPort()) + pthread_create(&syncListenerThread, NULL, syncListener, &p); + + std::list connectThreads; + for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) + { + connectThreads.push_back(pthread_t()); + ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it); + pthread_create(& connectThreads.back(), NULL, syncConnector, point); + } int ret = sig.run(); return ret; diff --git a/anyrtpproxy/options.h b/anyrtpproxy/options.h index c75da6a..25004a4 100644 --- a/anyrtpproxy/options.h +++ b/anyrtpproxy/options.h @@ -35,6 +35,14 @@ #include #include +typedef struct OptionConnectTo +{ + std::string host; + uint16_t port; +}; + +typedef std::list ConnectToList; + class Host { public: @@ -84,7 +92,7 @@ private: bool sanityCheck(); static Options* inst; - static Mutex instMutex; + static ::Mutex instMutex; class instanceCleaner { public: ~instanceCleaner() { if(Options::inst != 0) @@ -93,7 +101,7 @@ private: }; friend class instanceCleaner; - Mutex mutex; + ::Mutex mutex; std::string progname_; bool chroot_; -- cgit v1.2.3