From cd25b1fe9e2beb01f5d7d61daa87f73a8c191e40 Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Thu, 31 Jul 2014 15:40:03 +0000 Subject: multithreaded sender seems to work... --- src/anytun.cpp | 81 ++++++++++++++++++++++++++++++++++++++++--------- src/encryptedPacket.cpp | 11 +++++++ src/encryptedPacket.h | 5 +++ 3 files changed, 82 insertions(+), 15 deletions(-) diff --git a/src/anytun.cpp b/src/anytun.cpp index 10dca8d..36d785e 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -45,6 +45,7 @@ #include #include +#include "boost/tuple/tuple.hpp" #include #include #include @@ -151,6 +152,7 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel* plain_in_queue, Channel* plain_memory_pool, PacketSource* src) +void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel* plain_in_queue, Channel* plain_memory_pool, Channel* encrypted_out_queue, Channel* encrypted_memory_pool) { - if(!plain_in_queue || !plain_memory_pool || !src) { - cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null"; + if(!plain_in_queue || !plain_memory_pool || !encrypted_out_queue || !encrypted_memory_pool) { + cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or encrypted_out_queue or encrypted_memory_pool pointer is null"; return; } + cLog.msg(Log::PRIO_DEBUG) << "started threadReadPlainInQueueAndWriteEncryptedOutQueue"; try { std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND)); std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND)); - EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength()); uint16_t mux = gOpt.getMux(); PacketSourceEndpoint emptyEndpoint; for(;;) { PlainPacket * plain_packet = NULL; plain_in_queue->pop(&plain_packet); - encrypted_packet.withAuthTag(false); - encrypted_packet.setLength(MAX_PACKET_LENGTH); + EncryptedPacket * encrypted_packet = NULL; + encrypted_memory_pool->pop(&encrypted_packet); + encrypted_packet->withAuthTag(false); + encrypted_packet->setLength(MAX_PACKET_LENGTH); if(gConnectionList.empty()) { plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); continue; } //std::cout << "got Packet for plain "<* plain_in_queu cit = gConnectionList.getConnection(mux); } catch(std::exception&) { plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); continue; } // no route else { @@ -233,6 +239,7 @@ void threadReadPlainInQueueAndWriteToSocket(Channel* plain_in_queu if(cit==gConnectionList.getEnd()) { plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); continue; //no connection } ConnectionParam& conn = cit->second; @@ -240,30 +247,53 @@ void threadReadPlainInQueueAndWriteToSocket(Channel* plain_in_queu if(conn.remote_end_ == emptyEndpoint) { //cLog.msg(Log::PRIO_INFO) << "no remote address set"; plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); continue; } // encrypt packet - c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux); + c->encrypt(conn.kd_, *plain_packet, *encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux); - encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux); + encrypted_packet->setHeader(conn.seq_nr_, gOpt.getSenderId(), mux); conn.seq_nr_++; // add authentication tag - a->generate(conn.kd_, encrypted_packet); + a->generate(conn.kd_, *encrypted_packet); + + encrypted_packet->setEndpoint(conn.remote_end_); + plain_memory_pool->push(plain_packet); + encrypted_out_queue->push(encrypted_packet); + } + } catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what(); + } catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught exception: " << e.what(); + } +} +void threadWriteToSocket(Channel* encrypted_out_queue, Channel* encrypted_memory_pool, PacketSource* src) +{ + if(!encrypted_out_queue || !encrypted_memory_pool || !src) { + cLog.msg(Log::PRIO_ERROR) << "sender thread died because either encrypted_out_queue or encrypted_memory_pool, or src pointer is null"; + return; + } + cLog.msg(Log::PRIO_DEBUG) << "started threadWriteToSocket"; + try { + for(;;) { + EncryptedPacket * encrypted_packet = NULL; + encrypted_out_queue->pop(&encrypted_packet); try { - src->send(encrypted_packet.getBuf(), encrypted_packet.getLength(), conn.remote_end_); + src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint()); } catch(std::exception& /*e*/) { //TODO: do something here //cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what(); } - plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); } } catch(std::runtime_error& e) { - cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what(); + cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what(); } catch(std::exception& e) { - cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what(); + cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught exception: " << e.what(); } } @@ -370,8 +400,13 @@ void receiver(TunDevice* dev, PacketSource* src) void startSendRecvThreads(TunDevice* dev, PacketSource* src) { + unsigned int nthreads = boost::thread::hardware_concurrency(); + if (!nthreads) + nthreads = 4; + src->waitUntilReady(); - uint16_t plain_in_queue_length = 10; + + uint16_t plain_in_queue_length = 2 * nthreads + 4; Channel* plain_in_queue = new Channel(plain_in_queue_length); uint16_t plain_memory_pool_length = plain_in_queue_length + 1; @@ -379,8 +414,24 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) { plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH)); } + + uint16_t encrypted_out_queue_length = plain_in_queue_length; + Channel* encrypted_out_queue = new Channel(encrypted_out_queue_length); + + uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + 1; + Channel* encrypted_memory_pool = new Channel(encrypted_memory_pool_length); + + uint32_t auth_tag_length = gOpt.getAuthTagLength(); + for(uint16_t cnt=0; cnt < encrypted_memory_pool_length; cnt++) { + encrypted_memory_pool->push(new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length)); + } + boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool)); - boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src)); + for(uint16_t cnt=0; cnt < nthreads; cnt++) { + boost::thread(boost::bind(threadReadPlainInQueueAndWriteEncryptedOutQueue, plain_in_queue, plain_memory_pool, encrypted_out_queue, encrypted_memory_pool)); + } + //threadWriteToSocket(Channel* encrypted_out_queue, Channel* encrypted_memory_pool, PacketSource* src) + boost::thread(boost::bind(threadWriteToSocket, encrypted_out_queue, encrypted_memory_pool, src)); boost::thread(boost::bind(receiver, dev, src)); } diff --git a/src/encryptedPacket.cpp b/src/encryptedPacket.cpp index d5c2a32..8efd563 100644 --- a/src/encryptedPacket.cpp +++ b/src/encryptedPacket.cpp @@ -249,3 +249,14 @@ uint32_t EncryptedPacket::getAuthTagLength() return 0; } + +void EncryptedPacket::setEndpoint(PacketSourceEndpoint & ep ) +{ + endpoint_ = ep; +} + +PacketSourceEndpoint EncryptedPacket::getEndpoint() const +{ + return endpoint_; +} + diff --git a/src/encryptedPacket.h b/src/encryptedPacket.h index c924ad9..8bf38f5 100644 --- a/src/encryptedPacket.h +++ b/src/encryptedPacket.h @@ -48,6 +48,7 @@ #include "datatypes.h" #include "buffer.h" +#include "packetSource.h" class Cipher; class EncryptedPacket : public Buffer @@ -143,6 +144,9 @@ public: void removeAuthTag(); uint8_t* getAuthTag(); uint32_t getAuthTagLength(); + + void setEndpoint(PacketSourceEndpoint & ); + PacketSourceEndpoint getEndpoint() const; private: EncryptedPacket(); @@ -166,6 +170,7 @@ private: uint8_t* payload_; uint8_t* auth_tag_; uint32_t auth_tag_length_; + PacketSourceEndpoint endpoint_; }; #endif -- cgit v1.2.3