From 32bd6f8f365d47ad1c54931a1c58f7830927016a Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Thu, 28 Aug 2014 20:15:59 +0000 Subject: new threading model complete, but not working --- src/anytun.cpp | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 171 insertions(+), 6 deletions(-) diff --git a/src/anytun.cpp b/src/anytun.cpp index 36d785e..6fec2d7 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -297,6 +297,159 @@ void threadWriteToSocket(Channel* encrypted_out_queue, Channel } } +void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel* encrypted_in_queue, Channel* encrypted_memory_pool) +{ + if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) { + cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either src or encrypted_in_queue encrypted_memory_pool pointer is null"; + return; + } + + try { + uint32_t auth_tag_length = gOpt.getAuthTagLength(); + + for(;;) { + EncryptedPacket * encrypted_packet = NULL; + encrypted_memory_pool->pop(&encrypted_packet); + PacketSourceEndpoint remote_end; + + encrypted_packet->withAuthTag(false); + encrypted_packet->setLength(MAX_PACKET_LENGTH); + + // read packet from socket + int len; + try { + len = src->recv(encrypted_packet->getBuf(), encrypted_packet->getLength(), remote_end); + } catch(std::exception& /*e*/) { + //TODO: do something here + //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what(); + encrypted_memory_pool->push(encrypted_packet); + continue; + } + if(len < 0) { + encrypted_memory_pool->push(encrypted_packet); + continue; // silently ignore socket recv errors, this is probably no good idea... + } + + if(static_cast(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) { + encrypted_memory_pool->push(encrypted_packet); + continue; // ignore short packets + } + encrypted_packet->setLength(len); + encrypted_packet->setEndpoint(remote_end); + encrypted_in_queue->push(encrypted_packet); + } + } catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what(); + } catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught exception: " << e.what(); + } + +} + +void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel* encrypted_in_queue, Channel* encrypted_memory_pool, Channel* plain_out_queue, Channel* plain_memory_pool) +{ + if(!encrypted_in_queue || !encrypted_memory_pool || !plain_out_queue || !plain_memory_pool) { + cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either encrypted_in_queue or encrypted_memory_pool or plain_out_queue or plain_memory_pool pointer is null"; + return; + } + try { + std::auto_ptr c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND)); + std::auto_ptr a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND)); + for(;;) { + PlainPacket * plain_packet = NULL; + plain_memory_pool->pop(&plain_packet); + plain_packet->setLength(MAX_PACKET_LENGTH); + EncryptedPacket * encrypted_packet = NULL; + encrypted_in_queue->pop(&encrypted_packet); + PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint(); + mux_t mux = encrypted_packet->getMux(); + // autodetect peer + if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") { + cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end; + createConnection(remote_end, gOpt.getSeqWindowSize(),mux); + } + + ConnectionMap::iterator cit = gConnectionList.getConnection(mux); + if(cit == gConnectionList.getEnd()) { + plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); + continue; + } + ConnectionParam& conn = cit->second; + + // check whether auth tag is ok or not + if(!a->checkTag(conn.kd_, *encrypted_packet)) { + cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!"; + plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); + continue; + } + + // Replay Protection + if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) { + cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_ + << " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId(); + plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); + continue; + } + + //Allow dynamic IP changes + //TODO: add command line option to turn this off + if(remote_end != conn.remote_end_) { + cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end; + conn.remote_end_=remote_end; +#ifndef ANYTUN_NOSYNC + SyncCommand sc(gConnectionList,mux); + gSyncQueue.push(sc); +#endif + } + // ignore zero length packets + if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) { + plain_memory_pool->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); + continue; + } + + // decrypt packet + c->decrypt(conn.kd_, *encrypted_packet, *plain_packet); + + plain_out_queue->push(plain_packet); + encrypted_memory_pool->push(encrypted_packet); + } + } catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what(); + } catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught exception: " << e.what(); + } + +} + +void threadWriteToTunDevice( Channel* plain_out_queue, Channel* plain_memory_pool, TunDevice* dev) +{ + try { + for(;;) { + PlainPacket * plain_packet = NULL; + plain_out_queue->pop(&plain_packet); + if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 && + plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) || + (dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) { + plain_memory_pool->push(plain_packet); + continue; + } + + // write it on the device + dev->write(plain_packet->getPayload(), plain_packet->getLength()); + plain_memory_pool->push(plain_packet); + } + } catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what(); + } catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught exception: " << e.what(); + } + +} + void receiver(TunDevice* dev, PacketSource* src) { if(!dev || !src) { @@ -409,8 +562,12 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) uint16_t plain_in_queue_length = 2 * nthreads + 4; Channel* plain_in_queue = new Channel(plain_in_queue_length); - uint16_t plain_memory_pool_length = plain_in_queue_length + 1; + uint16_t plain_out_queue_length = 2 * nthreads + 4; + Channel* plain_out_queue = new Channel(plain_out_queue_length); + + uint16_t plain_memory_pool_length = plain_in_queue_length + plain_out_queue_length + 1; Channel* plain_memory_pool = new Channel(plain_memory_pool_length); + for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) { plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH)); } @@ -418,7 +575,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) uint16_t encrypted_out_queue_length = plain_in_queue_length; Channel* encrypted_out_queue = new Channel(encrypted_out_queue_length); - uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + 1; + uint16_t encrypted_in_queue_length = plain_in_queue_length; + Channel* encrypted_in_queue = new Channel(encrypted_in_queue_length); + + uint16_t encrypted_memory_pool_length = encrypted_out_queue_length + encrypted_in_queue_length + 1; Channel* encrypted_memory_pool = new Channel(encrypted_memory_pool_length); uint32_t auth_tag_length = gOpt.getAuthTagLength(); @@ -427,12 +587,17 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src) } boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool)); - for(uint16_t cnt=0; cnt < nthreads; cnt++) { +// for(uint16_t cnt=0; cnt < nthreads; cnt++) { boost::thread(boost::bind(threadReadPlainInQueueAndWriteEncryptedOutQueue, plain_in_queue, plain_memory_pool, encrypted_out_queue, encrypted_memory_pool)); - } - //threadWriteToSocket(Channel* encrypted_out_queue, Channel* encrypted_memory_pool, PacketSource* src) +// } boost::thread(boost::bind(threadWriteToSocket, encrypted_out_queue, encrypted_memory_pool, src)); - boost::thread(boost::bind(receiver, dev, src)); + + + boost::thread(boost::bind(threadReadSocketAndWriteToEncryptedInQueue, src, encrypted_in_queue, encrypted_memory_pool)); +// for(uint16_t cnt=0; cnt < nthreads; cnt++) { + boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool)); +// } + boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev)); } -- cgit v1.2.3