diff options
author | Christian Pointner <equinox@anytun.org> | 2010-01-13 01:39:43 +0000 |
---|---|---|
committer | Christian Pointner <equinox@anytun.org> | 2010-01-13 01:39:43 +0000 |
commit | e28a27a8fd0d70d2aec48597991aa26003be1118 (patch) | |
tree | e6ae46558f78e0a36ee1b98459cef2f2f1387b58 /src/packetSource.cpp | |
parent | disables unsupported socket option on windows (diff) |
single thread per socket
Diffstat (limited to 'src/packetSource.cpp')
-rw-r--r-- | src/packetSource.cpp | 101 |
1 files changed, 49 insertions, 52 deletions
diff --git a/src/packetSource.cpp b/src/packetSource.cpp index 17233a9..9eaedc6 100644 --- a/src/packetSource.cpp +++ b/src/packetSource.cpp @@ -49,9 +49,6 @@ void PacketSource::waitUntilReady() UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) { - last_recv_sock_.buf_ = NULL; - last_recv_sock_.len_ = 0; - last_recv_sock_.sock_ = NULL; gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); } @@ -59,8 +56,10 @@ UDPPacketSource::~UDPPacketSource() { std::list<sockets_element_t>::iterator it = sockets_.begin(); for(;it != sockets_.end(); ++it) { - delete[](it->buf_); - delete(it->sock_); +/// this might be a needed by the receiver thread, TODO cleanup +// delete[](it->buf_); +// delete(it->sem_); +// delete(it->sock_); } } @@ -73,7 +72,11 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) sockets_element_t sock; sock.buf_ = NULL; sock.len_ = 0; + sock.sem_ = NULL; sock.sock_ = new proto::socket(io_service_); + if(!sock.sock_) + AnytunError::throwErr() << "memory error"; + sock.sock_->open(e.protocol()); #ifndef _MSC_VER if(e.protocol() == proto::v6()) { @@ -87,6 +90,27 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it) it++; } + // prepare multi-socket recv + if(sockets_.size() > 1) { + std::list<sockets_element_t>::iterator it = sockets_.begin(); + for(;it != sockets_.end(); ++it) { + it->len_ = 1600; // TODO packet size + it->buf_ = new u_int8_t[it->len_]; + if(!it->buf_) + AnytunError::throwErr() << "memory error"; + + it->sem_ = new Semaphore(); + if(!it->sem_) { + delete[](it->buf_); + AnytunError::throwErr() << "memory error"; + } + + boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, it)); + it->sem_->up(); + } + + } + ready_sem_.up(); } @@ -95,51 +119,30 @@ void UDPPacketSource::onError(const std::runtime_error& e) gSignalController.inject(SIGERROR, e.what()); } -void UDPPacketSource::recv_thread(thread_result_t result) +void UDPPacketSource::recv_thread(std::list<sockets_element_t>::iterator it) { - cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint(); + cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << it->sock_->local_endpoint(); - result.len_ = static_cast<u_int32_t>(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_)); - { - Lock lock(thread_result_mutex_); - thread_result_queue_.push(result); + thread_result_t result; + result.it_ = it; + for(;;) { + it->sem_->down(); + + cLog.msg(Log::PRIO_DEBUG) << "calling recv() for " << it->sock_->local_endpoint(); + + result.len_ = static_cast<u_int32_t>(it->sock_->receive_from(boost::asio::buffer(it->buf_, it->len_), result.remote_)); + { + Lock lock(thread_result_mutex_); + thread_result_queue_.push(result); + } + thread_result_sem_.up(); } - thread_result_sem_.up(); } u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote) { if(sockets_.size() == 1) - return static_cast<u_int32_t>(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote)); - - if(!last_recv_sock_.sock_) { - std::list<sockets_element_t>::iterator it = sockets_.begin(); - for(;it != sockets_.end(); ++it) { - if(it == sockets_.begin()) { - it->buf_ = buf; - it->len_ = len; - } - else { - it->buf_ = new u_int8_t[len]; - if(!it->buf_) - AnytunError::throwErr() << "memory error"; - it->len_ = len; - } - - thread_result_t result; - result.buf_ = it->buf_; - result.len_ = it->len_; - result.sock_ = it->sock_; - boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); - } - } - else { - thread_result_t result; - result.buf_ = last_recv_sock_.buf_; - result.len_ = last_recv_sock_.len_; - result.sock_ = last_recv_sock_.sock_; - boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result)); - } + return static_cast<u_int32_t>(sockets_.front().sock_->receive_from(boost::asio::buffer(buf, len), remote)); thread_result_sem_.down(); thread_result_t result; @@ -148,18 +151,12 @@ u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoi result = thread_result_queue_.front(); thread_result_queue_.pop(); } - - last_recv_sock_.sock_ = result.sock_; - last_recv_sock_.buf_ = result.buf_; - last_recv_sock_.len_ = result.len_; remote = result.remote_; + std::memcpy(buf, result.it_->buf_, (len < result.len_) ? len : result.len_); + len = (len < result.len_) ? len : result.len_; + result.it_->sem_->up(); - if(result.sock_ != sockets_.begin()->sock_) { - std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_); - return (len < result.len_) ? len : result.len_; - } - - return result.len_; + return len; } void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote) |