summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/anytun.cpp98
-rw-r--r--src/channel.hpp16
2 files changed, 74 insertions, 40 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 9e97e5a..10dca8d 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -144,47 +144,72 @@ void syncListener()
}
#endif
-void sender(TunDevice* dev, PacketSource* src)
+// Start 1 Thread per Tun device
+void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool )
{
- if(!dev || !src) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died because either dev or src pointer is null";
+ if(!dev || !plain_in_queue || ! plain_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToEncryptedQueue died because either dev or plain_in_queue or plain_memory_pool, pointer is null";
return;
}
-
try {
- std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
- std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
-
- PlainPacket plain_packet(MAX_PACKET_LENGTH);
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
-
- uint16_t mux = gOpt.getMux();
- PacketSourceEndpoint emptyEndpoint;
for(;;) {
- plain_packet.setLength(MAX_PACKET_LENGTH);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
+ PlainPacket * plain_packet = NULL;
+ plain_memory_pool->pop(&plain_packet);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop";
// read packet from device
- int len = dev->read(plain_packet.getPayload(), plain_packet.getPayloadLength());
+ int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength());
if(len < 0) {
+ plain_memory_pool->push(plain_packet);
continue; // silently ignore device read errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) {
+ plain_memory_pool->push(plain_packet);
continue; // ignore short packets
}
- plain_packet.setPayloadLength(len);
+ plain_packet->setPayloadLength(len);
// set payload type
if(dev->getType() == TYPE_TUN) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TUN);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TUN);
} else if(dev->getType() == TYPE_TAP) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TAP);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TAP);
} else {
- plain_packet.setPayloadType(0);
+ plain_packet->setPayloadType(0);
}
+ plain_in_queue->push(plain_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught exception: " << e.what();
+ }
+}
+
+void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool, PacketSource* src)
+{
+ if(!plain_in_queue || !plain_memory_pool || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null";
+ return;
+ }
+
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
+
+ EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
+
+ uint16_t mux = gOpt.getMux();
+ PacketSourceEndpoint emptyEndpoint;
+ for(;;) {
+ PlainPacket * plain_packet = NULL;
+ plain_in_queue->pop(&plain_packet);
+ encrypted_packet.withAuthTag(false);
+ encrypted_packet.setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
+ plain_memory_pool->push(plain_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -192,10 +217,13 @@ void sender(TunDevice* dev, PacketSource* src)
#ifndef NO_ROUTING
if(!disableRouting)
try {
- mux = gRoutingTable.getRoute(plain_packet.getDstAddr());
+ mux = gRoutingTable.getRoute(plain_packet->getDstAddr());
//std::cout << " -> "<<mux << std::endl;
cit = gConnectionList.getConnection(mux);
- } catch(std::exception&) { continue; } // no route
+ } catch(std::exception&) {
+ plain_memory_pool->push(plain_packet);
+ continue;
+ } // no route
else {
cit = gConnectionList.getBegin();
}
@@ -204,17 +232,19 @@ void sender(TunDevice* dev, PacketSource* src)
#endif
if(cit==gConnectionList.getEnd()) {
+ plain_memory_pool->push(plain_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
+ plain_memory_pool->push(plain_packet);
continue;
}
// encrypt packet
- c->encrypt(conn.kd_, plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
+ c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux);
conn.seq_nr_++;
@@ -228,11 +258,12 @@ void sender(TunDevice* dev, PacketSource* src)
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
+ plain_memory_pool->push(plain_packet);
}
} catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught runtime_error: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what();
} catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught exception: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what();
}
}
@@ -340,13 +371,16 @@ void receiver(TunDevice* dev, PacketSource* src)
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
src->waitUntilReady();
- //channel<int> a(1);
- //a.push(23);
- //a.push(23);
- //int i=0;
- //a.pop(i);
- //std::cout << i << std::endl;
- boost::thread(boost::bind(sender, dev, src));
+ uint16_t plain_in_queue_length = 10;
+ Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
+
+ uint16_t plain_memory_pool_length = plain_in_queue_length + 1;
+ Channel<PlainPacket*>* plain_memory_pool = new Channel<PlainPacket*>(plain_memory_pool_length);
+ for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) {
+ plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH));
+ }
+ boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool));
+ boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src));
boost::thread(boost::bind(receiver, dev, src));
}
diff --git a/src/channel.hpp b/src/channel.hpp
index 8c69f19..8441750 100644
--- a/src/channel.hpp
+++ b/src/channel.hpp
@@ -50,7 +50,7 @@
#include <boost/circular_buffer.hpp>
template<typename T>
-class channel
+class Channel
{
private:
boost::mutex mtx_;
@@ -61,24 +61,24 @@ private:
boost::lock_guard<boost::mutex> guard(mtx_);
cb_.push_back(t);
}
- void pop_cb(T & ret) {
+ void pop_cb(T * ret) {
boost::lock_guard<boost::mutex> guard(mtx_);
- ret = cb_[0];
+ *ret = cb_[0];
cb_.pop_front();
}
public:
- channel(channel const &) = delete;
-// channel(channel &&) = delete;
- channel& operator=(const channel &) = delete;
- channel(unsigned int num_elements=10)
+ Channel(Channel const &) = delete;
+// Channel(Channel &&) = delete;
+ Channel& operator=(const Channel &) = delete;
+ Channel(unsigned int num_elements=10)
:cb_(num_elements),sem_read_(0),sem_write_(num_elements) {};
void push(T const & t ) {
sem_write_.down();
this->push_cb(t);
sem_read_.up();
}
- void pop(T & ret) {
+ void pop(T * ret) {
sem_read_.down();
this->pop_cb(ret);
sem_write_.up();