summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2010-01-13 00:35:15 +0000
committerChristian Pointner <equinox@anytun.org>2010-01-13 00:35:15 +0000
commit6db69370bd4a1ba3a54bcef9e2b7c03da2bd435b (patch)
tree0a48338d0e5a89d97ee21a24009ceeb79ada0b1c
parentbetter speration of waitForScript and waitAndDestroy() (diff)
added mutli socket recv to packet source
-rw-r--r--src/packetSource.cpp94
-rw-r--r--src/packetSource.h23
2 files changed, 104 insertions, 13 deletions
diff --git a/src/packetSource.cpp b/src/packetSource.cpp
index 3ab22c6..9d0b706 100644
--- a/src/packetSource.cpp
+++ b/src/packetSource.cpp
@@ -32,6 +32,7 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
+#include <boost/thread.hpp>
#include "datatypes.h"
#include "packetSource.h"
@@ -39,6 +40,7 @@
#include "resolver.h"
#include "options.h"
#include "signalController.h"
+#include "anytunError.h"
void PacketSource::waitUntilReady()
{
@@ -47,14 +49,19 @@ void PacketSource::waitUntilReady()
UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port)
{
+ last_recv_sock_.buf_ = NULL;
+ last_recv_sock_.len_ = 0;
+ last_recv_sock_.sock_ = NULL;
gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
}
UDPPacketSource::~UDPPacketSource()
{
- std::list<proto::socket*>::iterator it = sockets_.begin();
- for(;it != sockets_.end(); ++it)
- delete *it;
+ std::list<sockets_element_t>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+ delete[](it->buf_);
+ delete(it->sock_);
+ }
}
void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
@@ -63,13 +70,16 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
PacketSourceEndpoint e = *it;
cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e;
- proto::socket* sock = new proto::socket(io_service_);
- sock->open(e.protocol());
+ sockets_element_t sock;
+ sock.buf_ = NULL;
+ sock.len_ = 0;
+ sock.sock_ = new proto::socket(io_service_);
+ sock.sock_->open(e.protocol());
if(e.protocol() == proto::v6()) {
boost::asio::ip::v6_only option(true);
- sock->set_option(option);
+ sock.sock_->set_option(option);
}
- sock->bind(e);
+ sock.sock_->bind(e);
sockets_.push_back(sock);
it++;
@@ -83,17 +93,79 @@ void UDPPacketSource::onError(const std::runtime_error& e)
gSignalController.inject(SIGERROR, e.what());
}
+void UDPPacketSource::recv_thread(thread_result_t result)
+{
+ cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint();
+
+ result.len_ = static_cast<u_int32_t>(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_));
+ {
+ Lock lock(thread_result_mutex_);
+ thread_result_queue_.push(result);
+ }
+ thread_result_sem_.up();
+}
+
u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote)
{
- return static_cast<u_int32_t>(sockets_.front()->receive_from(boost::asio::buffer(buf, len), remote));
+ if(sockets_.size() == 1)
+ return static_cast<u_int32_t>(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote));
+
+ if(!last_recv_sock_.sock_) {
+ std::list<sockets_element_t>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+ if(it == sockets_.begin()) {
+ it->buf_ = buf;
+ it->len_ = len;
+ }
+ else {
+ it->buf_ = new u_int8_t[len];
+ if(!it->buf_)
+ AnytunError::throwErr() << "memory error";
+ it->len_ = len;
+ }
+
+ thread_result_t result;
+ result.buf_ = it->buf_;
+ result.len_ = it->len_;
+ result.sock_ = it->sock_;
+ boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result));
+ }
+ }
+ else {
+ thread_result_t result;
+ result.buf_ = last_recv_sock_.buf_;
+ result.len_ = last_recv_sock_.len_;
+ result.sock_ = last_recv_sock_.sock_;
+ boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result));
+ }
+
+ thread_result_sem_.down();
+ thread_result_t result;
+ {
+ Lock lock(thread_result_mutex_);
+ result = thread_result_queue_.front();
+ thread_result_queue_.pop();
+ }
+
+ last_recv_sock_.sock_ = result.sock_;
+ last_recv_sock_.buf_ = result.buf_;
+ last_recv_sock_.len_ = result.len_;
+ remote = result.remote_;
+
+ if(result.sock_ != sockets_.begin()->sock_) {
+ std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_);
+ return (len < result.len_) ? len : result.len_;
+ }
+
+ return result.len_;
}
void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote)
{
- std::list<proto::socket*>::iterator it = sockets_.begin();
+ std::list<sockets_element_t>::iterator it = sockets_.begin();
for(;it != sockets_.end(); ++it) {
- if((*it)->local_endpoint().protocol() == remote.protocol()) {
- (*it)->send_to(boost::asio::buffer(buf, len), remote);
+ if(it->sock_->local_endpoint().protocol() == remote.protocol()) {
+ it->sock_->send_to(boost::asio::buffer(buf, len), remote);
return;
}
}
diff --git a/src/packetSource.h b/src/packetSource.h
index ffc14bf..ceac34b 100644
--- a/src/packetSource.h
+++ b/src/packetSource.h
@@ -35,6 +35,7 @@
#include <boost/asio.hpp>
#include <list>
+#include <queue>
#include "datatypes.h"
#include "threadUtils.hpp"
@@ -72,9 +73,27 @@ public:
void onError(const std::runtime_error& e);
private:
-
boost::asio::io_service io_service_;
- std::list<proto::socket*> sockets_;
+
+ typedef struct {
+ u_int8_t* buf_;
+ u_int32_t len_;
+ proto::socket* sock_;
+ } sockets_element_t;
+ std::list<sockets_element_t> sockets_;
+
+ typedef struct {
+ u_int8_t* buf_;
+ u_int32_t len_;
+ proto::socket* sock_;
+ PacketSourceEndpoint remote_;
+ } thread_result_t;
+ std::queue<thread_result_t> thread_result_queue_;
+ Mutex thread_result_mutex_;
+ Semaphore thread_result_sem_;
+ sockets_element_t last_recv_sock_;
+
+ void recv_thread(thread_result_t result);
};
#endif