summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOthmar Gsenger <otti@anytun.org>2014-07-31 10:54:03 +0000
committerOthmar Gsenger <otti@anytun.org>2014-07-31 10:54:03 +0000
commit9cd05a9b74ee5b18fc12c5ee1cb5192337db6590 (patch)
tree96346343ee10d52d941da423c07f0d300ca4ba4f
parentadded a go style channel implementation. this might still need some performan... (diff)
added one new thread. memory pool still needs work. some kind of auto pointer returning memory would be nice. crypto needs bigger refactoring i'm afraid
-rw-r--r--src/anytun.cpp98
-rw-r--r--src/channel.hpp16
2 files changed, 74 insertions, 40 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 9e97e5a..10dca8d 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -144,47 +144,72 @@ void syncListener()
}
#endif
-void sender(TunDevice* dev, PacketSource* src)
+// Start 1 Thread per Tun device
+void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool )
{
- if(!dev || !src) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died because either dev or src pointer is null";
+ if(!dev || !plain_in_queue || ! plain_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToEncryptedQueue died because either dev or plain_in_queue or plain_memory_pool, pointer is null";
return;
}
-
try {
- std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
- std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
-
- PlainPacket plain_packet(MAX_PACKET_LENGTH);
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
-
- uint16_t mux = gOpt.getMux();
- PacketSourceEndpoint emptyEndpoint;
for(;;) {
- plain_packet.setLength(MAX_PACKET_LENGTH);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
+ PlainPacket * plain_packet = NULL;
+ plain_memory_pool->pop(&plain_packet);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop";
// read packet from device
- int len = dev->read(plain_packet.getPayload(), plain_packet.getPayloadLength());
+ int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength());
if(len < 0) {
+ plain_memory_pool->push(plain_packet);
continue; // silently ignore device read errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) {
+ plain_memory_pool->push(plain_packet);
continue; // ignore short packets
}
- plain_packet.setPayloadLength(len);
+ plain_packet->setPayloadLength(len);
// set payload type
if(dev->getType() == TYPE_TUN) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TUN);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TUN);
} else if(dev->getType() == TYPE_TAP) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TAP);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TAP);
} else {
- plain_packet.setPayloadType(0);
+ plain_packet->setPayloadType(0);
}
+ plain_in_queue->push(plain_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught exception: " << e.what();
+ }
+}
+
+void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool, PacketSource* src)
+{
+ if(!plain_in_queue || !plain_memory_pool || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null";
+ return;
+ }
+
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
+
+ EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
+
+ uint16_t mux = gOpt.getMux();
+ PacketSourceEndpoint emptyEndpoint;
+ for(;;) {
+ PlainPacket * plain_packet = NULL;
+ plain_in_queue->pop(&plain_packet);
+ encrypted_packet.withAuthTag(false);
+ encrypted_packet.setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
+ plain_memory_pool->push(plain_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -192,10 +217,13 @@ void sender(TunDevice* dev, PacketSource* src)
#ifndef NO_ROUTING
if(!disableRouting)
try {
- mux = gRoutingTable.getRoute(plain_packet.getDstAddr());
+ mux = gRoutingTable.getRoute(plain_packet->getDstAddr());
//std::cout << " -> "<<mux << std::endl;
cit = gConnectionList.getConnection(mux);
- } catch(std::exception&) { continue; } // no route
+ } catch(std::exception&) {
+ plain_memory_pool->push(plain_packet);
+ continue;
+ } // no route
else {
cit = gConnectionList.getBegin();
}
@@ -204,17 +232,19 @@ void sender(TunDevice* dev, PacketSource* src)
#endif
if(cit==gConnectionList.getEnd()) {
+ plain_memory_pool->push(plain_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
+ plain_memory_pool->push(plain_packet);
continue;
}
// encrypt packet
- c->encrypt(conn.kd_, plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
+ c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux);
conn.seq_nr_++;
@@ -228,11 +258,12 @@ void sender(TunDevice* dev, PacketSource* src)
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
+ plain_memory_pool->push(plain_packet);
}
} catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught runtime_error: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what();
} catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught exception: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what();
}
}
@@ -340,13 +371,16 @@ void receiver(TunDevice* dev, PacketSource* src)
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
src->waitUntilReady();
- //channel<int> a(1);
- //a.push(23);
- //a.push(23);
- //int i=0;
- //a.pop(i);
- //std::cout << i << std::endl;
- boost::thread(boost::bind(sender, dev, src));
+ uint16_t plain_in_queue_length = 10;
+ Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
+
+ uint16_t plain_memory_pool_length = plain_in_queue_length + 1;
+ Channel<PlainPacket*>* plain_memory_pool = new Channel<PlainPacket*>(plain_memory_pool_length);
+ for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) {
+ plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH));
+ }
+ boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool));
+ boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src));
boost::thread(boost::bind(receiver, dev, src));
}
diff --git a/src/channel.hpp b/src/channel.hpp
index 8c69f19..8441750 100644
--- a/src/channel.hpp
+++ b/src/channel.hpp
@@ -50,7 +50,7 @@
#include <boost/circular_buffer.hpp>
template<typename T>
-class channel
+class Channel
{
private:
boost::mutex mtx_;
@@ -61,24 +61,24 @@ private:
boost::lock_guard<boost::mutex> guard(mtx_);
cb_.push_back(t);
}
- void pop_cb(T & ret) {
+ void pop_cb(T * ret) {
boost::lock_guard<boost::mutex> guard(mtx_);
- ret = cb_[0];
+ *ret = cb_[0];
cb_.pop_front();
}
public:
- channel(channel const &) = delete;
-// channel(channel &&) = delete;
- channel& operator=(const channel &) = delete;
- channel(unsigned int num_elements=10)
+ Channel(Channel const &) = delete;
+// Channel(Channel &&) = delete;
+ Channel& operator=(const Channel &) = delete;
+ Channel(unsigned int num_elements=10)
:cb_(num_elements),sem_read_(0),sem_write_(num_elements) {};
void push(T const & t ) {
sem_write_.down();
this->push_cb(t);
sem_read_.up();
}
- void pop(T & ret) {
+ void pop(T * ret) {
sem_read_.down();
this->pop_cb(ret);
sem_write_.up();