From fc927f97f7060d638fa4e61a9331df49ceef143f Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Mon, 1 Sep 2014 21:10:57 +0000 Subject: added QueuePusher, added ANYTUN_MULTITHREAD comile option --- src/anytun.cpp | 203 ++++++++++++++++++---------------------------------- src/channel.hpp | 3 - src/configure | 14 ++++ src/queuePusher.hpp | 80 +++++++++++++++++++++ 4 files changed, 162 insertions(+), 138 deletions(-) create mode 100644 src/queuePusher.hpp diff --git a/src/anytun.cpp b/src/anytun.cpp index 8cb9c43..cb3c3ed 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -92,9 +92,13 @@ #endif #include "cryptinit.hpp" -#include "channel.hpp" #include "sysExec.h" +#ifdef ANYTUN_MULTITHREAD +#include "channel.hpp" +#include "queuePusher.hpp" +#endif + bool disableRouting = false; void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux) @@ -145,6 +149,21 @@ void syncListener() } #endif +#ifndef ANYTUN_MULTITHREAD +void sender(TunDevice* dev, PacketSource* src) +{ + if(!dev || !src) { + cLog.msg(Log::PRIO_ERROR) << "sender died because either dev or src is null"; + return; + } + PlainPacket * plain_packet = new PlainPacket(MAX_PACKET_LENGTH); + EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, gOpt.getAuthTagLength()); + cLog.msg(Log::PRIO_DEBUG) << "started sender"; + std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND)); + std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND)); + uint16_t mux = gOpt.getMux(); + PacketSourceEndpoint emptyEndpoint; +#else // Start 1 Thread per Tun device void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel* plain_in_queue, Channel* plain_memory_pool ) { @@ -153,23 +172,23 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channelpop(&plain_packet); + QueuePusher qp_plain(plain_packet,plain_memory_pool); +#endif plain_packet->setLength(MAX_PACKET_LENGTH); -// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop"; // read packet from device int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength()); if(len < 0) { - plain_memory_pool->push(plain_packet); continue; // silently ignore device read errors, this is probably no good idea... } if(static_cast(len) < PlainPacket::getHeaderLength()) { - plain_memory_pool->push(plain_packet); continue; // ignore short packets } plain_packet->setPayloadLength(len); @@ -181,7 +200,8 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, ChannelsetPayloadType(0); } - plain_in_queue->push(plain_packet); +#ifdef ANYTUN_MULTITHREAD + qp_plain.changeQueue(plain_in_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what(); @@ -208,14 +228,15 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel* plai for(;;) { PlainPacket * plain_packet = NULL; plain_in_queue->pop(&plain_packet); + QueuePusher qp_plain(plain_packet,plain_memory_pool); EncryptedPacket * encrypted_packet = NULL; encrypted_memory_pool->pop(&encrypted_packet); + QueuePusher qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif encrypted_packet->withAuthTag(false); encrypted_packet->setLength(MAX_PACKET_LENGTH); if(gConnectionList.empty()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } //std::cout << "got Packet for plain "<* plai //std::cout << " -> "<push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } // no route else { @@ -239,16 +258,12 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel* plai #endif if(cit==gConnectionList.getEnd()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; //no connection } ConnectionParam& conn = cit->second; if(conn.remote_end_ == emptyEndpoint) { //cLog.msg(Log::PRIO_INFO) << "no remote address set"; - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -262,8 +277,8 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel* plai a->generate(conn.kd_, *encrypted_packet); encrypted_packet->setEndpoint(conn.remote_end_); - plain_memory_pool->push(plain_packet); - encrypted_out_queue->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_encrypted.changeQueue(encrypted_out_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what(); @@ -283,13 +298,14 @@ void threadWriteToSocket(Channel* encrypted_out_queue, Channel for(;;) { EncryptedPacket * encrypted_packet = NULL; encrypted_out_queue->pop(&encrypted_packet); + QueuePusher qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif try { src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint()); } catch(std::exception& /*e*/) { //TODO: do something here //cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what(); } - encrypted_memory_pool->push(encrypted_packet); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what(); @@ -298,6 +314,25 @@ void threadWriteToSocket(Channel* encrypted_out_queue, Channel } } +#ifndef ANYTUN_MULTITHREAD +void receiver(TunDevice* dev, PacketSource* src) +{ + if(!dev || !src) { + cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null"; + return; + } + + try { + std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND)); + std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND)); + + uint32_t auth_tag_length = gOpt.getAuthTagLength(); + EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length); + PlainPacket * plain_packet= new PlainPacket(MAX_PACKET_LENGTH); + + for(;;) { + PacketSourceEndpoint remote_end; +#else void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel* encrypted_in_queue, Channel* encrypted_memory_pool) { if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) { @@ -311,9 +346,10 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channelpop(&encrypted_packet); + QueuePusher qp_encrypted(encrypted_packet,encrypted_memory_pool); encrypted_packet->setLength(MAX_PACKET_LENGTH); PacketSourceEndpoint remote_end; - +#endif encrypted_packet->withAuthTag(false); encrypted_packet->setLength(MAX_PACKET_LENGTH); @@ -324,21 +360,19 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channelpush(encrypted_packet); continue; } if(len < 0) { - encrypted_memory_pool->push(encrypted_packet); continue; // silently ignore socket recv errors, this is probably no good idea... } if(static_cast(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) { - encrypted_memory_pool->push(encrypted_packet); continue; // ignore short packets } encrypted_packet->setLength(len); encrypted_packet->setEndpoint(remote_end); - encrypted_in_queue->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_encrypted.changeQueue(encrypted_in_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what(); @@ -360,9 +394,12 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* for(;;) { PlainPacket * plain_packet = NULL; plain_memory_pool->pop(&plain_packet); + QueuePusher qp_plain(plain_packet,plain_memory_pool); plain_packet->setLength(MAX_PACKET_LENGTH); EncryptedPacket * encrypted_packet = NULL; encrypted_in_queue->pop(&encrypted_packet); + QueuePusher qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint(); mux_t mux = encrypted_packet->getMux(); // autodetect peer @@ -373,8 +410,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* ConnectionMap::iterator cit = gConnectionList.getConnection(mux); if(cit == gConnectionList.getEnd()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } ConnectionParam& conn = cit->second; @@ -382,8 +417,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* // check whether auth tag is ok or not if(!a->checkTag(conn.kd_, *encrypted_packet)) { cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -391,8 +424,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) { cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_ << " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId(); - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -408,16 +439,14 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* } // ignore zero length packets if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } // decrypt packet c->decrypt(conn.kd_, *encrypted_packet, *plain_packet); - plain_out_queue->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_plain.changeQueue(plain_out_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what(); @@ -433,6 +462,8 @@ void threadWriteToTunDevice( Channel* plain_out_queue, Channelpop(&plain_packet); + QueuePusher qp_plain(plain_packet,plain_memory_pool); +#endif if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) || (dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) { @@ -442,7 +473,6 @@ void threadWriteToTunDevice( Channel* plain_out_queue, Channelwrite(plain_packet->getPayload(), plain_packet->getLength()); - plain_memory_pool->push(plain_packet); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what(); @@ -452,107 +482,6 @@ void threadWriteToTunDevice( Channel* plain_out_queue, Channel c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND)); - std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND)); - - uint32_t auth_tag_length = gOpt.getAuthTagLength(); - EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, auth_tag_length); - PlainPacket plain_packet(MAX_PACKET_LENGTH); - - for(;;) { - PacketSourceEndpoint remote_end; - - plain_packet.setLength(MAX_PACKET_LENGTH); - encrypted_packet.withAuthTag(false); - encrypted_packet.setLength(MAX_PACKET_LENGTH); - - // read packet from socket - int len; - try { - len = src->recv(encrypted_packet.getBuf(), encrypted_packet.getLength(), remote_end); - } catch(std::exception& /*e*/) { - //TODO: do something here - //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what(); - continue; - } - if(len < 0) { - continue; // silently ignore socket recv errors, this is probably no good idea... - } - - if(static_cast(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) { - continue; // ignore short packets - } - encrypted_packet.setLength(len); - - mux_t mux = encrypted_packet.getMux(); - // autodetect peer - if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") { - cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end; - createConnection(remote_end, gOpt.getSeqWindowSize(),mux); - } - - ConnectionMap::iterator cit = gConnectionList.getConnection(mux); - if(cit == gConnectionList.getEnd()) { - continue; - } - ConnectionParam& conn = cit->second; - - // check whether auth tag is ok or not - if(!a->checkTag(conn.kd_, encrypted_packet)) { - cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; - continue; - } - - // Replay Protection - if(conn.seq_window_.checkAndAdd(encrypted_packet.getSenderId(), encrypted_packet.getSeqNr())) { - cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_ - << " seq:"<< encrypted_packet.getSeqNr() << " sid: "<< encrypted_packet.getSenderId(); - continue; - } - - //Allow dynamic IP changes - //TODO: add command line option to turn this off - if(remote_end != conn.remote_end_) { - cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end; - conn.remote_end_=remote_end; -#ifndef ANYTUN_NOSYNC - SyncCommand sc(gConnectionList,mux); - gSyncQueue.push(sc); -#endif - } - // ignore zero length packets - if(encrypted_packet.getPayloadLength() <= PlainPacket::getHeaderLength()) { - continue; - } - - // decrypt packet - c->decrypt(conn.kd_, encrypted_packet, plain_packet); - - // check payload_type - if((dev->getType() == TYPE_TUN && plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN4 && - plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN6) || - (dev->getType() == TYPE_TAP && plain_packet.getPayloadType() != PAYLOAD_TYPE_TAP)) { - continue; - } - - // write it on the device - dev->write(plain_packet.getPayload(), plain_packet.getLength()); - } - } catch(std::runtime_error& e) { - cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught runtime_error: " << e.what(); - } catch(std::exception& e) { - cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught exception: " << e.what(); - } -} - void startSendRecvThreads(TunDevice* dev, PacketSource* src) { unsigned int nthreads = boost::thread::hardware_concurrency(); @@ -560,7 +489,7 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) nthreads = 4; src->waitUntilReady(); - +#ifdef ANYTUN_MULTITHREAD uint16_t plain_in_queue_length = 2 * nthreads + 4; Channel* plain_in_queue = new Channel(plain_in_queue_length); @@ -600,6 +529,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool)); // } boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev)); +#else + boost::thread(boost::bind(sender, dev, src)); + boost::thread(boost::bind(receiver, dev, src)); +#endif } diff --git a/src/channel.hpp b/src/channel.hpp index c30dd61..c2a1c66 100644 --- a/src/channel.hpp +++ b/src/channel.hpp @@ -62,9 +62,6 @@ class Channel #endif { private: -#ifdef BOOST_HAS_TRIVIAL_ASSIGN - BOOST_STATIC_ASSERT((boost::has_trivial_assign::value)); -#endif boost::mutex mtx_; boost::circular_buffer cb_; Semaphore sem_read_, sem_write_; diff --git a/src/configure b/src/configure index f070150..ff6ba01 100755 --- a/src/configure +++ b/src/configure @@ -64,6 +64,7 @@ MANDIR='' INSTALLMANPAGE=1 EXAMPLESDIR='' INSTALLEXAMPLES=1 +MULTITHREAD=0 BOOST_PREFIX='' GCRYPT_PREFIX='' @@ -89,6 +90,8 @@ print_usage() { echo " --enable-passphrase enable master key and salt passphrase" echo " --disable-routing disable built-in routing capability" echo " --enable-routing enable built-in routing capability" + echo " --enable-new-multithread enable new experimental multithreading" + echo " --disable-new-multithread disable new experimental multithreading" echo " --cross-prefix= add PREFIX to compiler calls" echo " --with-boost= don't use systemwide boost" echo " --with-gcrypt= don't use systemwide gcrypt" @@ -154,6 +157,12 @@ do --disable-routing) ROUTING=0 ;; + --enable-new-multithread) + MULTITHREAD=1 + ;; + --disable-new-multithread) + MULTITHREAD=0 + ;; --ebuild-compat) EBUILD_COMPAT=1 ;; @@ -300,6 +309,11 @@ if [ $ROUTING -eq 0 ]; then echo "disabling built-in routing capability" fi +if [ $MULTITHREAD -eq 1 ]; then + CXXFLAGS=$CXXFLAGS' -DANYTUN_MULTITHREAD' + echo "enabling new experimental mutlithread" +fi + if [ -z "$BINDIR" ]; then BINDIR=$PREFIX/bin fi diff --git a/src/queuePusher.hpp b/src/queuePusher.hpp new file mode 100644 index 0000000..7e960db --- /dev/null +++ b/src/queuePusher.hpp @@ -0,0 +1,80 @@ +/* + * anytun + * + * The secure anycast tunneling protocol (satp) defines a protocol used + * for communication between any combination of unicast and anycast + * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel + * mode and allows tunneling of every ETHER TYPE protocol (e.g. + * ethernet, ip, arp ...). satp directly includes cryptography and + * message authentication based on the methods used by SRTP. It is + * intended to deliver a generic, scaleable and secure solution for + * tunneling and relaying of packets of any protocol. + * + * + * Copyright (C) 2007-2014 Markus Grüneis, Othmar Gsenger, Erwin Nindl, + * Christian Pointner + * + * This file is part of Anytun. + * + * Anytun is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * Anytun is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Anytun. If not, see . + * + * In addition, as a special exception, the copyright holders give + * permission to link the code of portions of this program with the + * OpenSSL library under certain conditions as described in each + * individual source file, and distribute linked combinations + * including the two. + * You must obey the GNU General Public License in all respects + * for all of the code used other than OpenSSL. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you + * do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source + * files in the program, then also delete it here. + */ + +#ifndef ANYTUN_queue_pusher_hpp_INCLUDED +#define ANYTUN_queue_pusher_hpp_INCLUDED + +#ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS +#include +#endif + +template +class QueuePusher +#ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS + : boost::noncopyable +#endif +{ +private: + T * object_; + Channel * queue_; + +public: +#ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS + QueuePusher(QueuePusher const &) = delete; + QueuePusher(QueuePusher &&) = delete; + QueuePusher& operator=(const QueuePusher &) = delete; +#endif + void changeQueue( Channel * queue ) { + queue_=queue; + } + QueuePusher(T * object,Channel * queue) + :object_(object),queue_(queue) {}; + ~QueuePusher() { + if(queue_ && object_) + queue_->push(object_); + } +}; + +#endif -- cgit v1.2.3