From 9cd05a9b74ee5b18fc12c5ee1cb5192337db6590 Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Thu, 31 Jul 2014 10:54:03 +0000 Subject: added one new thread. memory pool still needs work. some kind of auto pointer returning memory would be nice. crypto needs bigger refactoring i'm afraid --- src/anytun.cpp | 98 ++++++++++++++++++++++++++++++++++++++------------------- src/channel.hpp | 16 +++++----- 2 files changed, 74 insertions(+), 40 deletions(-) (limited to 'src') diff --git a/src/anytun.cpp b/src/anytun.cpp index 9e97e5a..10dca8d 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -144,47 +144,72 @@ void syncListener() } #endif -void sender(TunDevice* dev, PacketSource* src) +// Start 1 Thread per Tun device +void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel* plain_in_queue, Channel* plain_memory_pool ) { - if(!dev || !src) { - cLog.msg(Log::PRIO_ERROR) << "sender thread died because either dev or src pointer is null"; + if(!dev || !plain_in_queue || ! plain_memory_pool) { + cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToEncryptedQueue died because either dev or plain_in_queue or plain_memory_pool, pointer is null"; return; } - try { - std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND)); - std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND)); - - PlainPacket plain_packet(MAX_PACKET_LENGTH); - EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength()); - - uint16_t mux = gOpt.getMux(); - PacketSourceEndpoint emptyEndpoint; for(;;) { - plain_packet.setLength(MAX_PACKET_LENGTH); - encrypted_packet.withAuthTag(false); - encrypted_packet.setLength(MAX_PACKET_LENGTH); +// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop"; + PlainPacket * plain_packet = NULL; + plain_memory_pool->pop(&plain_packet); +// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop"; // read packet from device - int len = dev->read(plain_packet.getPayload(), plain_packet.getPayloadLength()); + int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength()); if(len < 0) { + plain_memory_pool->push(plain_packet); continue; // silently ignore device read errors, this is probably no good idea... } if(static_cast(len) < PlainPacket::getHeaderLength()) { + plain_memory_pool->push(plain_packet); continue; // ignore short packets } - plain_packet.setPayloadLength(len); + plain_packet->setPayloadLength(len); // set payload type if(dev->getType() == TYPE_TUN) { - plain_packet.setPayloadType(PAYLOAD_TYPE_TUN); + plain_packet->setPayloadType(PAYLOAD_TYPE_TUN); } else if(dev->getType() == TYPE_TAP) { - plain_packet.setPayloadType(PAYLOAD_TYPE_TAP); + plain_packet->setPayloadType(PAYLOAD_TYPE_TAP); } else { - plain_packet.setPayloadType(0); + plain_packet->setPayloadType(0); } + plain_in_queue->push(plain_packet); + } + } catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what(); + } catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught exception: " << e.what(); + } +} + +void threadReadPlainInQueueAndWriteToSocket(Channel* plain_in_queue, Channel* plain_memory_pool, PacketSource* src) +{ + if(!plain_in_queue || !plain_memory_pool || !src) { + cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null"; + return; + } + + try { + std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND)); + std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND)); + + EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength()); + + uint16_t mux = gOpt.getMux(); + PacketSourceEndpoint emptyEndpoint; + for(;;) { + PlainPacket * plain_packet = NULL; + plain_in_queue->pop(&plain_packet); + encrypted_packet.withAuthTag(false); + encrypted_packet.setLength(MAX_PACKET_LENGTH); if(gConnectionList.empty()) { + plain_memory_pool->push(plain_packet); continue; } //std::cout << "got Packet for plain "<getDstAddr()); //std::cout << " -> "<push(plain_packet); + continue; + } // no route else { cit = gConnectionList.getBegin(); } @@ -204,17 +232,19 @@ void sender(TunDevice* dev, PacketSource* src) #endif if(cit==gConnectionList.getEnd()) { + plain_memory_pool->push(plain_packet); continue; //no connection } ConnectionParam& conn = cit->second; if(conn.remote_end_ == emptyEndpoint) { //cLog.msg(Log::PRIO_INFO) << "no remote address set"; + plain_memory_pool->push(plain_packet); continue; } // encrypt packet - c->encrypt(conn.kd_, plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux); + c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux); encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux); conn.seq_nr_++; @@ -228,11 +258,12 @@ void sender(TunDevice* dev, PacketSource* src) //TODO: do something here //cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what(); } + plain_memory_pool->push(plain_packet); } } catch(std::runtime_error& e) { - cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught runtime_error: " << e.what(); + cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what(); } catch(std::exception& e) { - cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught exception: " << e.what(); + cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what(); } } @@ -340,13 +371,16 @@ void receiver(TunDevice* dev, PacketSource* src) void startSendRecvThreads(TunDevice* dev, PacketSource* src) { src->waitUntilReady(); - //channel a(1); - //a.push(23); - //a.push(23); - //int i=0; - //a.pop(i); - //std::cout << i << std::endl; - boost::thread(boost::bind(sender, dev, src)); + uint16_t plain_in_queue_length = 10; + Channel* plain_in_queue = new Channel(plain_in_queue_length); + + uint16_t plain_memory_pool_length = plain_in_queue_length + 1; + Channel* plain_memory_pool = new Channel(plain_memory_pool_length); + for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) { + plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH)); + } + boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool)); + boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src)); boost::thread(boost::bind(receiver, dev, src)); } diff --git a/src/channel.hpp b/src/channel.hpp index 8c69f19..8441750 100644 --- a/src/channel.hpp +++ b/src/channel.hpp @@ -50,7 +50,7 @@ #include template -class channel +class Channel { private: boost::mutex mtx_; @@ -61,24 +61,24 @@ private: boost::lock_guard guard(mtx_); cb_.push_back(t); } - void pop_cb(T & ret) { + void pop_cb(T * ret) { boost::lock_guard guard(mtx_); - ret = cb_[0]; + *ret = cb_[0]; cb_.pop_front(); } public: - channel(channel const &) = delete; -// channel(channel &&) = delete; - channel& operator=(const channel &) = delete; - channel(unsigned int num_elements=10) + Channel(Channel const &) = delete; +// Channel(Channel &&) = delete; + Channel& operator=(const Channel &) = delete; + Channel(unsigned int num_elements=10) :cb_(num_elements),sem_read_(0),sem_write_(num_elements) {}; void push(T const & t ) { sem_write_.down(); this->push_cb(t); sem_read_.up(); } - void pop(T & ret) { + void pop(T * ret) { sem_read_.down(); this->pop_cb(ret); sem_write_.up(); -- cgit v1.2.3