summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOthmar Gsenger <otti@anytun.org>2008-03-08 19:01:52 +0000
committerOthmar Gsenger <otti@anytun.org>2008-03-08 19:01:52 +0000
commitb83f4bc9e7c6aab20dfdff69fdbcbb2681646791 (patch)
tree81c7ca3136919cdc4bc39116dfd8a8d24ff3c4ae
parentadded SyncRtpCommand (diff)
anyrtpproxy sync test
-rw-r--r--anyrtpproxy/Makefile4
-rw-r--r--anyrtpproxy/anyrtpproxy.cpp77
-rw-r--r--anyrtpproxy/options.h12
3 files changed, 90 insertions, 3 deletions
diff --git a/anyrtpproxy/Makefile b/anyrtpproxy/Makefile
index f4486b1..354099b 100644
--- a/anyrtpproxy/Makefile
+++ b/anyrtpproxy/Makefile
@@ -1,7 +1,11 @@
C = gcc
CFLAGS = -g -Wall
+CFLAGS += -DSOCKETS_NAMESPACE=sockets
+CFLAGS += -DSOCKETS_NAMESPACE_STR='"sockets"'
C++ = g++
CCFLAGS = -g -Wall
+CCFLAGS += -DSOCKETS_NAMESPACE=sockets
+CCFLAGS += -DSOCKETS_NAMESPACE_STR='"sockets"'
LD = g++
LDFLAGS = -g -Wall -O2 -lpthread
diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp
index 3b0c488..da03ff4 100644
--- a/anyrtpproxy/anyrtpproxy.cpp
+++ b/anyrtpproxy/anyrtpproxy.cpp
@@ -10,6 +10,17 @@
#include "../signalController.h"
#include "../PracticalSocket.h"
#include "../buffer.h"
+#include "../connectionList.h"
+#include "../rtpSessionTable.h"
+#include "../syncCommand.h"
+#include "../syncQueue.h"
+#include "../syncSocketHandler.h"
+#include "../syncListenSocket.h"
+
+#include "../syncSocket.h"
+#include "../syncClientSocket.h"
+#include "../threadUtils.hpp"
+
#include "options.h"
#include <map>
@@ -17,6 +28,18 @@
#define MAX_PACKET_SIZE 1500
+
+class ThreadParam
+{
+public:
+ ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
+ : queue(queue_),connto(connto_)
+ {};
+ SyncQueue & queue;
+ OptionConnectTo & connto;
+};
+
+
class ControlHost : public Host
{
public:
@@ -33,7 +56,7 @@ public:
private:
- Mutex mutex;
+ ::Mutex mutex;
std::map<ControlHost, std::pair<UDPSocket*, pthread_t> > control_hosts_;
};
@@ -162,6 +185,40 @@ void daemonize()
umask(027);
}
+void* syncConnector(void* p )
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+
+ SocketHandler h;
+ ConnectionList cl;
+ SyncClientSocket sock(h,cl);
+ sock.Open( param->connto.host, param->connto.port);
+ h.Add(&sock);
+ while (h.GetCount())
+ {
+ h.Select();
+ }
+ pthread_exit(NULL);
+}
+
+void* syncListener(void* p )
+{
+ ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
+ ConnectionList cl;
+
+ SyncSocketHandler h(param->queue);
+ SyncListenSocket<SyncSocket,ConnectionList> l(h,cl);
+
+ if (l.Bind(gOpt.getLocalSyncPort()))
+ pthread_exit(NULL);
+
+ Utility::ResolveLocal(); // resolve local hostname
+ h.Add(&l);
+ h.Select(1,0);
+ while (1) {
+ h.Select(1,0);
+ }
+}
int main(int argc, char* argv[])
{
@@ -186,6 +243,24 @@ int main(int argc, char* argv[])
pthread_t senderThread;
pthread_create(&senderThread, NULL, sender, NULL);
pthread_detach(senderThread);
+ pthread_t syncListenerThread;
+
+ SyncQueue queue;
+// Example
+// gRtpSessionTable.addSession(std::string("callid"),RtpSession());
+// SyncCommand sc (std::string("callid"));
+// queue.push(sc);
+ ThreadParam p( queue,*(new OptionConnectTo()))
+ if ( gOpt.getLocalSyncPort())
+ pthread_create(&syncListenerThread, NULL, syncListener, &p);
+
+ std::list<pthread_t> connectThreads;
+ for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+ {
+ connectThreads.push_back(pthread_t());
+ ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
+ pthread_create(& connectThreads.back(), NULL, syncConnector, point);
+ }
int ret = sig.run();
return ret;
diff --git a/anyrtpproxy/options.h b/anyrtpproxy/options.h
index c75da6a..25004a4 100644
--- a/anyrtpproxy/options.h
+++ b/anyrtpproxy/options.h
@@ -35,6 +35,14 @@
#include <list>
#include <sstream>
+typedef struct OptionConnectTo
+{
+ std::string host;
+ uint16_t port;
+};
+
+typedef std::list<OptionConnectTo> ConnectToList;
+
class Host
{
public:
@@ -84,7 +92,7 @@ private:
bool sanityCheck();
static Options* inst;
- static Mutex instMutex;
+ static ::Mutex instMutex;
class instanceCleaner {
public: ~instanceCleaner() {
if(Options::inst != 0)
@@ -93,7 +101,7 @@ private:
};
friend class instanceCleaner;
- Mutex mutex;
+ ::Mutex mutex;
std::string progname_;
bool chroot_;