diff options
-rw-r--r-- | src/packetSource.cpp | 94 | ||||
-rw-r--r-- | src/packetSource.h | 23 |
2 files changed, 104 insertions, 13 deletions
diff --git a/src/packetSource.cpp b/src/packetSource.cpp index 3ab22c6..9d0b706 100644 --- a/src/packetSource.cpp +++ b/src/packetSource.cpp @@ -32,6 +32,7 @@ #include <boost/asio.hpp> #include <boost/bind.hpp> +#include <boost/thread.hpp> #include "datatypes.h" #include "packetSource.h" @@ -39,6 +40,7 @@ #include "resolver.h" #include "options.h" #include "signalController.h" +#include "anytunError.h" void PacketSource::waitUntilReady() { @@ -47,14 +49,19 @@ void PacketSource::waitUntilReady() UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) { + last_recv_sock_.buf_ = NULL; + last_recv_sock_.len_ = 0; + last_recv_sock_.sock_ = NULL; gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); } UDPPacketSource::~UDPPacketSource() { - std::list<proto::socket*>::iterator it = sockets_.begin(); - for(;it != sockets_.end(); ++it) - delete *it; + std::list<sockets_element_t>::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + delete[](it->buf_); + delete(it->sock_); + } } void UDPPacketSource::onResolve(PacketSourceResolverIt& it) @@ -63,13 +70,16 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) PacketSourceEndpoint e = *it; cLog.msg(Log::PRIO_NOTICE) << "opening socket: " << e; - proto::socket* sock = new proto::socket(io_service_); - sock->open(e.protocol()); + sockets_element_t sock; + sock.buf_ = NULL; + sock.len_ = 0; + sock.sock_ = new proto::socket(io_service_); + sock.sock_->open(e.protocol()); if(e.protocol() == proto::v6()) { boost::asio::ip::v6_only option(true); - sock->set_option(option); + sock.sock_->set_option(option); } - sock->bind(e); + sock.sock_->bind(e); sockets_.push_back(sock); it++; @@ -83,17 +93,79 @@ void UDPPacketSource::onError(const std::runtime_error& e) gSignalController.inject(SIGERROR, e.what()); } +void UDPPacketSource::recv_thread(thread_result_t result) +{ + cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint(); + + result.len_ = static_cast<u_int32_t>(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_)); + { + Lock lock(thread_result_mutex_); + thread_result_queue_.push(result); + } + thread_result_sem_.up(); +} + u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote) { - return static_cast<u_int32_t>(sockets_.front()->receive_from(boost::asio::buffer(buf, len), remote)); + if(sockets_.size() == 1) + return static_cast<u_int32_t>(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote)); + + if(!last_recv_sock_.sock_) { + std::list<sockets_element_t>::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + if(it == sockets_.begin()) { + it->buf_ = buf; + it->len_ = len; + } + else { + it->buf_ = new u_int8_t[len]; + if(!it->buf_) + AnytunError::throwErr() << "memory error"; + it->len_ = len; + } + + thread_result_t result; + result.buf_ = it->buf_; + result.len_ = it->len_; + result.sock_ = it->sock_; + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); + } + } + else { + thread_result_t result; + result.buf_ = last_recv_sock_.buf_; + result.len_ = last_recv_sock_.len_; + result.sock_ = last_recv_sock_.sock_; + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); + } + + thread_result_sem_.down(); + thread_result_t result; + { + Lock lock(thread_result_mutex_); + result = thread_result_queue_.front(); + thread_result_queue_.pop(); + } + + last_recv_sock_.sock_ = result.sock_; + last_recv_sock_.buf_ = result.buf_; + last_recv_sock_.len_ = result.len_; + remote = result.remote_; + + if(result.sock_ != sockets_.begin()->sock_) { + std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_); + return (len < result.len_) ? len : result.len_; + } + + return result.len_; } void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote) { - std::list<proto::socket*>::iterator it = sockets_.begin(); + std::list<sockets_element_t>::iterator it = sockets_.begin(); for(;it != sockets_.end(); ++it) { - if((*it)->local_endpoint().protocol() == remote.protocol()) { - (*it)->send_to(boost::asio::buffer(buf, len), remote); + if(it->sock_->local_endpoint().protocol() == remote.protocol()) { + it->sock_->send_to(boost::asio::buffer(buf, len), remote); return; } } diff --git a/src/packetSource.h b/src/packetSource.h index ffc14bf..ceac34b 100644 --- a/src/packetSource.h +++ b/src/packetSource.h @@ -35,6 +35,7 @@ #include <boost/asio.hpp> #include <list> +#include <queue> #include "datatypes.h" #include "threadUtils.hpp" @@ -72,9 +73,27 @@ public: void onError(const std::runtime_error& e); private: - boost::asio::io_service io_service_; - std::list<proto::socket*> sockets_; + + typedef struct { + u_int8_t* buf_; + u_int32_t len_; + proto::socket* sock_; + } sockets_element_t; + std::list<sockets_element_t> sockets_; + + typedef struct { + u_int8_t* buf_; + u_int32_t len_; + proto::socket* sock_; + PacketSourceEndpoint remote_; + } thread_result_t; + std::queue<thread_result_t> thread_result_queue_; + Mutex thread_result_mutex_; + Semaphore thread_result_sem_; + sockets_element_t last_recv_sock_; + + void recv_thread(thread_result_t result); }; #endif |