summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOthmar Gsenger <otti@anytun.org>2014-08-28 20:15:59 +0000
committerOthmar Gsenger <otti@anytun.org>2014-08-28 20:15:59 +0000
commit32bd6f8f365d47ad1c54931a1c58f7830927016a (patch)
tree2dcd72aaed652dfaa40a5226cac3035643f414db
parentmultithreaded sender seems to work... (diff)
new threading model complete, but not working
-rw-r--r--src/anytun.cpp177
1 files changed, 171 insertions, 6 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 36d785e..6fec2d7 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -297,6 +297,159 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel
}
}
+void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<EncryptedPacket*>* encrypted_in_queue, Channel<EncryptedPacket*>* encrypted_memory_pool)
+{
+ if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either src or encrypted_in_queue encrypted_memory_pool pointer is null";
+ return;
+ }
+
+ try {
+ uint32_t auth_tag_length = gOpt.getAuthTagLength();
+
+ for(;;) {
+ EncryptedPacket * encrypted_packet = NULL;
+ encrypted_memory_pool->pop(&encrypted_packet);
+ PacketSourceEndpoint remote_end;
+
+ encrypted_packet->withAuthTag(false);
+ encrypted_packet->setLength(MAX_PACKET_LENGTH);
+
+ // read packet from socket
+ int len;
+ try {
+ len = src->recv(encrypted_packet->getBuf(), encrypted_packet->getLength(), remote_end);
+ } catch(std::exception& /*e*/) {
+ //TODO: do something here
+ //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what();
+ encrypted_memory_pool->push(encrypted_packet);
+ continue;
+ }
+ if(len < 0) {
+ encrypted_memory_pool->push(encrypted_packet);
+ continue; // silently ignore socket recv errors, this is probably no good idea...
+ }
+
+ if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) {
+ encrypted_memory_pool->push(encrypted_packet);
+ continue; // ignore short packets
+ }
+ encrypted_packet->setLength(len);
+ encrypted_packet->setEndpoint(remote_end);
+ encrypted_in_queue->push(encrypted_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught exception: " << e.what();
+ }
+
+}
+
+void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>* encrypted_in_queue, Channel<EncryptedPacket*>* encrypted_memory_pool, Channel<PlainPacket*>* plain_out_queue, Channel<PlainPacket*>* plain_memory_pool)
+{
+ if(!encrypted_in_queue || !encrypted_memory_pool || !plain_out_queue || !plain_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either encrypted_in_queue or encrypted_memory_pool or plain_out_queue or plain_memory_pool pointer is null";
+ return;
+ }
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND));
+ for(;;) {
+ PlainPacket * plain_packet = NULL;
+ plain_memory_pool->pop(&plain_packet);
+ plain_packet->setLength(MAX_PACKET_LENGTH);
+ EncryptedPacket * encrypted_packet = NULL;
+ encrypted_in_queue->pop(&encrypted_packet);
+ PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint();
+ mux_t mux = encrypted_packet->getMux();
+ // autodetect peer
+ if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") {
+ cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end;
+ createConnection(remote_end, gOpt.getSeqWindowSize(),mux);
+ }
+
+ ConnectionMap::iterator cit = gConnectionList.getConnection(mux);
+ if(cit == gConnectionList.getEnd()) {
+ plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
+ continue;
+ }
+ ConnectionParam& conn = cit->second;
+
+ // check whether auth tag is ok or not
+ if(!a->checkTag(conn.kd_, *encrypted_packet)) {
+ cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!";
+ plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
+ continue;
+ }
+
+ // Replay Protection
+ if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) {
+ cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_
+ << " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId();
+ plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
+ continue;
+ }
+
+ //Allow dynamic IP changes
+ //TODO: add command line option to turn this off
+ if(remote_end != conn.remote_end_) {
+ cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end;
+ conn.remote_end_=remote_end;
+#ifndef ANYTUN_NOSYNC
+ SyncCommand sc(gConnectionList,mux);
+ gSyncQueue.push(sc);
+#endif
+ }
+ // ignore zero length packets
+ if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) {
+ plain_memory_pool->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
+ continue;
+ }
+
+ // decrypt packet
+ c->decrypt(conn.kd_, *encrypted_packet, *plain_packet);
+
+ plain_out_queue->push(plain_packet);
+ encrypted_memory_pool->push(encrypted_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught exception: " << e.what();
+ }
+
+}
+
+void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<PlainPacket*>* plain_memory_pool, TunDevice* dev)
+{
+ try {
+ for(;;) {
+ PlainPacket * plain_packet = NULL;
+ plain_out_queue->pop(&plain_packet);
+ if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 &&
+ plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) ||
+ (dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) {
+ plain_memory_pool->push(plain_packet);
+ continue;
+ }
+
+ // write it on the device
+ dev->write(plain_packet->getPayload(), plain_packet->getLength());
+ plain_memory_pool->push(plain_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught exception: " << e.what();
+ }
+
+}
+
void receiver(TunDevice* dev, PacketSource* src)
{
if(!dev || !src) {
@@ -409,8 +562,12 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
uint16_t plain_in_queue_length = 2 * nthreads + 4;
Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
- uint16_t plain_memory_pool_length = plain_in_queue_length + 1;
+ uint16_t plain_out_queue_length = 2 * nthreads + 4;
+ Channel<PlainPacket*>* plain_out_queue = new Channel<PlainPacket*>(plain_out_queue_length);
+
+ uint16_t plain_memory_pool_length = plain_in_queue_length + plain_out_queue_length + 1;
Channel<PlainPacket*>* plain_memory_pool = new Channel<PlainPacket*>(plain_memory_pool_length);
+
for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) {
plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH));
}
@@ -418,7 +575,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
uint16_t encrypted_out_queue_length = plain_in_queue_length;
Channel<EncryptedPacket*>* encrypted_out_queue = new Channel<EncryptedPacket*>(encrypted_out_queue_length);
- uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + 1;
+ uint16_t encrypted_in_queue_length = plain_in_queue_length;
+ Channel<EncryptedPacket*>* encrypted_in_queue = new Channel<EncryptedPacket*>(encrypted_in_queue_length);
+
+ uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + encrypted_in_queue_length + 1;
Channel<EncryptedPacket*>* encrypted_memory_pool = new Channel<EncryptedPacket*>(encrypted_memory_pool_length);
uint32_t auth_tag_length = gOpt.getAuthTagLength();
@@ -427,12 +587,17 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
}
boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool));
- for(uint16_t cnt=0; cnt < nthreads; cnt++) {
+// for(uint16_t cnt=0; cnt < nthreads; cnt++) {
boost::thread(boost::bind(threadReadPlainInQueueAndWriteEncryptedOutQueue, plain_in_queue, plain_memory_pool, encrypted_out_queue, encrypted_memory_pool));
- }
- //threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel<EncryptedPacket*>* encrypted_memory_pool, PacketSource* src)
+// }
boost::thread(boost::bind(threadWriteToSocket, encrypted_out_queue, encrypted_memory_pool, src));
- boost::thread(boost::bind(receiver, dev, src));
+
+
+ boost::thread(boost::bind(threadReadSocketAndWriteToEncryptedInQueue, src, encrypted_in_queue, encrypted_memory_pool));
+// for(uint16_t cnt=0; cnt < nthreads; cnt++) {
+ boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool));
+// }
+ boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev));
}