summaryrefslogtreecommitdiff
path: root/src/packetSource.cpp
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2010-01-13 01:39:43 +0000
committerChristian Pointner <equinox@anytun.org>2010-01-13 01:39:43 +0000
commite28a27a8fd0d70d2aec48597991aa26003be1118 (patch)
treee6ae46558f78e0a36ee1b98459cef2f2f1387b58 /src/packetSource.cpp
parentdisables unsupported socket option on windows (diff)
single thread per socket
Diffstat (limited to 'src/packetSource.cpp')
-rw-r--r--src/packetSource.cpp101
1 files changed, 49 insertions, 52 deletions
diff --git a/src/packetSource.cpp b/src/packetSource.cpp
index 17233a9..9eaedc6 100644
--- a/src/packetSource.cpp
+++ b/src/packetSource.cpp
@@ -49,9 +49,6 @@ void PacketSource::waitUntilReady()
UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port)
{
- last_recv_sock_.buf_ = NULL;
- last_recv_sock_.len_ = 0;
- last_recv_sock_.sock_ = NULL;
gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
}
@@ -59,8 +56,10 @@ UDPPacketSource::~UDPPacketSource()
{
std::list<sockets_element_t>::iterator it = sockets_.begin();
for(;it != sockets_.end(); ++it) {
- delete[](it->buf_);
- delete(it->sock_);
+/// this might be a needed by the receiver thread, TODO cleanup
+// delete[](it->buf_);
+// delete(it->sem_);
+// delete(it->sock_);
}
}
@@ -73,7 +72,11 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
sockets_element_t sock;
sock.buf_ = NULL;
sock.len_ = 0;
+ sock.sem_ = NULL;
sock.sock_ = new proto::socket(io_service_);
+ if(!sock.sock_)
+ AnytunError::throwErr() << "memory error";
+
sock.sock_->open(e.protocol());
#ifndef _MSC_VER
if(e.protocol() == proto::v6()) {
@@ -87,6 +90,27 @@ void UDPPacketSource::onResolve(PacketSourceResolverIt& it)
it++;
}
+ // prepare multi-socket recv
+ if(sockets_.size() > 1) {
+ std::list<sockets_element_t>::iterator it = sockets_.begin();
+ for(;it != sockets_.end(); ++it) {
+ it->len_ = 1600; // TODO packet size
+ it->buf_ = new u_int8_t[it->len_];
+ if(!it->buf_)
+ AnytunError::throwErr() << "memory error";
+
+ it->sem_ = new Semaphore();
+ if(!it->sem_) {
+ delete[](it->buf_);
+ AnytunError::throwErr() << "memory error";
+ }
+
+ boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, it));
+ it->sem_->up();
+ }
+
+ }
+
ready_sem_.up();
}
@@ -95,51 +119,30 @@ void UDPPacketSource::onError(const std::runtime_error& e)
gSignalController.inject(SIGERROR, e.what());
}
-void UDPPacketSource::recv_thread(thread_result_t result)
+void UDPPacketSource::recv_thread(std::list<sockets_element_t>::iterator it)
{
- cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << result.sock_->local_endpoint();
+ cLog.msg(Log::PRIO_DEBUG) << "started receiver thread for " << it->sock_->local_endpoint();
- result.len_ = static_cast<u_int32_t>(result.sock_->receive_from(boost::asio::buffer(result.buf_, result.len_), result.remote_));
- {
- Lock lock(thread_result_mutex_);
- thread_result_queue_.push(result);
+ thread_result_t result;
+ result.it_ = it;
+ for(;;) {
+ it->sem_->down();
+
+ cLog.msg(Log::PRIO_DEBUG) << "calling recv() for " << it->sock_->local_endpoint();
+
+ result.len_ = static_cast<u_int32_t>(it->sock_->receive_from(boost::asio::buffer(it->buf_, it->len_), result.remote_));
+ {
+ Lock lock(thread_result_mutex_);
+ thread_result_queue_.push(result);
+ }
+ thread_result_sem_.up();
}
- thread_result_sem_.up();
}
u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote)
{
if(sockets_.size() == 1)
- return static_cast<u_int32_t>(sockets_.begin()->sock_->receive_from(boost::asio::buffer(buf, len), remote));
-
- if(!last_recv_sock_.sock_) {
- std::list<sockets_element_t>::iterator it = sockets_.begin();
- for(;it != sockets_.end(); ++it) {
- if(it == sockets_.begin()) {
- it->buf_ = buf;
- it->len_ = len;
- }
- else {
- it->buf_ = new u_int8_t[len];
- if(!it->buf_)
- AnytunError::throwErr() << "memory error";
- it->len_ = len;
- }
-
- thread_result_t result;
- result.buf_ = it->buf_;
- result.len_ = it->len_;
- result.sock_ = it->sock_;
- boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result));
- }
- }
- else {
- thread_result_t result;
- result.buf_ = last_recv_sock_.buf_;
- result.len_ = last_recv_sock_.len_;
- result.sock_ = last_recv_sock_.sock_;
- boost::thread(boost::bind(&UDPPacketSource::recv_thread, this, result));
- }
+ return static_cast<u_int32_t>(sockets_.front().sock_->receive_from(boost::asio::buffer(buf, len), remote));
thread_result_sem_.down();
thread_result_t result;
@@ -148,18 +151,12 @@ u_int32_t UDPPacketSource::recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoi
result = thread_result_queue_.front();
thread_result_queue_.pop();
}
-
- last_recv_sock_.sock_ = result.sock_;
- last_recv_sock_.buf_ = result.buf_;
- last_recv_sock_.len_ = result.len_;
remote = result.remote_;
+ std::memcpy(buf, result.it_->buf_, (len < result.len_) ? len : result.len_);
+ len = (len < result.len_) ? len : result.len_;
+ result.it_->sem_->up();
- if(result.sock_ != sockets_.begin()->sock_) {
- std::memcpy(buf, result.buf_, (len < result.len_) ? len : result.len_);
- return (len < result.len_) ? len : result.len_;
- }
-
- return result.len_;
+ return len;
}
void UDPPacketSource::send(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint remote)