summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOthmar Gsenger <otti@anytun.org>2014-09-01 21:10:57 +0000
committerOthmar Gsenger <otti@anytun.org>2014-09-01 21:10:57 +0000
commitfc927f97f7060d638fa4e61a9331df49ceef143f (patch)
tree05236c6a1fd463012e4a617a88ba859141547607
parentnew threading working, but only one crypto thread per direction possible yet (diff)
added QueuePusher, added ANYTUN_MULTITHREAD comile option
-rw-r--r--src/anytun.cpp203
-rw-r--r--src/channel.hpp3
-rwxr-xr-xsrc/configure14
-rw-r--r--src/queuePusher.hpp80
4 files changed, 162 insertions, 138 deletions
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 8cb9c43..cb3c3ed 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -92,9 +92,13 @@
#endif
#include "cryptinit.hpp"
-#include "channel.hpp"
#include "sysExec.h"
+#ifdef ANYTUN_MULTITHREAD
+#include "channel.hpp"
+#include "queuePusher.hpp"
+#endif
+
bool disableRouting = false;
void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux)
@@ -145,6 +149,21 @@ void syncListener()
}
#endif
+#ifndef ANYTUN_MULTITHREAD
+void sender(TunDevice* dev, PacketSource* src)
+{
+ if(!dev || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "sender died because either dev or src is null";
+ return;
+ }
+ PlainPacket * plain_packet = new PlainPacket(MAX_PACKET_LENGTH);
+ EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, gOpt.getAuthTagLength());
+ cLog.msg(Log::PRIO_DEBUG) << "started sender";
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_OUTBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_OUTBOUND));
+ uint16_t mux = gOpt.getMux();
+ PacketSourceEndpoint emptyEndpoint;
+#else
// Start 1 Thread per Tun device
void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPacket*>* plain_in_queue, Channel<PlainPacket*>* plain_memory_pool )
{
@@ -153,23 +172,23 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
return;
}
cLog.msg(Log::PRIO_DEBUG) << "started threadReadTunDeviceAndWriteToPlainInQueue";
+#endif
try {
for(;;) {
-// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue loop";
+#ifdef ANYTUN_MULTITHREAD
PlainPacket * plain_packet = NULL;
plain_memory_pool->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
+#endif
plain_packet->setLength(MAX_PACKET_LENGTH);
-// cLog.msg(Log::PRIO_DEBUG) << "threadReadTunDeviceAndWriteToPlainInQueue after pop";
// read packet from device
int len = dev->read(plain_packet->getPayload(), plain_packet->getPayloadLength());
if(len < 0) {
- plain_memory_pool->push(plain_packet);
continue; // silently ignore device read errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < PlainPacket::getHeaderLength()) {
- plain_memory_pool->push(plain_packet);
continue; // ignore short packets
}
plain_packet->setPayloadLength(len);
@@ -181,7 +200,8 @@ void threadReadTunDeviceAndWriteToPlainInQueue(TunDevice* dev, Channel<PlainPack
} else {
plain_packet->setPayloadType(0);
}
- plain_in_queue->push(plain_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_plain.changeQueue(plain_in_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadTunDeviceAndWriteToPlainInQueue died due to an uncaught runtime_error: " << e.what();
@@ -208,14 +228,15 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
for(;;) {
PlainPacket * plain_packet = NULL;
plain_in_queue->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
EncryptedPacket * encrypted_packet = NULL;
encrypted_memory_pool->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
encrypted_packet->withAuthTag(false);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
if(gConnectionList.empty()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
//std::cout << "got Packet for plain "<<plain_packet.getDstAddr().toString();
@@ -227,8 +248,6 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
//std::cout << " -> "<<mux << std::endl;
cit = gConnectionList.getConnection(mux);
} catch(std::exception&) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
} // no route
else {
@@ -239,16 +258,12 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
#endif
if(cit==gConnectionList.getEnd()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue; //no connection
}
ConnectionParam& conn = cit->second;
if(conn.remote_end_ == emptyEndpoint) {
//cLog.msg(Log::PRIO_INFO) << "no remote address set";
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -262,8 +277,8 @@ void threadReadPlainInQueueAndWriteEncryptedOutQueue(Channel<PlainPacket*>* plai
a->generate(conn.kd_, *encrypted_packet);
encrypted_packet->setEndpoint(conn.remote_end_);
- plain_memory_pool->push(plain_packet);
- encrypted_out_queue->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_encrypted.changeQueue(encrypted_out_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadPlainInQueueAndWriteEncryptedOutQueue died due to an uncaught runtime_error: " << e.what();
@@ -283,13 +298,14 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel
for(;;) {
EncryptedPacket * encrypted_packet = NULL;
encrypted_out_queue->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
try {
src->send(encrypted_packet->getBuf(), encrypted_packet->getLength(), encrypted_packet->getEndpoint());
} catch(std::exception& /*e*/) {
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not send data: " << e.what();
}
- encrypted_memory_pool->push(encrypted_packet);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadWriteToSocket died due to an uncaught runtime_error: " << e.what();
@@ -298,6 +314,25 @@ void threadWriteToSocket(Channel<EncryptedPacket*>* encrypted_out_queue, Channel
}
}
+#ifndef ANYTUN_MULTITHREAD
+void receiver(TunDevice* dev, PacketSource* src)
+{
+ if(!dev || !src) {
+ cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null";
+ return;
+ }
+
+ try {
+ std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND));
+ std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND));
+
+ uint32_t auth_tag_length = gOpt.getAuthTagLength();
+ EncryptedPacket * encrypted_packet = new EncryptedPacket(MAX_PACKET_LENGTH, auth_tag_length);
+ PlainPacket * plain_packet= new PlainPacket(MAX_PACKET_LENGTH);
+
+ for(;;) {
+ PacketSourceEndpoint remote_end;
+#else
void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<EncryptedPacket*>* encrypted_in_queue, Channel<EncryptedPacket*>* encrypted_memory_pool)
{
if(!src || ! encrypted_in_queue || ! encrypted_memory_pool) {
@@ -311,9 +346,10 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry
for(;;) {
EncryptedPacket * encrypted_packet = NULL;
encrypted_memory_pool->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
PacketSourceEndpoint remote_end;
-
+#endif
encrypted_packet->withAuthTag(false);
encrypted_packet->setLength(MAX_PACKET_LENGTH);
@@ -324,21 +360,19 @@ void threadReadSocketAndWriteToEncryptedInQueue( PacketSource*src, Channel<Encry
} catch(std::exception& /*e*/) {
//TODO: do something here
//cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what();
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
if(len < 0) {
- encrypted_memory_pool->push(encrypted_packet);
continue; // silently ignore socket recv errors, this is probably no good idea...
}
if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) {
- encrypted_memory_pool->push(encrypted_packet);
continue; // ignore short packets
}
encrypted_packet->setLength(len);
encrypted_packet->setEndpoint(remote_end);
- encrypted_in_queue->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_encrypted.changeQueue(encrypted_in_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadSocketAndWriteToEncryptedInQueue thread died due to an uncaught runtime_error: " << e.what();
@@ -360,9 +394,12 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
for(;;) {
PlainPacket * plain_packet = NULL;
plain_memory_pool->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
plain_packet->setLength(MAX_PACKET_LENGTH);
EncryptedPacket * encrypted_packet = NULL;
encrypted_in_queue->pop(&encrypted_packet);
+ QueuePusher<EncryptedPacket> qp_encrypted(encrypted_packet,encrypted_memory_pool);
+#endif
PacketSourceEndpoint remote_end = encrypted_packet->getEndpoint();
mux_t mux = encrypted_packet->getMux();
// autodetect peer
@@ -373,8 +410,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
ConnectionMap::iterator cit = gConnectionList.getConnection(mux);
if(cit == gConnectionList.getEnd()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
ConnectionParam& conn = cit->second;
@@ -382,8 +417,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
// check whether auth tag is ok or not
if(!a->checkTag(conn.kd_, *encrypted_packet)) {
cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!";
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -391,8 +424,6 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
if(conn.seq_window_.checkAndAdd(encrypted_packet->getSenderId(), encrypted_packet->getSeqNr())) {
cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_
<< " seq:"<< encrypted_packet->getSeqNr() << " sid: "<< encrypted_packet->getSenderId();
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
@@ -408,16 +439,14 @@ void threadReadEncryptedInQueueAndWritePlainOutQueue(Channel<EncryptedPacket*>*
}
// ignore zero length packets
if(encrypted_packet->getPayloadLength() <= PlainPacket::getHeaderLength()) {
- plain_memory_pool->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
continue;
}
// decrypt packet
c->decrypt(conn.kd_, *encrypted_packet, *plain_packet);
- plain_out_queue->push(plain_packet);
- encrypted_memory_pool->push(encrypted_packet);
+#ifdef ANYTUN_MULTITHREAD
+ qp_plain.changeQueue(plain_out_queue);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadReadEncryptedInQueueAndWritePlainOutQueue thread died due to an uncaught runtime_error: " << e.what();
@@ -433,6 +462,8 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
for(;;) {
PlainPacket * plain_packet = NULL;
plain_out_queue->pop(&plain_packet);
+ QueuePusher<PlainPacket> qp_plain(plain_packet,plain_memory_pool);
+#endif
if((dev->getType() == TYPE_TUN && plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN4 &&
plain_packet->getPayloadType() != PAYLOAD_TYPE_TUN6) ||
(dev->getType() == TYPE_TAP && plain_packet->getPayloadType() != PAYLOAD_TYPE_TAP)) {
@@ -442,7 +473,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
// write it on the device
dev->write(plain_packet->getPayload(), plain_packet->getLength());
- plain_memory_pool->push(plain_packet);
}
} catch(std::runtime_error& e) {
cLog.msg(Log::PRIO_ERROR) << "threadWriteToTunDevice thread died due to an uncaught runtime_error: " << e.what();
@@ -452,107 +482,6 @@ void threadWriteToTunDevice( Channel<PlainPacket*>* plain_out_queue, Channel<Pla
}
-void receiver(TunDevice* dev, PacketSource* src)
-{
- if(!dev || !src) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died because either dev or src pointer is null";
- return;
- }
-
- try {
- std::auto_ptr<Cipher> c(CipherFactory::create(gOpt.getCipher(), KD_INBOUND));
- std::auto_ptr<AuthAlgo> a(AuthAlgoFactory::create(gOpt.getAuthAlgo(), KD_INBOUND));
-
- uint32_t auth_tag_length = gOpt.getAuthTagLength();
- EncryptedPacket encrypted_packet(MAX_PACKET_LENGTH, auth_tag_length);
- PlainPacket plain_packet(MAX_PACKET_LENGTH);
-
- for(;;) {
- PacketSourceEndpoint remote_end;
-
- plain_packet.setLength(MAX_PACKET_LENGTH);
- encrypted_packet.withAuthTag(false);
- encrypted_packet.setLength(MAX_PACKET_LENGTH);
-
- // read packet from socket
- int len;
- try {
- len = src->recv(encrypted_packet.getBuf(), encrypted_packet.getLength(), remote_end);
- } catch(std::exception& /*e*/) {
- //TODO: do something here
- //cLog.msg(Log::PRIO_ERROR) << "could not recive packet "<< e.what();
- continue;
- }
- if(len < 0) {
- continue; // silently ignore socket recv errors, this is probably no good idea...
- }
-
- if(static_cast<uint32_t>(len) < (EncryptedPacket::getHeaderLength() + auth_tag_length)) {
- continue; // ignore short packets
- }
- encrypted_packet.setLength(len);
-
- mux_t mux = encrypted_packet.getMux();
- // autodetect peer
- if(gConnectionList.empty() && gOpt.getRemoteAddr() == "") {
- cLog.msg(Log::PRIO_NOTICE) << "autodetected remote host " << remote_end;
- createConnection(remote_end, gOpt.getSeqWindowSize(),mux);
- }
-
- ConnectionMap::iterator cit = gConnectionList.getConnection(mux);
- if(cit == gConnectionList.getEnd()) {
- continue;
- }
- ConnectionParam& conn = cit->second;
-
- // check whether auth tag is ok or not
- if(!a->checkTag(conn.kd_, encrypted_packet)) {
- cLog.msg(Log::PRIO_NOTICE) << "wrong Authentication Tag!";
- continue;
- }
-
- // Replay Protection
- if(conn.seq_window_.checkAndAdd(encrypted_packet.getSenderId(), encrypted_packet.getSeqNr())) {
- cLog.msg(Log::PRIO_NOTICE) << "Replay attack from " << conn.remote_end_
- << " seq:"<< encrypted_packet.getSeqNr() << " sid: "<< encrypted_packet.getSenderId();
- continue;
- }
-
- //Allow dynamic IP changes
- //TODO: add command line option to turn this off
- if(remote_end != conn.remote_end_) {
- cLog.msg(Log::PRIO_NOTICE) << "connection "<< mux << " autodetected remote host ip changed " << remote_end;
- conn.remote_end_=remote_end;
-#ifndef ANYTUN_NOSYNC
- SyncCommand sc(gConnectionList,mux);
- gSyncQueue.push(sc);
-#endif
- }
- // ignore zero length packets
- if(encrypted_packet.getPayloadLength() <= PlainPacket::getHeaderLength()) {
- continue;
- }
-
- // decrypt packet
- c->decrypt(conn.kd_, encrypted_packet, plain_packet);
-
- // check payload_type
- if((dev->getType() == TYPE_TUN && plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN4 &&
- plain_packet.getPayloadType() != PAYLOAD_TYPE_TUN6) ||
- (dev->getType() == TYPE_TAP && plain_packet.getPayloadType() != PAYLOAD_TYPE_TAP)) {
- continue;
- }
-
- // write it on the device
- dev->write(plain_packet.getPayload(), plain_packet.getLength());
- }
- } catch(std::runtime_error& e) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught runtime_error: " << e.what();
- } catch(std::exception& e) {
- cLog.msg(Log::PRIO_ERROR) << "receiver thread died due to an uncaught exception: " << e.what();
- }
-}
-
void startSendRecvThreads(TunDevice* dev, PacketSource* src)
{
unsigned int nthreads = boost::thread::hardware_concurrency();
@@ -560,7 +489,7 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
nthreads = 4;
src->waitUntilReady();
-
+#ifdef ANYTUN_MULTITHREAD
uint16_t plain_in_queue_length = 2 * nthreads + 4;
Channel<PlainPacket*>* plain_in_queue = new Channel<PlainPacket*>(plain_in_queue_length);
@@ -600,6 +529,10 @@ void startSendRecvThreads(TunDevice* dev, PacketSource* src)
boost::thread(boost::bind(threadReadEncryptedInQueueAndWritePlainOutQueue, encrypted_in_queue, encrypted_memory_pool, plain_out_queue, plain_memory_pool));
// }
boost::thread(boost::bind(threadWriteToTunDevice, plain_out_queue, plain_memory_pool, dev));
+#else
+ boost::thread(boost::bind(sender, dev, src));
+ boost::thread(boost::bind(receiver, dev, src));
+#endif
}
diff --git a/src/channel.hpp b/src/channel.hpp
index c30dd61..c2a1c66 100644
--- a/src/channel.hpp
+++ b/src/channel.hpp
@@ -62,9 +62,6 @@ class Channel
#endif
{
private:
-#ifdef BOOST_HAS_TRIVIAL_ASSIGN
- BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
-#endif
boost::mutex mtx_;
boost::circular_buffer<T> cb_;
Semaphore sem_read_, sem_write_;
diff --git a/src/configure b/src/configure
index f070150..ff6ba01 100755
--- a/src/configure
+++ b/src/configure
@@ -64,6 +64,7 @@ MANDIR=''
INSTALLMANPAGE=1
EXAMPLESDIR=''
INSTALLEXAMPLES=1
+MULTITHREAD=0
BOOST_PREFIX=''
GCRYPT_PREFIX=''
@@ -89,6 +90,8 @@ print_usage() {
echo " --enable-passphrase enable master key and salt passphrase"
echo " --disable-routing disable built-in routing capability"
echo " --enable-routing enable built-in routing capability"
+ echo " --enable-new-multithread enable new experimental multithreading"
+ echo " --disable-new-multithread disable new experimental multithreading"
echo " --cross-prefix=<PREFIX> add PREFIX to compiler calls"
echo " --with-boost=<PREFIX> don't use systemwide boost"
echo " --with-gcrypt=<PREFIX> don't use systemwide gcrypt"
@@ -154,6 +157,12 @@ do
--disable-routing)
ROUTING=0
;;
+ --enable-new-multithread)
+ MULTITHREAD=1
+ ;;
+ --disable-new-multithread)
+ MULTITHREAD=0
+ ;;
--ebuild-compat)
EBUILD_COMPAT=1
;;
@@ -300,6 +309,11 @@ if [ $ROUTING -eq 0 ]; then
echo "disabling built-in routing capability"
fi
+if [ $MULTITHREAD -eq 1 ]; then
+ CXXFLAGS=$CXXFLAGS' -DANYTUN_MULTITHREAD'
+ echo "enabling new experimental mutlithread"
+fi
+
if [ -z "$BINDIR" ]; then
BINDIR=$PREFIX/bin
fi
diff --git a/src/queuePusher.hpp b/src/queuePusher.hpp
new file mode 100644
index 0000000..7e960db
--- /dev/null
+++ b/src/queuePusher.hpp
@@ -0,0 +1,80 @@
+/*
+ * anytun
+ *
+ * The secure anycast tunneling protocol (satp) defines a protocol used
+ * for communication between any combination of unicast and anycast
+ * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
+ * mode and allows tunneling of every ETHER TYPE protocol (e.g.
+ * ethernet, ip, arp ...). satp directly includes cryptography and
+ * message authentication based on the methods used by SRTP. It is
+ * intended to deliver a generic, scaleable and secure solution for
+ * tunneling and relaying of packets of any protocol.
+ *
+ *
+ * Copyright (C) 2007-2014 Markus Grüneis, Othmar Gsenger, Erwin Nindl,
+ * Christian Pointner <satp@wirdorange.org>
+ *
+ * This file is part of Anytun.
+ *
+ * Anytun is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * Anytun is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Anytun. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link the code of portions of this program with the
+ * OpenSSL library under certain conditions as described in each
+ * individual source file, and distribute linked combinations
+ * including the two.
+ * You must obey the GNU General Public License in all respects
+ * for all of the code used other than OpenSSL. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you
+ * do not wish to do so, delete this exception statement from your
+ * version. If you delete this exception statement from all source
+ * files in the program, then also delete it here.
+ */
+
+#ifndef ANYTUN_queue_pusher_hpp_INCLUDED
+#define ANYTUN_queue_pusher_hpp_INCLUDED
+
+#ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
+#include <boost/noncopyable.hpp>
+#endif
+
+template<typename T>
+class QueuePusher
+#ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
+ : boost::noncopyable
+#endif
+{
+private:
+ T * object_;
+ Channel<T *> * queue_;
+
+public:
+#ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS
+ QueuePusher(QueuePusher const &) = delete;
+ QueuePusher(QueuePusher &&) = delete;
+ QueuePusher& operator=(const QueuePusher &) = delete;
+#endif
+ void changeQueue( Channel<T *> * queue ) {
+ queue_=queue;
+ }
+ QueuePusher(T * object,Channel<T *> * queue)
+ :object_(object),queue_(queue) {};
+ ~QueuePusher() {
+ if(queue_ && object_)
+ queue_->push(object_);
+ }
+};
+
+#endif