summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@anytun.org>2009-03-22 23:57:35 +0000
committerChristian Pointner <equinox@anytun.org>2009-03-22 23:57:35 +0000
commit623e15d35dffbda9f30d0d4aa7e42439723df31e (patch)
treee3487d30ec5c67a1d6cdf669fa51af5a903e127f
parentadded gResolver to anytun-config (diff)
added gResolver to SyncServer
some cleanup
-rw-r--r--src/anytun-controld.cpp76
-rw-r--r--src/anytun.cpp86
-rw-r--r--src/packetSource.cpp5
-rw-r--r--src/packetSource.h1
-rw-r--r--src/syncServer.cpp55
-rw-r--r--src/syncServer.h22
6 files changed, 101 insertions, 144 deletions
diff --git a/src/anytun-controld.cpp b/src/anytun-controld.cpp
index 30948c0..f32ec83 100644
--- a/src/anytun-controld.cpp
+++ b/src/anytun-controld.cpp
@@ -42,23 +42,22 @@
#include "log.h"
#include "signalController.h"
#include "options.h"
+#include "resolver.h"
#include "syncServer.h"
#include "daemon.hpp"
void syncOnConnect(SyncTcpConnection * connptr)
{
- std::ifstream file( gOpt.getFileName().c_str() );
- if( file.is_open() )
- {
- std::string line;
- while (! file.eof() )
- {
- getline (file,line);
- connptr->Send(line);
- }
- file.close();
- }
+ std::ifstream file(gOpt.getFileName().c_str());
+ if(file.is_open()) {
+ std::string line;
+ while (!file.eof()) {
+ getline (file,line);
+ connptr->Send(line);
+ }
+ file.close();
+ }
}
void syncListener()
@@ -66,30 +65,15 @@ void syncListener()
boost::asio::io_service io_service;
try
{
- SyncTcpConnection::proto::resolver resolver(io_service);
- SyncTcpConnection::proto::endpoint e;
- if(gOpt.getBindToAddr()!="")
- {
- SyncTcpConnection::proto::resolver::query query(gOpt.getBindToAddr(), gOpt.getBindToPort());
- e = *resolver.resolve(query);
- } else {
- SyncTcpConnection::proto::resolver::query query(gOpt.getBindToPort());
- e = *resolver.resolve(query);
- }
-
-
- SyncServer server(io_service,e);
- server.onConnect=boost::bind(syncOnConnect,_1);
- io_service.run();
+ SyncServer server(gOpt.getBindToAddr(), gOpt.getBindToPort(), boost::bind(syncOnConnect, _1));
+ server.run();
}
- catch (std::exception& e)
- {
- std::string addr = gOpt.getBindToAddr() == "" ? "*" : gOpt.getBindToAddr();
- cLog.msg(Log::PRIO_ERROR) << "cannot bind to " << addr << ":" << gOpt.getBindToPort()
- << " (" << e.what() << ") exiting.." << std::endl;
- //return false;
+ catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught runtime_error: " << e.what();
+ }
+ catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught exception: " << e.what();
}
- //return true;
}
int main(int argc, char* argv[])
@@ -134,33 +118,19 @@ int main(int argc, char* argv[])
}
PrivInfo privs(gOpt.getUsername(), gOpt.getGroupname());
-
- std::ofstream pidFile;
- if(gOpt.getPidFile() != "") {
- pidFile.open(gOpt.getPidFile().c_str());
- if(!pidFile.is_open()) {
- std::cout << "can't open pid file" << std::endl;
- }
- }
-
- if(gOpt.getChrootDir() != "")
- do_chroot(gOpt.getChrootDir());
-
- privs.drop();
-
if(gOpt.getDaemonize()) {
daemonize();
daemonized = true;
}
- if(pidFile.is_open()) {
- pid_t pid = getpid();
- pidFile << pid;
- pidFile.close();
- }
-
gSignalController.init();
+ gResolver.init();
+ if(gOpt.getChrootDir() != "")
+ do_chroot(gOpt.getChrootDir());
+
+ privs.drop();
+
boost::thread * syncListenerThread;
syncListenerThread = new boost::thread(boost::bind(syncListener));
diff --git a/src/anytun.cpp b/src/anytun.cpp
index 5715d5a..28458e0 100644
--- a/src/anytun.cpp
+++ b/src/anytun.cpp
@@ -80,7 +80,7 @@
bool disableRouting = false;
-void createConnection(const PacketSourceEndpoint & remote_end, window_size_t seqSize, mux_t mux)
+void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux)
{
SeqWindow* seq = new SeqWindow(seqSize);
seq_nr_t seq_nr_=0;
@@ -105,39 +105,24 @@ void createConnectionError(const std::exception& e)
#ifndef ANYTUN_NOSYNC
void syncConnector(const OptionHost& connto)
{
- SyncClient sc(connto.addr, connto.port);
- sc.run();
+ SyncClient sc(connto.addr, connto.port);
+ sc.run();
}
void syncListener()
{
try
{
- boost::asio::io_service io_service;
- SyncTcpConnection::proto::resolver resolver(io_service);
- SyncTcpConnection::proto::endpoint e;
- if(gOpt.getLocalSyncAddr()!="")
- {
- SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
- e = *resolver.resolve(query);
- } else {
- SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
- e = *resolver.resolve(query);
- }
-
-
- SyncServer server(io_service,e);
- server.onConnect=boost::bind(syncOnConnect,_1);
- gSyncQueue.setSyncServerPtr(&server);
- io_service.run();
+ SyncServer server(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort(), boost::bind(syncOnConnect, _1));
+ gSyncQueue.setSyncServerPtr(&server);
+ server.run();
}
- catch (std::exception& e)
- {
- std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
- cLog.msg(Log::PRIO_ERROR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
- << " (" << e.what() << ")" << std::endl;
+ catch(std::runtime_error& e) {
+ cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught runtime_error: " << e.what();
+ }
+ catch(std::exception& e) {
+ cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught exception: " << e.what();
}
-
}
#endif
@@ -350,6 +335,8 @@ void startSendRecvThreads(PrivInfo& privs, TunDevice* dev, PacketSource* src)
boost::thread(boost::bind(receiver, dev, src));
}
+
+
#ifdef WIN_SERVICE
int main(int argc, char* argv[])
{
@@ -461,11 +448,7 @@ int main(int argc, char* argv[])
}
#endif
- PacketSource* src;
- if(gOpt.getLocalAddr() == "")
- src = new UDPPacketSource(gOpt.getLocalPort());
- else
- src = new UDPPacketSource(gOpt.getLocalAddr(), gOpt.getLocalPort());
+ PacketSource* src = new UDPPacketSource(gOpt.getLocalAddr(), gOpt.getLocalPort());
if(gOpt.getRemoteAddr() != "")
gResolver.resolveUdp(gOpt.getRemoteAddr(), gOpt.getRemotePort(), boost::bind(createConnection, _1, gOpt.getSeqWindowSize(), gOpt.getMux()), boost::bind(createConnectionError, _1), gOpt.getResolvAddrType());
@@ -486,14 +469,13 @@ int main(int argc, char* argv[])
#endif
#ifndef ANYTUN_NOSYNC
- boost::thread * syncListenerThread;
+ boost::thread* syncListenerThread = NULL;
if(gOpt.getLocalSyncPort() != "")
syncListenerThread = new boost::thread(boost::bind(syncListener));
- std::list<boost::thread *> connectThreads;
- for(HostList::const_iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
- connectThreads.push_back(new boost::thread(boost::bind(syncConnector, *it)));
- }
+ boost::thread_group connectThreads;
+ for(HostList::const_iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
+ connectThreads.create_thread(boost::bind(syncConnector, *it));
#endif
// wait for packet source to finish in a seperate thread in order
@@ -507,31 +489,13 @@ int main(int argc, char* argv[])
int ret = gSignalController.run();
#endif
- // TODO cleanup threads here!
- /*
- pthread_cancel(senderThread);
- pthread_cancel(receiverThread);
-#ifndef ANYTUN_NOSYNC
- if ( gOpt.getLocalSyncPort())
- pthread_cancel(syncListenerThread);
- for( std::list<pthread_t>::iterator it = connectThreads.begin() ;it != connectThreads.end(); ++it)
- pthread_cancel(*it);
-#endif
-
- pthread_join(senderThread, NULL);
- pthread_join(receiverThread, NULL);
-#ifndef ANYTUN_NOSYNC
- if ( gOpt.getLocalSyncPort())
- pthread_join(syncListenerThread, NULL);
-
- for( std::list<pthread_t>::iterator it = connectThreads.begin() ;it != connectThreads.end(); ++it)
- pthread_join(*it, NULL);
-#endif
- if(src)
- delete src;
- if(connTo)
- delete connTo;
- */
+// TODO: stop all threads and cleanup
+//
+// if(src)
+// delete src;
+// if(connTo)
+// delete connTo;
+
#if defined(WIN_SERVICE)
gWinService.stop();
#endif
diff --git a/src/packetSource.cpp b/src/packetSource.cpp
index e291175..9266b57 100644
--- a/src/packetSource.cpp
+++ b/src/packetSource.cpp
@@ -44,11 +44,6 @@ void PacketSource::waitUntilReady()
ready_sem_.down();
}
-UDPPacketSource::UDPPacketSource(std::string port) : sock_(io_service_)
-{
- gResolver.resolveUdp("", port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
-}
-
UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) : sock_(io_service_)
{
gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType());
diff --git a/src/packetSource.h b/src/packetSource.h
index 6909125..1957eb8 100644
--- a/src/packetSource.h
+++ b/src/packetSource.h
@@ -59,7 +59,6 @@ class UDPPacketSource : public PacketSource
public:
typedef boost::asio::ip::udp proto;
- UDPPacketSource(std::string port);
UDPPacketSource(std::string localaddr, std::string port);
u_int32_t recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote);
diff --git a/src/syncServer.cpp b/src/syncServer.cpp
index c53f809..bd5120c 100644
--- a/src/syncServer.cpp
+++ b/src/syncServer.cpp
@@ -30,41 +30,60 @@
*/
#include "syncServer.h"
+#include "resolver.h"
+#include "log.h"
//using asio::ip::tcp;
-SyncServer::SyncServer(boost::asio::io_service& io_service, SyncTcpConnection::proto::endpoint tcp_endpoint )
- : acceptor_(io_service, tcp_endpoint)
+SyncServer::SyncServer(std::string localaddr, std::string port, ConnectCallback onConnect)
+ : acceptor_(io_service_), onConnect_(onConnect)
{
+ gResolver.resolveTcp(localaddr, port, boost::bind(&SyncServer::onResolve, this, _1), boost::bind(&SyncServer::onResolvError, this, _1));
+}
+
+void SyncServer::onResolve(const SyncTcpConnection::proto::endpoint& e)
+{
+ acceptor_.open(e.protocol());
+ acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
+ acceptor_.bind(e);
+ acceptor_.listen();
start_accept();
+ ready_sem_.up();
+ cLog.msg(Log::PRIO_NOTICE) << "sync server listening on " << e;
}
-void SyncServer::start_accept()
+void SyncServer::onResolvError(const std::runtime_error& e)
{
- Lock lock(mutex_);
- SyncTcpConnection::pointer new_connection =
- SyncTcpConnection::create(acceptor_.io_service());
- conns_.push_back(new_connection);
-
- acceptor_.async_accept(new_connection->socket(),
- boost::bind(&SyncServer::handle_accept, this, new_connection,
- boost::asio::placeholders::error));
+ cLog.msg(Log::PRIO_ERROR) << "sync server bind/listen failed: " << e.what();
+ // TODO: stop daemon??
+}
+
+void SyncServer::run()
+{
+ ready_sem_.down();
+ io_service_.run();
}
void SyncServer::send(std::string message)
{
Lock lock(mutex_);
- for(std::list<SyncTcpConnection::pointer>::iterator it = conns_.begin() ;it != conns_.end(); ++it) {
+ for(std::list<SyncTcpConnection::pointer>::iterator it = conns_.begin() ;it != conns_.end(); ++it)
(*it)->Send(message);
- }
}
-void SyncServer::handle_accept(SyncTcpConnection::pointer new_connection,
- const boost::system::error_code& error)
+void SyncServer::start_accept()
+{
+ Lock lock(mutex_);
+ SyncTcpConnection::pointer new_connection = SyncTcpConnection::create(acceptor_.io_service());
+ conns_.push_back(new_connection);
+ acceptor_.async_accept(new_connection->socket(),
+ boost::bind(&SyncServer::handle_accept, this, new_connection, boost::asio::placeholders::error));
+}
+
+void SyncServer::handle_accept(SyncTcpConnection::pointer new_connection, const boost::system::error_code& error)
{
- if (!error)
- {
- new_connection->onConnect=onConnect;
+ if (!error) {
+ new_connection->onConnect = onConnect_;
new_connection->start();
start_accept();
}
diff --git a/src/syncServer.h b/src/syncServer.h
index fa5fe1f..138f46e 100644
--- a/src/syncServer.h
+++ b/src/syncServer.h
@@ -44,18 +44,28 @@
#include <list>
#include "syncTcpConnection.h"
+typedef boost::function<void (SyncTcpConnection *)> ConnectCallback;
+
class SyncServer
{
public:
- SyncServer(boost::asio::io_service& io_service, SyncTcpConnection::proto::endpoint tcp_endpoint );
- boost::function<void(SyncTcpConnection *)> onConnect;
+ SyncServer(std::string localaddr, std::string port, ConnectCallback onConnect);
+ void onResolve(const SyncTcpConnection::proto::endpoint& e);
+ void onResolvError(const std::runtime_error& e);
+
+ void run();
+ void send(std::string message);
+
std::list<SyncTcpConnection::pointer> conns_;
- void send(std::string message);
+
private:
void start_accept();
- void handle_accept(SyncTcpConnection::pointer new_connection,
- const boost::system::error_code& error);
- Mutex mutex_; //Mutex for list conns_
+ void handle_accept(SyncTcpConnection::pointer new_connection, const boost::system::error_code& error);
+
+ Mutex mutex_; //Mutex for list conns_
+ boost::asio::io_service io_service_;
SyncTcpConnection::proto::acceptor acceptor_;
+ ConnectCallback onConnect_;
+ Semaphore ready_sem_;
};
#endif