summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOthmar Gsenger <otti@anytun.org>2014-07-31 15:40:03 +0000
committerOthmar Gsenger <otti@anytun.org>2014-07-31 15:40:03 +0000
commitcd25b1fe9e2beb01f5d7d61daa87f73a8c191e40 (patch)
tree01b7bc9a13b70749eae5f5c9e9fc1e42f5c5615f
parentfix compiler warning (diff)
multithreaded sender seems to work...
-rw-r--r--src/anytun.cpp81
-rw-r--r--src/encryptedPacket.cpp11
-rw-r--r--src/encryptedPacket.h5
3 files changed, 82 insertions, 15 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 10dca8d..36d785e 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -45,6 +45,7 @@
#include <boost/bind.hpp>
#include <boost/thread.hpp>
+#include "boost/tuple/tuple.hpp"
#include <boost/assign.hpp>
#include <iostream>
#include <fstream>
@@ -151,6 +152,7 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToEncryptedQueue died because either dev or plain_in_queue or plain_memory_pool, pointer is null";
return;
}
+ cLog.msg(Log::PRIO_DEBUG) << "started threadReadTunDeviceAndWriteToPlainInQueue";
try {
for(;;) {
// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
@@ -187,29 +189,32 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
}
}
-void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool, PacketSource* src)
+void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool, Channel<EncryptedPacket*>* encrypted_out_queue, Channel<EncryptedPacket*>* encrypted_memory_pool)
{
- if(!plain_in_queue || !plain_memory_pool || !src) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null";
+ if(!plain_in_queue || !plain_memory_pool || !encrypted_out_queue || !encrypted_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or encrypted_out_queue or encrypted_memory_pool pointer is null";
return;
}
+ cLog.msg(Log::PRIO_DEBUG) << "started threadReadPlainInQueueAndWriteEncryptedOutQueue";
try {
std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
uint16_t mux = gOpt.getMux();
PacketSourceEndpoint emptyEndpoint;
for(;;) {
PlainPacket * plain_packet = NULL;
plain_in_queue->pop(&plain_packet);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
+ EncryptedPacket * encrypted_packet = NULL;
+ encrypted_memory_pool->pop(&encrypted_packet);
+ encrypted_packet->withAuthTag(false);
+ encrypted_packet->setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -222,6 +227,7 @@ void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queu
cit = gConnectionList.getConnection(mux);
} catch(std::exception&) {
plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
continue;
} // no route
else {
@@ -233,6 +239,7 @@ void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queu
if(cit==gConnectionList.getEnd()) {
plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
@@ -240,30 +247,53 @@ void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queu
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
continue;
}
// encrypt packet
- c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
+ c->encrypt(conn.kd_, *plain_packet, *encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
- encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux);
+ encrypted_packet->setHeader(conn.seq_nr_, gOpt.getSenderId(), mux);
conn.seq_nr_++;
// add authentication tag
- a->generate(conn.kd_, encrypted_packet);
+ a->generate(conn.kd_, *encrypted_packet);
+
+ encrypted_packet->setEndpoint(conn.remote_end_);
+ plain_memory_pool->push(plain_packet);
+ encrypted_out_queue->push(encrypted_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught exception: " << e.what();
+ }
+}
+void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel<EncryptedPacket*>* encrypted_memory_pool, PacketSource* src)
+{
+ if(!encrypted_out_queue || !encrypted_memory_pool || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender thread died because either encrypted_out_queue or encrypted_memory_pool, or src pointer is null";
+ return;
+ }
+ cLog.msg(Log::PRIO_DEBUG) << "started threadWriteToSocket";
+ try {
+ for(;;) {
+ EncryptedPacket * encrypted_packet = NULL;
+ encrypted_out_queue->pop(&encrypted_packet);
try {
- src->send(encrypted_packet.getBuf(), encrypted_packet.getLength(), conn.remote_end_);
+ src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint());
} catch(std::exception& /*e*/) {
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
- plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
}
} catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what();
} catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught exception: " << e.what();
}
}
@@ -370,8 +400,13 @@ void receiver(TunDevice* dev, PacketSource* src)
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
+ unsigned int nthreads = boost::thread::hardware_concurrency();
+ if (!nthreads)
+ nthreads = 4;
+
src->waitUntilReady();
- uint16_t plain_in_queue_length = 10;
+
+ uint16_t plain_in_queue_length = 2 * nthreads + 4;
Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
uint16_t plain_memory_pool_length = plain_in_queue_length + 1;
@@ -379,8 +414,24 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) {
plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH));
}
+
+ uint16_t encrypted_out_queue_length = plain_in_queue_length;
+ Channel<EncryptedPacket*>* encrypted_out_queue = new Channel<EncryptedPacket*>(encrypted_out_queue_length);
+
+ uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + 1;
+ Channel<EncryptedPacket*>* encrypted_memory_pool = new Channel<EncryptedPacket*>(encrypted_memory_pool_length);
+
+ uint32_t auth_tag_length = gOpt.getAuthTagLength();
+ for(uint16_t cnt=0; cnt < encrypted_memory_pool_length; cnt++) {
+ encrypted_memory_pool->push(new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length));
+ }
+
boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool));
- boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src));
+ for(uint16_t cnt=0; cnt < nthreads; cnt++) {
+ boost::thread(boost::bind(threadReadPlainInQueueAndWriteEncryptedOutQueue, plain_in_queue, plain_memory_pool, encrypted_out_queue, encrypted_memory_pool));
+ }
+ //threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel<EncryptedPacket*>* encrypted_memory_pool, PacketSource* src)
+ boost::thread(boost::bind(threadWriteToSocket, encrypted_out_queue, encrypted_memory_pool, src));
boost::thread(boost::bind(receiver, dev, src));
}
diff --git a/src/encryptedPacket.cpp b/src/encryptedPacket.cpp
index d5c2a32..8efd563 100644
--- a/src/encryptedPacket.cpp
+++ b/src/encryptedPacket.cpp
@@ -249,3 +249,14 @@ uint32_t EncryptedPacket::getAuthTagLength()
return 0;
}
+
+void EncryptedPacket::setEndpoint(PacketSourceEndpoint & ep )
+{
+ endpoint_ = ep;
+}
+
+PacketSourceEndpoint EncryptedPacket::getEndpoint() const
+{
+ return endpoint_;
+}
+
diff --git a/src/encryptedPacket.h b/src/encryptedPacket.h
index c924ad9..8bf38f5 100644
--- a/src/encryptedPacket.h
+++ b/src/encryptedPacket.h
@@ -48,6 +48,7 @@
#include "datatypes.h"
#include "buffer.h"
+#include "packetSource.h"
class Cipher;
class EncryptedPacket : public Buffer
@@ -143,6 +144,9 @@ public:
void removeAuthTag();
uint8_t* getAuthTag();
uint32_t getAuthTagLength();
+
+ void setEndpoint(PacketSourceEndpoint & );
+ PacketSourceEndpoint getEndpoint() const;
private:
EncryptedPacket();
@@ -166,6 +170,7 @@ private:
uint8_t* payload_;
uint8_t* auth_tag_;
uint32_t auth_tag_length_;
+ PacketSourceEndpoint endpoint_;
};
#endif