summaryrefslogtreecommitdiff
path: root/src/anytun.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/anytun.cpp')
-rw-r--r--src/anytun.cpp203
1 files changed, 68 insertions, 135 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 8cb9c43..cb3c3ed 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -92,9 +92,13 @@
#endif
#include "cryptinit.hpp"
-#include "channel.hpp"
#include "sysExec.h"
+#ifdef ANYTUN_MULTITHREAD
+#include "channel.hpp"
+#include "queuePusher.hpp"
+#endif
+
bool disableRouting = false;
void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux)
@@ -145,6 +149,21 @@ void syncListener()
}
#endif
+#ifndef ANYTUN_MULTITHREAD
+void sender(TunDevice* dev, PacketSource* src)
+{
+ if(!dev || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender died because either dev or src is null";
+ return;
+ }
+ PlainPacket * plain_packet = new PlainPacket(MAX_PACKET_LENGTH);
+ EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
+ cLog.msg(Log::PRIO_DEBUG) << "started sender";
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
+ uint16_t mux = gOpt.getMux();
+ PacketSourceEndpoint emptyEndpoint;
+#else
// Start 1 Thread per Tun device
void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool )
{
@@ -153,23 +172,23 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
return;
}
cLog.msg(Log::PRIO_DEBUG) << "started threadReadTunDeviceAndWriteToPlainInQueue";
+#endif
try {
for(;;) {
-// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
+#ifdef ANYTUN_MULTITHREAD
PlainPacket * plain_packet = NULL;
plain_memory_pool->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
+#endif
plain_packet->setLength(MAX_PACKET_LENGTH);
-// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop";
// read packet from device
int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength());
if(len < 0) {
- plain_memory_pool->push(plain_packet);
continue; // silently ignore device read errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) {
- plain_memory_pool->push(plain_packet);
continue; // ignore short packets
}
plain_packet->setPayloadLength(len);
@@ -181,7 +200,8 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
} else {
plain_packet->setPayloadType(0);
}
- plain_in_queue->push(plain_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_plain.changeQueue(plain_in_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what();
@@ -208,14 +228,15 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
for(;;) {
PlainPacket * plain_packet = NULL;
plain_in_queue->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
EncryptedPacket * encrypted_packet = NULL;
encrypted_memory_pool->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
encrypted_packet->withAuthTag(false);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -227,8 +248,6 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
//std::cout << " -> "<<mux << std::endl;
cit = gConnectionList.getConnection(mux);
} catch(std::exception&) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
} // no route
else {
@@ -239,16 +258,12 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
#endif
if(cit==gConnectionList.getEnd()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -262,8 +277,8 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
a->generate(conn.kd_, *encrypted_packet);
encrypted_packet->setEndpoint(conn.remote_end_);
- plain_memory_pool->push(plain_packet);
- encrypted_out_queue->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_encrypted.changeQueue(encrypted_out_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what();
@@ -283,13 +298,14 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel
for(;;) {
EncryptedPacket * encrypted_packet = NULL;
encrypted_out_queue->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
try {
src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint());
} catch(std::exception& /*e*/) {
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
- encrypted_memory_pool->push(encrypted_packet);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what();
@@ -298,6 +314,25 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel
}
}
+#ifndef ANYTUN_MULTITHREAD
+void receiver(TunDevice* dev, PacketSource* src)
+{
+ if(!dev || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null";
+ return;
+ }
+
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND));
+
+ uint32_t auth_tag_length = gOpt.getAuthTagLength();
+ EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length);
+ PlainPacket * plain_packet= new PlainPacket(MAX_PACKET_LENGTH);
+
+ for(;;) {
+ PacketSourceEndpoint remote_end;
+#else
void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<EncryptedPacket*>* encrypted_in_queue, Channel<EncryptedPacket*>* encrypted_memory_pool)
{
if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) {
@@ -311,9 +346,10 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry
for(;;) {
EncryptedPacket * encrypted_packet = NULL;
encrypted_memory_pool->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
PacketSourceEndpoint remote_end;
-
+#endif
encrypted_packet->withAuthTag(false);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
@@ -324,21 +360,19 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry
} catch(std::exception& /*e*/) {
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what();
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
if(len < 0) {
- encrypted_memory_pool->push(encrypted_packet);
continue; // silently ignore socket recv errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) {
- encrypted_memory_pool->push(encrypted_packet);
continue; // ignore short packets
}
encrypted_packet->setLength(len);
encrypted_packet->setEndpoint(remote_end);
- encrypted_in_queue->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_encrypted.changeQueue(encrypted_in_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what();
@@ -360,9 +394,12 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
for(;;) {
PlainPacket * plain_packet = NULL;
plain_memory_pool->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
plain_packet->setLength(MAX_PACKET_LENGTH);
EncryptedPacket * encrypted_packet = NULL;
encrypted_in_queue->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint();
mux_t mux = encrypted_packet->getMux();
// autodetect peer
@@ -373,8 +410,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
ConnectionMap::iterator cit = gConnectionList.getConnection(mux);
if(cit == gConnectionList.getEnd()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
ConnectionParam& conn = cit->second;
@@ -382,8 +417,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
// check whether auth tag is ok or not
if(!a->checkTag(conn.kd_, *encrypted_packet)) {
cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!";
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -391,8 +424,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) {
cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_
<< " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId();
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -408,16 +439,14 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
}
// ignore zero length packets
if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
// decrypt packet
c->decrypt(conn.kd_, *encrypted_packet, *plain_packet);
- plain_out_queue->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_plain.changeQueue(plain_out_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what();
@@ -433,6 +462,8 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
for(;;) {
PlainPacket * plain_packet = NULL;
plain_out_queue->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
+#endif
if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 &&
plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) ||
(dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) {
@@ -442,7 +473,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
// write it on the device
dev->write(plain_packet->getPayload(), plain_packet->getLength());
- plain_memory_pool->push(plain_packet);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what();
@@ -452,107 +482,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
}
-void receiver(TunDevice* dev, PacketSource* src)
-{
- if(!dev || !src) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null";
- return;
- }
-
- try {
- std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND));
- std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND));
-
- uint32_t auth_tag_length = gOpt.getAuthTagLength();
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, auth_tag_length);
- PlainPacket plain_packet(MAX_PACKET_LENGTH);
-
- for(;;) {
- PacketSourceEndpoint remote_end;
-
- plain_packet.setLength(MAX_PACKET_LENGTH);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
-
- // read packet from socket
- int len;
- try {
- len = src->recv(encrypted_packet.getBuf(), encrypted_packet.getLength(), remote_end);
- } catch(std::exception& /*e*/) {
- //TODO: do something here
- //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what();
- continue;
- }
- if(len < 0) {
- continue; // silently ignore socket recv errors, this is probably no good idea...
- }
-
- if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) {
- continue; // ignore short packets
- }
- encrypted_packet.setLength(len);
-
- mux_t mux = encrypted_packet.getMux();
- // autodetect peer
- if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") {
- cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end;
- createConnection(remote_end, gOpt.getSeqWindowSize(),mux);
- }
-
- ConnectionMap::iterator cit = gConnectionList.getConnection(mux);
- if(cit == gConnectionList.getEnd()) {
- continue;
- }
- ConnectionParam& conn = cit->second;
-
- // check whether auth tag is ok or not
- if(!a->checkTag(conn.kd_, encrypted_packet)) {
- cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!";
- continue;
- }
-
- // Replay Protection
- if(conn.seq_window_.checkAndAdd(encrypted_packet.getSenderId(), encrypted_packet.getSeqNr())) {
- cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_
- << " seq:"<< encrypted_packet.getSeqNr() << " sid: "<< encrypted_packet.getSenderId();
- continue;
- }
-
- //Allow dynamic IP changes
- //TODO: add command line option to turn this off
- if(remote_end != conn.remote_end_) {
- cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end;
- conn.remote_end_=remote_end;
-#ifndef ANYTUN_NOSYNC
- SyncCommand sc(gConnectionList,mux);
- gSyncQueue.push(sc);
-#endif
- }
- // ignore zero length packets
- if(encrypted_packet.getPayloadLength() <= PlainPacket::getHeaderLength()) {
- continue;
- }
-
- // decrypt packet
- c->decrypt(conn.kd_, encrypted_packet, plain_packet);
-
- // check payload_type
- if((dev->getType() == TYPE_TUN && plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN4 &&
- plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN6) ||
- (dev->getType() == TYPE_TAP && plain_packet.getPayloadType() != PAYLOAD_TYPE_TAP)) {
- continue;
- }
-
- // write it on the device
- dev->write(plain_packet.getPayload(), plain_packet.getLength());
- }
- } catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught runtime_error: " << e.what();
- } catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught exception: " << e.what();
- }
-}
-
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
unsigned int nthreads = boost::thread::hardware_concurrency();
@@ -560,7 +489,7 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
nthreads = 4;
src->waitUntilReady();
-
+#ifdef ANYTUN_MULTITHREAD
uint16_t plain_in_queue_length = 2 * nthreads + 4;
Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
@@ -600,6 +529,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool));
// }
boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev));
+#else
+ boost::thread(boost::bind(sender, dev, src));
+ boost::thread(boost::bind(receiver, dev, src));
+#endif
}