diff options
author | Othmar Gsenger <otti@anytun.org> | 2014-09-01 21:10:57 +0000 |
---|---|---|
committer | Othmar Gsenger <otti@anytun.org> | 2014-09-01 21:10:57 +0000 |
commit | fc927f97f7060d638fa4e61a9331df49ceef143f (patch) | |
tree | 05236c6a1fd463012e4a617a88ba859141547607 /src/anytun.cpp | |
parent | new threading working, but only one crypto thread per direction possible yet (diff) |
added QueuePusher, added ANYTUN_MULTITHREAD comile option
Diffstat (limited to 'src/anytun.cpp')
-rw-r--r-- | src/anytun.cpp | 203 |
1 files changed, 68 insertions, 135 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp index 8cb9c43..cb3c3ed 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -92,9 +92,13 @@ #endif #include "cryptinit.hpp" -#include "channel.hpp" #include "sysExec.h" +#ifdef ANYTUN_MULTITHREAD +#include "channel.hpp" +#include "queuePusher.hpp" +#endif + bool disableRouting = false; void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux) @@ -145,6 +149,21 @@ void syncListener() } #endif +#ifndef ANYTUN_MULTITHREAD +void sender(TunDevice* dev, PacketSource* src) +{ + if(!dev || !src) { + cLog.msg(Log::PRIO_ERROR) << "sender died because either dev or src is null"; + return; + } + PlainPacket * plain_packet = new PlainPacket(MAX_PACKET_LENGTH); + EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, gOpt.getAuthTagLength()); + cLog.msg(Log::PRIO_DEBUG) << "started sender"; + std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND)); + std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND)); + uint16_t mux = gOpt.getMux(); + PacketSourceEndpoint emptyEndpoint; +#else // Start 1 Thread per Tun device void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool ) { @@ -153,23 +172,23 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack return; } cLog.msg(Log::PRIO_DEBUG) << "started threadReadTunDeviceAndWriteToPlainInQueue"; +#endif try { for(;;) { -// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop"; +#ifdef ANYTUN_MULTITHREAD PlainPacket * plain_packet = NULL; plain_memory_pool->pop(&plain_packet); + QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool); +#endif plain_packet->setLength(MAX_PACKET_LENGTH); -// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop"; // read packet from device int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength()); if(len < 0) { - plain_memory_pool->push(plain_packet); continue; // silently ignore device read errors, this is probably no good idea... } if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) { - plain_memory_pool->push(plain_packet); continue; // ignore short packets } plain_packet->setPayloadLength(len); @@ -181,7 +200,8 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack } else { plain_packet->setPayloadType(0); } - plain_in_queue->push(plain_packet); +#ifdef ANYTUN_MULTITHREAD + qp_plain.changeQueue(plain_in_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what(); @@ -208,14 +228,15 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai for(;;) { PlainPacket * plain_packet = NULL; plain_in_queue->pop(&plain_packet); + QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool); EncryptedPacket * encrypted_packet = NULL; encrypted_memory_pool->pop(&encrypted_packet); + QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif encrypted_packet->withAuthTag(false); encrypted_packet->setLength(MAX_PACKET_LENGTH); if(gConnectionList.empty()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } //std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString(); @@ -227,8 +248,6 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai //std::cout << " -> "<<mux << std::endl; cit = gConnectionList.getConnection(mux); } catch(std::exception&) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } // no route else { @@ -239,16 +258,12 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai #endif if(cit==gConnectionList.getEnd()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; //no connection } ConnectionParam& conn = cit->second; if(conn.remote_end_ == emptyEndpoint) { //cLog.msg(Log::PRIO_INFO) << "no remote address set"; - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -262,8 +277,8 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai a->generate(conn.kd_, *encrypted_packet); encrypted_packet->setEndpoint(conn.remote_end_); - plain_memory_pool->push(plain_packet); - encrypted_out_queue->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_encrypted.changeQueue(encrypted_out_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what(); @@ -283,13 +298,14 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel for(;;) { EncryptedPacket * encrypted_packet = NULL; encrypted_out_queue->pop(&encrypted_packet); + QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif try { src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint()); } catch(std::exception& /*e*/) { //TODO: do something here //cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what(); } - encrypted_memory_pool->push(encrypted_packet); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what(); @@ -298,6 +314,25 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel } } +#ifndef ANYTUN_MULTITHREAD +void receiver(TunDevice* dev, PacketSource* src) +{ + if(!dev || !src) { + cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null"; + return; + } + + try { + std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND)); + std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND)); + + uint32_t auth_tag_length = gOpt.getAuthTagLength(); + EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length); + PlainPacket * plain_packet= new PlainPacket(MAX_PACKET_LENGTH); + + for(;;) { + PacketSourceEndpoint remote_end; +#else void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<EncryptedPacket*>* encrypted_in_queue, Channel<EncryptedPacket*>* encrypted_memory_pool) { if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) { @@ -311,9 +346,10 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry for(;;) { EncryptedPacket * encrypted_packet = NULL; encrypted_memory_pool->pop(&encrypted_packet); + QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool); encrypted_packet->setLength(MAX_PACKET_LENGTH); PacketSourceEndpoint remote_end; - +#endif encrypted_packet->withAuthTag(false); encrypted_packet->setLength(MAX_PACKET_LENGTH); @@ -324,21 +360,19 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry } catch(std::exception& /*e*/) { //TODO: do something here //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what(); - encrypted_memory_pool->push(encrypted_packet); continue; } if(len < 0) { - encrypted_memory_pool->push(encrypted_packet); continue; // silently ignore socket recv errors, this is probably no good idea... } if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) { - encrypted_memory_pool->push(encrypted_packet); continue; // ignore short packets } encrypted_packet->setLength(len); encrypted_packet->setEndpoint(remote_end); - encrypted_in_queue->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_encrypted.changeQueue(encrypted_in_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what(); @@ -360,9 +394,12 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* for(;;) { PlainPacket * plain_packet = NULL; plain_memory_pool->pop(&plain_packet); + QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool); plain_packet->setLength(MAX_PACKET_LENGTH); EncryptedPacket * encrypted_packet = NULL; encrypted_in_queue->pop(&encrypted_packet); + QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool); +#endif PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint(); mux_t mux = encrypted_packet->getMux(); // autodetect peer @@ -373,8 +410,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* ConnectionMap::iterator cit = gConnectionList.getConnection(mux); if(cit == gConnectionList.getEnd()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } ConnectionParam& conn = cit->second; @@ -382,8 +417,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* // check whether auth tag is ok or not if(!a->checkTag(conn.kd_, *encrypted_packet)) { cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -391,8 +424,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) { cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_ << " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId(); - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } @@ -408,16 +439,14 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* } // ignore zero length packets if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) { - plain_memory_pool->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); continue; } // decrypt packet c->decrypt(conn.kd_, *encrypted_packet, *plain_packet); - plain_out_queue->push(plain_packet); - encrypted_memory_pool->push(encrypted_packet); +#ifdef ANYTUN_MULTITHREAD + qp_plain.changeQueue(plain_out_queue); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what(); @@ -433,6 +462,8 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla for(;;) { PlainPacket * plain_packet = NULL; plain_out_queue->pop(&plain_packet); + QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool); +#endif if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) || (dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) { @@ -442,7 +473,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla // write it on the device dev->write(plain_packet->getPayload(), plain_packet->getLength()); - plain_memory_pool->push(plain_packet); } } catch(std::runtime_error& e) { cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what(); @@ -452,107 +482,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla } -void receiver(TunDevice* dev, PacketSource* src) -{ - if(!dev || !src) { - cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null"; - return; - } - - try { - std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND)); - std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND)); - - uint32_t auth_tag_length = gOpt.getAuthTagLength(); - EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, auth_tag_length); - PlainPacket plain_packet(MAX_PACKET_LENGTH); - - for(;;) { - PacketSourceEndpoint remote_end; - - plain_packet.setLength(MAX_PACKET_LENGTH); - encrypted_packet.withAuthTag(false); - encrypted_packet.setLength(MAX_PACKET_LENGTH); - - // read packet from socket - int len; - try { - len = src->recv(encrypted_packet.getBuf(), encrypted_packet.getLength(), remote_end); - } catch(std::exception& /*e*/) { - //TODO: do something here - //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what(); - continue; - } - if(len < 0) { - continue; // silently ignore socket recv errors, this is probably no good idea... - } - - if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) { - continue; // ignore short packets - } - encrypted_packet.setLength(len); - - mux_t mux = encrypted_packet.getMux(); - // autodetect peer - if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") { - cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end; - createConnection(remote_end, gOpt.getSeqWindowSize(),mux); - } - - ConnectionMap::iterator cit = gConnectionList.getConnection(mux); - if(cit == gConnectionList.getEnd()) { - continue; - } - ConnectionParam& conn = cit->second; - - // check whether auth tag is ok or not - if(!a->checkTag(conn.kd_, encrypted_packet)) { - cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; - continue; - } - - // Replay Protection - if(conn.seq_window_.checkAndAdd(encrypted_packet.getSenderId(), encrypted_packet.getSeqNr())) { - cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_ - << " seq:"<< encrypted_packet.getSeqNr() << " sid: "<< encrypted_packet.getSenderId(); - continue; - } - - //Allow dynamic IP changes - //TODO: add command line option to turn this off - if(remote_end != conn.remote_end_) { - cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end; - conn.remote_end_=remote_end; -#ifndef ANYTUN_NOSYNC - SyncCommand sc(gConnectionList,mux); - gSyncQueue.push(sc); -#endif - } - // ignore zero length packets - if(encrypted_packet.getPayloadLength() <= PlainPacket::getHeaderLength()) { - continue; - } - - // decrypt packet - c->decrypt(conn.kd_, encrypted_packet, plain_packet); - - // check payload_type - if((dev->getType() == TYPE_TUN && plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN4 && - plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN6) || - (dev->getType() == TYPE_TAP && plain_packet.getPayloadType() != PAYLOAD_TYPE_TAP)) { - continue; - } - - // write it on the device - dev->write(plain_packet.getPayload(), plain_packet.getLength()); - } - } catch(std::runtime_error& e) { - cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught runtime_error: " << e.what(); - } catch(std::exception& e) { - cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught exception: " << e.what(); - } -} - void startSendRecvThreads(TunDevice* dev, PacketSource* src) { unsigned int nthreads = boost::thread::hardware_concurrency(); @@ -560,7 +489,7 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) nthreads = 4; src->waitUntilReady(); - +#ifdef ANYTUN_MULTITHREAD uint16_t plain_in_queue_length = 2 * nthreads + 4; Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length); @@ -600,6 +529,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool)); // } boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev)); +#else + boost::thread(boost::bind(sender, dev, src)); + boost::thread(boost::bind(receiver, dev, src)); +#endif } |