summaryrefslogtreecommitdiff
path: root/src/anytun.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/anytun.cpp')
-rw-r--r--src/anytun.cpp98
1 files changed, 66 insertions, 32 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 9e97e5a..10dca8d 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -144,47 +144,72 @@ void syncListener()
}
#endif
-void sender(TunDevice* dev, PacketSource* src)
+// Start 1 Thread per Tun device
+void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool )
{
- if(!dev || !src) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died because either dev or src pointer is null";
+ if(!dev || !plain_in_queue || ! plain_memory_pool) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToEncryptedQueue died because either dev or plain_in_queue or plain_memory_pool, pointer is null";
return;
}
-
try {
- std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
- std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
-
- PlainPacket plain_packet(MAX_PACKET_LENGTH);
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
-
- uint16_t mux = gOpt.getMux();
- PacketSourceEndpoint emptyEndpoint;
for(;;) {
- plain_packet.setLength(MAX_PACKET_LENGTH);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
+ PlainPacket * plain_packet = NULL;
+ plain_memory_pool->pop(&plain_packet);
+// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop";
// read packet from device
- int len = dev->read(plain_packet.getPayload(), plain_packet.getPayloadLength());
+ int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength());
if(len < 0) {
+ plain_memory_pool->push(plain_packet);
continue; // silently ignore device read errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) {
+ plain_memory_pool->push(plain_packet);
continue; // ignore short packets
}
- plain_packet.setPayloadLength(len);
+ plain_packet->setPayloadLength(len);
// set payload type
if(dev->getType() == TYPE_TUN) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TUN);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TUN);
} else if(dev->getType() == TYPE_TAP) {
- plain_packet.setPayloadType(PAYLOAD_TYPE_TAP);
+ plain_packet->setPayloadType(PAYLOAD_TYPE_TAP);
} else {
- plain_packet.setPayloadType(0);
+ plain_packet->setPayloadType(0);
}
+ plain_in_queue->push(plain_packet);
+ }
+ } catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what();
+ } catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught exception: " << e.what();
+ }
+}
+
+void threadReadPlainInQueueAndWriteToSocket(Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool, PacketSource* src)
+{
+ if(!plain_in_queue || !plain_memory_pool || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender thread died because either plain_in_queue or plain_memory_pool or src pointer is null";
+ return;
+ }
+
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
+
+ EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
+
+ uint16_t mux = gOpt.getMux();
+ PacketSourceEndpoint emptyEndpoint;
+ for(;;) {
+ PlainPacket * plain_packet = NULL;
+ plain_in_queue->pop(&plain_packet);
+ encrypted_packet.withAuthTag(false);
+ encrypted_packet.setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
+ plain_memory_pool->push(plain_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -192,10 +217,13 @@ void sender(TunDevice* dev, PacketSource* src)
#ifndef NO_ROUTING
if(!disableRouting)
try {
- mux = gRoutingTable.getRoute(plain_packet.getDstAddr());
+ mux = gRoutingTable.getRoute(plain_packet->getDstAddr());
//std::cout << " -> "<<mux << std::endl;
cit = gConnectionList.getConnection(mux);
- } catch(std::exception&) { continue; } // no route
+ } catch(std::exception&) {
+ plain_memory_pool->push(plain_packet);
+ continue;
+ } // no route
else {
cit = gConnectionList.getBegin();
}
@@ -204,17 +232,19 @@ void sender(TunDevice* dev, PacketSource* src)
#endif
if(cit==gConnectionList.getEnd()) {
+ plain_memory_pool->push(plain_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
+ plain_memory_pool->push(plain_packet);
continue;
}
// encrypt packet
- c->encrypt(conn.kd_, plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
+ c->encrypt(conn.kd_, *plain_packet, encrypted_packet, conn.seq_nr_, gOpt.getSenderId(), mux);
encrypted_packet.setHeader(conn.seq_nr_, gOpt.getSenderId(), mux);
conn.seq_nr_++;
@@ -228,11 +258,12 @@ void sender(TunDevice* dev, PacketSource* src)
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
+ plain_memory_pool->push(plain_packet);
}
} catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught runtime_error: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught runtime_error: " << e.what();
} catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "sender thread died due to an uncaught exception: " << e.what();
+ cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteToSocket died due to an uncaught exception: " << e.what();
}
}
@@ -340,13 +371,16 @@ void receiver(TunDevice* dev, PacketSource* src)
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
src->waitUntilReady();
- //channel<int> a(1);
- //a.push(23);
- //a.push(23);
- //int i=0;
- //a.pop(i);
- //std::cout << i << std::endl;
- boost::thread(boost::bind(sender, dev, src));
+ uint16_t plain_in_queue_length = 10;
+ Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
+
+ uint16_t plain_memory_pool_length = plain_in_queue_length + 1;
+ Channel<PlainPacket*>* plain_memory_pool = new Channel<PlainPacket*>(plain_memory_pool_length);
+ for(uint16_t cnt=0; cnt < plain_memory_pool_length; cnt++) {
+ plain_memory_pool->push(new PlainPacket(MAX_PACKET_LENGTH));
+ }
+ boost::thread(boost::bind(threadReadTunDeviceAndWriteToPlainInQueue, dev, plain_in_queue, plain_memory_pool));
+ boost::thread(boost::bind(threadReadPlainInQueueAndWriteToSocket, plain_in_queue, plain_memory_pool, src));
boost::thread(boost::bind(receiver, dev, src));
}