From 6db69370bd4a1ba3a54bcef9e2b7c03da2bd435b Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 13 Jan 2010 00:35:15 +0000 Subject: added mutli socket recv to packet source --- src/packetSource.cpp | 94 ++++++++++++++++++++++++++++++++++++++++++++++------ src/packetSource.h | 23 +++++++++++-- 2 files changed, 104 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/packetSource.cpp b/src/packetSource.cpp index 3ab22c6..9d0b706 100644 --- a/src/packetSource.cpp +++ b/src/packetSource.cpp @@ -32,6 +32,7 @@ #include #include +#include #include "datatypes.h" #include "packetSource.h" @@ -39,6 +40,7 @@ #include "resolver.h" #include "options.h" #include "signalController.h" +#include "anytunError.h" void PacketSource::waitUntilReady() { @@ -47,14 +49,19 @@ void PacketSource::waitUntilReady() UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) { + last_recv_sock_.buf_ = NULL; + last_recv_sock_.len_ = 0; + last_recv_sock_.sock_ = NULL; gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); } UDPPacketSource::~UDPPacketSource() { - std::list::iterator it = sockets_.begin(); - for(;it != sockets_.end(); ++it) - delete *it; + std::list::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + delete[](it->buf_); + delete(it->sock_); + } } void UDPPacketSource::onResolve(PacketSourceResolverIt& it) @@ -63,13 +70,16 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) PacketSourceEndpoint e = *it; cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e; - proto::socket* sock = new proto::socket(io_service_); - sock->open(e.protocol()); + sockets_element_t sock; + sock.buf_ = NULL; + sock.len_ = 0; + sock.sock_ = new proto::socket(io_service_); + sock.sock_->open(e.protocol()); if(e.protocol() == proto::v6()) { boost::asio::ip::v6_only option(true); - sock->set_option(option); + sock.sock_->set_option(option); } - sock->bind(e); + sock.sock_->bind(e); sockets_.push_back(sock); it++; @@ -83,17 +93,79 @@ void UDPPacketSource::onError(const std::runtime_error& e) gSignalController.inject(SIGERROR, e.what()); } +void UDPPacketSource::recv_thread(thread_result_t result) +{ + cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint(); + + result.len_ = static_cast(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_)); + { + Lock lock(thread_result_mutex_); + thread_result_queue_.push(result); + } + thread_result_sem_.up(); +} + u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote) { - return static_cast(sockets_.front()->receive_from(boost::asio::buffer(buf, len), remote)); + if(sockets_.size() == 1) + return static_cast(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote)); + + if(!last_recv_sock_.sock_) { + std::list::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + if(it == sockets_.begin()) { + it->buf_ = buf; + it->len_ = len; + } + else { + it->buf_ = new u_int8_t[len]; + if(!it->buf_) + AnytunError::throwErr() << "memory error"; + it->len_ = len; + } + + thread_result_t result; + result.buf_ = it->buf_; + result.len_ = it->len_; + result.sock_ = it->sock_; + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); + } + } + else { + thread_result_t result; + result.buf_ = last_recv_sock_.buf_; + result.len_ = last_recv_sock_.len_; + result.sock_ = last_recv_sock_.sock_; + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); + } + + thread_result_sem_.down(); + thread_result_t result; + { + Lock lock(thread_result_mutex_); + result = thread_result_queue_.front(); + thread_result_queue_.pop(); + } + + last_recv_sock_.sock_ = result.sock_; + last_recv_sock_.buf_ = result.buf_; + last_recv_sock_.len_ = result.len_; + remote = result.remote_; + + if(result.sock_ != sockets_.begin()->sock_) { + std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_); + return (len < result.len_) ? len : result.len_; + } + + return result.len_; } void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote) { - std::list::iterator it = sockets_.begin(); + std::list::iterator it = sockets_.begin(); for(;it != sockets_.end(); ++it) { - if((*it)->local_endpoint().protocol() == remote.protocol()) { - (*it)->send_to(boost::asio::buffer(buf, len), remote); + if(it->sock_->local_endpoint().protocol() == remote.protocol()) { + it->sock_->send_to(boost::asio::buffer(buf, len), remote); return; } } diff --git a/src/packetSource.h b/src/packetSource.h index ffc14bf..ceac34b 100644 --- a/src/packetSource.h +++ b/src/packetSource.h @@ -35,6 +35,7 @@ #include #include +#include #include "datatypes.h" #include "threadUtils.hpp" @@ -72,9 +73,27 @@ public: void onError(const std::runtime_error& e); private: - boost::asio::io_service io_service_; - std::list sockets_; + + typedef struct { + u_int8_t* buf_; + u_int32_t len_; + proto::socket* sock_; + } sockets_element_t; + std::list sockets_; + + typedef struct { + u_int8_t* buf_; + u_int32_t len_; + proto::socket* sock_; + PacketSourceEndpoint remote_; + } thread_result_t; + std::queue thread_result_queue_; + Mutex thread_result_mutex_; + Semaphore thread_result_sem_; + sockets_element_t last_recv_sock_; + + void recv_thread(thread_result_t result); }; #endif -- cgit v1.2.3