From e28a27a8fd0d70d2aec48597991aa26003be1118 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 13 Jan 2010 01:39:43 +0000 Subject: single thread per socket --- src/anytun.cpp | 2 +- src/packetSource.cpp | 101 +++++++++++++++++++++++++-------------------------- src/packetSource.h | 9 ++--- 3 files changed, 54 insertions(+), 58 deletions(-) diff --git a/src/anytun.cpp b/src/anytun.cpp index 0268ff1..85c85bf 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -272,7 +272,7 @@ void receiver(TunDevice* dev, PacketSource* src) // check whether auth tag is ok or not if(!a->checkTag(conn.kd_, encrypted_packet)) { - cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!" << std::endl; + cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; continue; } diff --git a/src/packetSource.cpp b/src/packetSource.cpp index 17233a9..9eaedc6 100644 --- a/src/packetSource.cpp +++ b/src/packetSource.cpp @@ -49,9 +49,6 @@ void PacketSource::waitUntilReady() UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) { - last_recv_sock_.buf_ = NULL; - last_recv_sock_.len_ = 0; - last_recv_sock_.sock_ = NULL; gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); } @@ -59,8 +56,10 @@ UDPPacketSource::~UDPPacketSource() { std::list::iterator it = sockets_.begin(); for(;it != sockets_.end(); ++it) { - delete[](it->buf_); - delete(it->sock_); +/// this might be a needed by the receiver thread, TODO cleanup +// delete[](it->buf_); +// delete(it->sem_); +// delete(it->sock_); } } @@ -73,7 +72,11 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) sockets_element_t sock; sock.buf_ = NULL; sock.len_ = 0; + sock.sem_ = NULL; sock.sock_ = new proto::socket(io_service_); + if(!sock.sock_) + AnytunError::throwErr() << "memory error"; + sock.sock_->open(e.protocol()); #ifndef _MSC_VER if(e.protocol() == proto::v6()) { @@ -87,6 +90,27 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) it++; } + // prepare multi-socket recv + if(sockets_.size() > 1) { + std::list::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + it->len_ = 1600; // TODO packet size + it->buf_ = new u_int8_t[it->len_]; + if(!it->buf_) + AnytunError::throwErr() << "memory error"; + + it->sem_ = new Semaphore(); + if(!it->sem_) { + delete[](it->buf_); + AnytunError::throwErr() << "memory error"; + } + + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, it)); + it->sem_->up(); + } + + } + ready_sem_.up(); } @@ -95,51 +119,30 @@ void UDPPacketSource::onError(const std::runtime_error& e) gSignalController.inject(SIGERROR, e.what()); } -void UDPPacketSource::recv_thread(thread_result_t result) +void UDPPacketSource::recv_thread(std::list::iterator it) { - cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint(); + cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << it->sock_->local_endpoint(); - result.len_ = static_cast(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_)); - { - Lock lock(thread_result_mutex_); - thread_result_queue_.push(result); + thread_result_t result; + result.it_ = it; + for(;;) { + it->sem_->down(); + + cLog.msg(Log::PRIO_DEBUG) << "calling recv() for " << it->sock_->local_endpoint(); + + result.len_ = static_cast(it->sock_->receive_from(boost::asio::buffer(it->buf_, it->len_), result.remote_)); + { + Lock lock(thread_result_mutex_); + thread_result_queue_.push(result); + } + thread_result_sem_.up(); } - thread_result_sem_.up(); } u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote) { if(sockets_.size() == 1) - return static_cast(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote)); - - if(!last_recv_sock_.sock_) { - std::list::iterator it = sockets_.begin(); - for(;it != sockets_.end(); ++it) { - if(it == sockets_.begin()) { - it->buf_ = buf; - it->len_ = len; - } - else { - it->buf_ = new u_int8_t[len]; - if(!it->buf_) - AnytunError::throwErr() << "memory error"; - it->len_ = len; - } - - thread_result_t result; - result.buf_ = it->buf_; - result.len_ = it->len_; - result.sock_ = it->sock_; - boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); - } - } - else { - thread_result_t result; - result.buf_ = last_recv_sock_.buf_; - result.len_ = last_recv_sock_.len_; - result.sock_ = last_recv_sock_.sock_; - boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); - } + return static_cast(sockets_.front().sock_->receive_from(boost::asio::buffer(buf, len), remote)); thread_result_sem_.down(); thread_result_t result; @@ -148,18 +151,12 @@ u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoi result = thread_result_queue_.front(); thread_result_queue_.pop(); } - - last_recv_sock_.sock_ = result.sock_; - last_recv_sock_.buf_ = result.buf_; - last_recv_sock_.len_ = result.len_; remote = result.remote_; + std::memcpy(buf, result.it_->buf_, (len < result.len_) ? len : result.len_); + len = (len < result.len_) ? len : result.len_; + result.it_->sem_->up(); - if(result.sock_ != sockets_.begin()->sock_) { - std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_); - return (len < result.len_) ? len : result.len_; - } - - return result.len_; + return len; } void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote) diff --git a/src/packetSource.h b/src/packetSource.h index ceac34b..da90690 100644 --- a/src/packetSource.h +++ b/src/packetSource.h @@ -34,6 +34,7 @@ #define ANYTUN_packetSource_h_INCLUDED #include +#include #include #include @@ -79,21 +80,19 @@ private: u_int8_t* buf_; u_int32_t len_; proto::socket* sock_; + Semaphore* sem_; } sockets_element_t; std::list sockets_; + void recv_thread(std::list::iterator it); typedef struct { - u_int8_t* buf_; u_int32_t len_; - proto::socket* sock_; PacketSourceEndpoint remote_; + std::list::iterator it_; } thread_result_t; std::queue thread_result_queue_; Mutex thread_result_mutex_; Semaphore thread_result_sem_; - sockets_element_t last_recv_sock_; - - void recv_thread(thread_result_t result); }; #endif -- cgit v1.2.3