From 623e15d35dffbda9f30d0d4aa7e42439723df31e Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 22 Mar 2009 23:57:35 +0000 Subject: added gResolver to SyncServer some cleanup --- src/anytun-controld.cpp | 76 +++++++++++++------------------------------ src/anytun.cpp | 86 ++++++++++++++----------------------------------- src/packetSource.cpp | 5 --- src/packetSource.h | 1 - src/syncServer.cpp | 55 ++++++++++++++++++++----------- src/syncServer.h | 22 +++++++++---- 6 files changed, 101 insertions(+), 144 deletions(-) diff --git a/src/anytun-controld.cpp b/src/anytun-controld.cpp index 30948c0..f32ec83 100644 --- a/src/anytun-controld.cpp +++ b/src/anytun-controld.cpp @@ -42,23 +42,22 @@ #include "log.h" #include "signalController.h" #include "options.h" +#include "resolver.h" #include "syncServer.h" #include "daemon.hpp" void syncOnConnect(SyncTcpConnection * connptr) { - std::ifstream file( gOpt.getFileName().c_str() ); - if( file.is_open() ) - { - std::string line; - while (! file.eof() ) - { - getline (file,line); - connptr->Send(line); - } - file.close(); - } + std::ifstream file(gOpt.getFileName().c_str()); + if(file.is_open()) { + std::string line; + while (!file.eof()) { + getline (file,line); + connptr->Send(line); + } + file.close(); + } } void syncListener() @@ -66,30 +65,15 @@ void syncListener() boost::asio::io_service io_service; try { - SyncTcpConnection::proto::resolver resolver(io_service); - SyncTcpConnection::proto::endpoint e; - if(gOpt.getBindToAddr()!="") - { - SyncTcpConnection::proto::resolver::query query(gOpt.getBindToAddr(), gOpt.getBindToPort()); - e = *resolver.resolve(query); - } else { - SyncTcpConnection::proto::resolver::query query(gOpt.getBindToPort()); - e = *resolver.resolve(query); - } - - - SyncServer server(io_service,e); - server.onConnect=boost::bind(syncOnConnect,_1); - io_service.run(); + SyncServer server(gOpt.getBindToAddr(), gOpt.getBindToPort(), boost::bind(syncOnConnect, _1)); + server.run(); } - catch (std::exception& e) - { - std::string addr = gOpt.getBindToAddr() == "" ? "*" : gOpt.getBindToAddr(); - cLog.msg(Log::PRIO_ERROR) << "cannot bind to " << addr << ":" << gOpt.getBindToPort() - << " (" << e.what() << ") exiting.." << std::endl; - //return false; + catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught runtime_error: " << e.what(); + } + catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught exception: " << e.what(); } - //return true; } int main(int argc, char* argv[]) @@ -134,33 +118,19 @@ int main(int argc, char* argv[]) } PrivInfo privs(gOpt.getUsername(), gOpt.getGroupname()); - - std::ofstream pidFile; - if(gOpt.getPidFile() != "") { - pidFile.open(gOpt.getPidFile().c_str()); - if(!pidFile.is_open()) { - std::cout << "can't open pid file" << std::endl; - } - } - - if(gOpt.getChrootDir() != "") - do_chroot(gOpt.getChrootDir()); - - privs.drop(); - if(gOpt.getDaemonize()) { daemonize(); daemonized = true; } - if(pidFile.is_open()) { - pid_t pid = getpid(); - pidFile << pid; - pidFile.close(); - } - gSignalController.init(); + gResolver.init(); + if(gOpt.getChrootDir() != "") + do_chroot(gOpt.getChrootDir()); + + privs.drop(); + boost::thread * syncListenerThread; syncListenerThread = new boost::thread(boost::bind(syncListener)); diff --git a/src/anytun.cpp b/src/anytun.cpp index 5715d5a..28458e0 100644 --- a/src/anytun.cpp +++ b/src/anytun.cpp @@ -80,7 +80,7 @@ bool disableRouting = false; -void createConnection(const PacketSourceEndpoint & remote_end, window_size_t seqSize, mux_t mux) +void createConnection(const PacketSourceEndpoint& remote_end, window_size_t seqSize, mux_t mux) { SeqWindow* seq = new SeqWindow(seqSize); seq_nr_t seq_nr_=0; @@ -105,39 +105,24 @@ void createConnectionError(const std::exception& e) #ifndef ANYTUN_NOSYNC void syncConnector(const OptionHost& connto) { - SyncClient sc(connto.addr, connto.port); - sc.run(); + SyncClient sc(connto.addr, connto.port); + sc.run(); } void syncListener() { try { - boost::asio::io_service io_service; - SyncTcpConnection::proto::resolver resolver(io_service); - SyncTcpConnection::proto::endpoint e; - if(gOpt.getLocalSyncAddr()!="") - { - SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort()); - e = *resolver.resolve(query); - } else { - SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort()); - e = *resolver.resolve(query); - } - - - SyncServer server(io_service,e); - server.onConnect=boost::bind(syncOnConnect,_1); - gSyncQueue.setSyncServerPtr(&server); - io_service.run(); + SyncServer server(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort(), boost::bind(syncOnConnect, _1)); + gSyncQueue.setSyncServerPtr(&server); + server.run(); } - catch (std::exception& e) - { - std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr(); - cLog.msg(Log::PRIO_ERROR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort() - << " (" << e.what() << ")" << std::endl; + catch(std::runtime_error& e) { + cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught runtime_error: " << e.what(); + } + catch(std::exception& e) { + cLog.msg(Log::PRIO_ERROR) << "sync listener thread died due to an uncaught exception: " << e.what(); } - } #endif @@ -350,6 +335,8 @@ void startSendRecvThreads(PrivInfo& privs, TunDevice* dev, PacketSource* src) boost::thread(boost::bind(receiver, dev, src)); } + + #ifdef WIN_SERVICE int main(int argc, char* argv[]) { @@ -461,11 +448,7 @@ int main(int argc, char* argv[]) } #endif - PacketSource* src; - if(gOpt.getLocalAddr() == "") - src = new UDPPacketSource(gOpt.getLocalPort()); - else - src = new UDPPacketSource(gOpt.getLocalAddr(), gOpt.getLocalPort()); + PacketSource* src = new UDPPacketSource(gOpt.getLocalAddr(), gOpt.getLocalPort()); if(gOpt.getRemoteAddr() != "") gResolver.resolveUdp(gOpt.getRemoteAddr(), gOpt.getRemotePort(), boost::bind(createConnection, _1, gOpt.getSeqWindowSize(), gOpt.getMux()), boost::bind(createConnectionError, _1), gOpt.getResolvAddrType()); @@ -486,14 +469,13 @@ int main(int argc, char* argv[]) #endif #ifndef ANYTUN_NOSYNC - boost::thread * syncListenerThread; + boost::thread* syncListenerThread = NULL; if(gOpt.getLocalSyncPort() != "") syncListenerThread = new boost::thread(boost::bind(syncListener)); - std::list connectThreads; - for(HostList::const_iterator it = connect_to.begin() ;it != connect_to.end(); ++it) { - connectThreads.push_back(new boost::thread(boost::bind(syncConnector, *it))); - } + boost::thread_group connectThreads; + for(HostList::const_iterator it = connect_to.begin() ;it != connect_to.end(); ++it) + connectThreads.create_thread(boost::bind(syncConnector, *it)); #endif // wait for packet source to finish in a seperate thread in order @@ -507,31 +489,13 @@ int main(int argc, char* argv[]) int ret = gSignalController.run(); #endif - // TODO cleanup threads here! - /* - pthread_cancel(senderThread); - pthread_cancel(receiverThread); -#ifndef ANYTUN_NOSYNC - if ( gOpt.getLocalSyncPort()) - pthread_cancel(syncListenerThread); - for( std::list::iterator it = connectThreads.begin() ;it != connectThreads.end(); ++it) - pthread_cancel(*it); -#endif - - pthread_join(senderThread, NULL); - pthread_join(receiverThread, NULL); -#ifndef ANYTUN_NOSYNC - if ( gOpt.getLocalSyncPort()) - pthread_join(syncListenerThread, NULL); - - for( std::list::iterator it = connectThreads.begin() ;it != connectThreads.end(); ++it) - pthread_join(*it, NULL); -#endif - if(src) - delete src; - if(connTo) - delete connTo; - */ +// TODO: stop all threads and cleanup +// +// if(src) +// delete src; +// if(connTo) +// delete connTo; + #if defined(WIN_SERVICE) gWinService.stop(); #endif diff --git a/src/packetSource.cpp b/src/packetSource.cpp index e291175..9266b57 100644 --- a/src/packetSource.cpp +++ b/src/packetSource.cpp @@ -44,11 +44,6 @@ void PacketSource::waitUntilReady() ready_sem_.down(); } -UDPPacketSource::UDPPacketSource(std::string port) : sock_(io_service_) -{ - gResolver.resolveUdp("", port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); -} - UDPPacketSource::UDPPacketSource(std::string localaddr, std::string port) : sock_(io_service_) { gResolver.resolveUdp(localaddr, port, boost::bind(&UDPPacketSource::onResolve, this, _1), boost::bind(&UDPPacketSource::onError, this, _1), gOpt.getResolvAddrType()); diff --git a/src/packetSource.h b/src/packetSource.h index 6909125..1957eb8 100644 --- a/src/packetSource.h +++ b/src/packetSource.h @@ -59,7 +59,6 @@ class UDPPacketSource : public PacketSource public: typedef boost::asio::ip::udp proto; - UDPPacketSource(std::string port); UDPPacketSource(std::string localaddr, std::string port); u_int32_t recv(u_int8_t* buf, u_int32_t len, PacketSourceEndpoint& remote); diff --git a/src/syncServer.cpp b/src/syncServer.cpp index c53f809..bd5120c 100644 --- a/src/syncServer.cpp +++ b/src/syncServer.cpp @@ -30,41 +30,60 @@ */ #include "syncServer.h" +#include "resolver.h" +#include "log.h" //using asio::ip::tcp; -SyncServer::SyncServer(boost::asio::io_service& io_service, SyncTcpConnection::proto::endpoint tcp_endpoint ) - : acceptor_(io_service, tcp_endpoint) +SyncServer::SyncServer(std::string localaddr, std::string port, ConnectCallback onConnect) + : acceptor_(io_service_), onConnect_(onConnect) { + gResolver.resolveTcp(localaddr, port, boost::bind(&SyncServer::onResolve, this, _1), boost::bind(&SyncServer::onResolvError, this, _1)); +} + +void SyncServer::onResolve(const SyncTcpConnection::proto::endpoint& e) +{ + acceptor_.open(e.protocol()); + acceptor_.set_option(boost::asio::socket_base::reuse_address(true)); + acceptor_.bind(e); + acceptor_.listen(); start_accept(); + ready_sem_.up(); + cLog.msg(Log::PRIO_NOTICE) << "sync server listening on " << e; } -void SyncServer::start_accept() +void SyncServer::onResolvError(const std::runtime_error& e) { - Lock lock(mutex_); - SyncTcpConnection::pointer new_connection = - SyncTcpConnection::create(acceptor_.io_service()); - conns_.push_back(new_connection); - - acceptor_.async_accept(new_connection->socket(), - boost::bind(&SyncServer::handle_accept, this, new_connection, - boost::asio::placeholders::error)); + cLog.msg(Log::PRIO_ERROR) << "sync server bind/listen failed: " << e.what(); + // TODO: stop daemon?? +} + +void SyncServer::run() +{ + ready_sem_.down(); + io_service_.run(); } void SyncServer::send(std::string message) { Lock lock(mutex_); - for(std::list::iterator it = conns_.begin() ;it != conns_.end(); ++it) { + for(std::list::iterator it = conns_.begin() ;it != conns_.end(); ++it) (*it)->Send(message); - } } -void SyncServer::handle_accept(SyncTcpConnection::pointer new_connection, - const boost::system::error_code& error) +void SyncServer::start_accept() +{ + Lock lock(mutex_); + SyncTcpConnection::pointer new_connection = SyncTcpConnection::create(acceptor_.io_service()); + conns_.push_back(new_connection); + acceptor_.async_accept(new_connection->socket(), + boost::bind(&SyncServer::handle_accept, this, new_connection, boost::asio::placeholders::error)); +} + +void SyncServer::handle_accept(SyncTcpConnection::pointer new_connection, const boost::system::error_code& error) { - if (!error) - { - new_connection->onConnect=onConnect; + if (!error) { + new_connection->onConnect = onConnect_; new_connection->start(); start_accept(); } diff --git a/src/syncServer.h b/src/syncServer.h index fa5fe1f..138f46e 100644 --- a/src/syncServer.h +++ b/src/syncServer.h @@ -44,18 +44,28 @@ #include #include "syncTcpConnection.h" +typedef boost::function ConnectCallback; + class SyncServer { public: - SyncServer(boost::asio::io_service& io_service, SyncTcpConnection::proto::endpoint tcp_endpoint ); - boost::function onConnect; + SyncServer(std::string localaddr, std::string port, ConnectCallback onConnect); + void onResolve(const SyncTcpConnection::proto::endpoint& e); + void onResolvError(const std::runtime_error& e); + + void run(); + void send(std::string message); + std::list conns_; - void send(std::string message); + private: void start_accept(); - void handle_accept(SyncTcpConnection::pointer new_connection, - const boost::system::error_code& error); - Mutex mutex_; //Mutex for list conns_ + void handle_accept(SyncTcpConnection::pointer new_connection, const boost::system::error_code& error); + + Mutex mutex_; //Mutex for list conns_ + boost::asio::io_service io_service_; SyncTcpConnection::proto::acceptor acceptor_; + ConnectCallback onConnect_; + Semaphore ready_sem_; }; #endif -- cgit v1.2.3