From fa4f4a8a50c4bab3a3e247fb7186a7f9a00dfc11 Mon Sep 17 00:00:00 2001 From: Othmar Gsenger Date: Mon, 3 Dec 2007 10:51:16 +0000 Subject: Added syncsocket --- Sockets/SocketHandler.cpp | 1422 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1422 insertions(+) create mode 100644 Sockets/SocketHandler.cpp (limited to 'Sockets/SocketHandler.cpp') diff --git a/Sockets/SocketHandler.cpp b/Sockets/SocketHandler.cpp new file mode 100644 index 0000000..c563aac --- /dev/null +++ b/Sockets/SocketHandler.cpp @@ -0,0 +1,1422 @@ +/** \file SocketHandler.cpp + ** \date 2004-02-13 + ** \author grymse@alhem.net +**/ +/* +Copyright (C) 2004-2007 Anders Hedstrom + +This library is made available under the terms of the GNU GPL. + +If you would like to use this library in a closed-source application, +a separate license agreement is available. For information about +the closed-source license agreement for the C++ sockets library, +please visit http://www.alhem.net/Sockets/license.html and/or +email license@alhem.net. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ +#ifdef _WIN32 +#ifdef _MSC_VER +#pragma warning(disable:4786) +#endif +#endif +#include +#include + +#include "SocketHandler.h" +#include "UdpSocket.h" +#include "ResolvSocket.h" +#include "ResolvServer.h" +#include "TcpSocket.h" +#include "Mutex.h" +#include "Utility.h" +#include "SocketAddress.h" + +#ifdef SOCKETS_NAMESPACE +namespace SOCKETS_NAMESPACE { +#endif + + +#ifdef _DEBUG +#define DEB(x) x; fflush(stderr); +#else +#define DEB(x) +#endif + + +SocketHandler::SocketHandler(StdLog *p) +:m_stdlog(p) +,m_mutex(m_mutex) +,m_b_use_mutex(false) +,m_maxsock(0) +,m_preverror(-1) +,m_errcnt(0) +,m_tlast(0) +#ifdef ENABLE_SOCKS4 +,m_socks4_host(0) +,m_socks4_port(0) +,m_bTryDirect(false) +#endif +#ifdef ENABLE_RESOLVER +,m_resolv_id(0) +,m_resolver(NULL) +#endif +#ifdef ENABLE_POOL +,m_b_enable_pool(false) +#endif +#ifdef ENABLE_TRIGGERS +,m_next_trigger_id(0) +#endif +#ifdef ENABLE_DETACH +,m_slave(false) +#endif +{ + FD_ZERO(&m_rfds); + FD_ZERO(&m_wfds); + FD_ZERO(&m_efds); +} + + +SocketHandler::SocketHandler(Mutex& mutex,StdLog *p) +:m_stdlog(p) +,m_mutex(mutex) +,m_b_use_mutex(true) +,m_maxsock(0) +,m_preverror(-1) +,m_errcnt(0) +,m_tlast(0) +#ifdef ENABLE_SOCKS4 +,m_socks4_host(0) +,m_socks4_port(0) +,m_bTryDirect(false) +#endif +#ifdef ENABLE_RESOLVER +,m_resolv_id(0) +,m_resolver(NULL) +#endif +#ifdef ENABLE_POOL +,m_b_enable_pool(false) +#endif +#ifdef ENABLE_TRIGGERS +,m_next_trigger_id(0) +#endif +#ifdef ENABLE_DETACH +,m_slave(false) +#endif +{ + m_mutex.Lock(); + FD_ZERO(&m_rfds); + FD_ZERO(&m_wfds); + FD_ZERO(&m_efds); +} + + +SocketHandler::~SocketHandler() +{ +#ifdef ENABLE_RESOLVER + if (m_resolver) + { + m_resolver -> Quit(); + } +#endif + { + while (m_sockets.size()) + { +DEB( fprintf(stderr, "Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());) + socket_m::iterator it = m_sockets.begin(); + Socket *p = it -> second; + if (p) + { +DEB( fprintf(stderr, " fd %d\n", p -> GetSocket());) + p -> Close(); +DEB( fprintf(stderr, " fd closed %d\n", p -> GetSocket());) +// p -> OnDelete(); // hey, I turn this back on. what's the worst that could happen??!! + // MinionSocket breaks, calling MinderHandler methods in OnDelete - + // MinderHandler is already gone when that happens... + + // only delete socket when controlled + // ie master sockethandler can delete non-detached sockets + // and a slave sockethandler can only delete a detach socket + if (p -> DeleteByHandler() +#ifdef ENABLE_DETACH + && !(m_slave ^ p -> IsDetached()) +#endif + ) + { + p -> SetErasedByHandler(); + delete p; + } + m_sockets.erase(it); + } + else + { + m_sockets.erase(it); + } +DEB( fprintf(stderr, "next\n");) + } +DEB( fprintf(stderr, "/Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());) + } +#ifdef ENABLE_RESOLVER + if (m_resolver) + { + delete m_resolver; + } +#endif + if (m_b_use_mutex) + { + m_mutex.Unlock(); + } +} + + +Mutex& SocketHandler::GetMutex() const +{ + return m_mutex; +} + + +#ifdef ENABLE_DETACH +void SocketHandler::SetSlave(bool x) +{ + m_slave = x; +} + + +bool SocketHandler::IsSlave() +{ + return m_slave; +} +#endif + + +void SocketHandler::RegStdLog(StdLog *log) +{ + m_stdlog = log; +} + + +void SocketHandler::LogError(Socket *p,const std::string& user_text,int err,const std::string& sys_err,loglevel_t t) +{ + if (m_stdlog) + { + m_stdlog -> error(this, p, user_text, err, sys_err, t); + } +} + + +void SocketHandler::Add(Socket *p) +{ + if (p -> GetSocket() == INVALID_SOCKET) + { + LogError(p, "Add", -1, "Invalid socket", LOG_LEVEL_WARNING); + if (p -> CloseAndDelete()) + { + m_delete.push_back(p); + } + return; + } + if (m_add.find(p -> GetSocket()) != m_add.end()) + { + LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in add queue", LOG_LEVEL_FATAL); + m_delete.push_back(p); + return; + } + m_add[p -> GetSocket()] = p; +} + + +void SocketHandler::Get(SOCKET s,bool& r,bool& w,bool& e) +{ + if (s >= 0) + { + r = FD_ISSET(s, &m_rfds) ? true : false; + w = FD_ISSET(s, &m_wfds) ? true : false; + e = FD_ISSET(s, &m_efds) ? true : false; + } +} + + +void SocketHandler::Set(SOCKET s,bool bRead,bool bWrite,bool bException) +{ +DEB( fprintf(stderr, "Set(%d, %s, %s, %s)\n", s, bRead ? "true" : "false", bWrite ? "true" : "false", bException ? "true" : "false");) + if (s >= 0) + { + if (bRead) + { + if (!FD_ISSET(s, &m_rfds)) + { + FD_SET(s, &m_rfds); + } + } + else + { + FD_CLR(s, &m_rfds); + } + if (bWrite) + { + if (!FD_ISSET(s, &m_wfds)) + { + FD_SET(s, &m_wfds); + } + } + else + { + FD_CLR(s, &m_wfds); + } + if (bException) + { + if (!FD_ISSET(s, &m_efds)) + { + FD_SET(s, &m_efds); + } + } + else + { + FD_CLR(s, &m_efds); + } + } +} + + +int SocketHandler::Select(long sec,long usec) +{ + struct timeval tv; + tv.tv_sec = sec; + tv.tv_usec = usec; + return Select(&tv); +} + + +int SocketHandler::Select() +{ + if (m_fds_callonconnect.size() || +#ifdef ENABLE_DETACH + (!m_slave && m_fds_detach.size()) || +#endif + m_fds_timeout.size() || + m_fds_retry.size() || + m_fds_close.size() || + m_fds_erase.size()) + { + return Select(0, 200000); + } + return Select(NULL); +} + + +int SocketHandler::Select(struct timeval *tsel) +{ + size_t ignore = 0; + while (m_add.size() > ignore) + { + if (m_sockets.size() >= FD_SETSIZE) + { + LogError(NULL, "Select", (int)m_sockets.size(), "FD_SETSIZE reached", LOG_LEVEL_WARNING); + break; + } + socket_m::iterator it = m_add.begin(); + SOCKET s = it -> first; + Socket *p = it -> second; +DEB( fprintf(stderr, "Trying to add fd %d, m_add.size() %d, ignore %d\n", (int)s, (int)m_add.size(), (int)ignore);) + // + if (m_sockets.find(p -> GetSocket()) != m_sockets.end()) + { + LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in controlled queue", LOG_LEVEL_FATAL); + // %! it's a dup, don't add to delete queue, just ignore it + m_delete.push_back(p); + m_add.erase(it); +// ignore++; + continue; + } + if (!p -> CloseAndDelete()) + { + StreamSocket *scp = dynamic_cast(p); + if (scp && scp -> Connecting()) // 'Open' called before adding socket + { + Set(s,false,true); + } + else + { + TcpSocket *tcp = dynamic_cast(p); + bool bWrite = tcp ? tcp -> GetOutputLength() != 0 : false; + if (p -> IsDisableRead()) + { + Set(s, false, bWrite); + } + else + { + Set(s, true, bWrite); + } + } + m_maxsock = (s > m_maxsock) ? s : m_maxsock; + } + else + { + LogError(p, "Add", (int)p -> GetSocket(), "Trying to add socket with SetCloseAndDelete() true", LOG_LEVEL_WARNING); + } + // only add to m_fds (process fd_set events) if + // slave handler and detached/detaching socket + // master handler and non-detached socket +#ifdef ENABLE_DETACH + if (!(m_slave ^ p -> IsDetach())) +#endif + { + m_fds.push_back(s); + } + m_sockets[s] = p; + // + m_add.erase(it); + } +#ifdef MACOSX + fd_set rfds; + fd_set wfds; + fd_set efds; + FD_COPY(&m_rfds, &rfds); + FD_COPY(&m_wfds, &wfds); + FD_COPY(&m_efds, &efds); +#else + fd_set rfds = m_rfds; + fd_set wfds = m_wfds; + fd_set efds = m_efds; +#endif + int n; + if (m_b_use_mutex) + { + m_mutex.Unlock(); + n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel); + m_mutex.Lock(); + } + else + { + n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel); + } + if (n == -1) + { + /* + EBADF An invalid file descriptor was given in one of the sets. + EINTR A non blocked signal was caught. + EINVAL n is negative. Or struct timeval contains bad time values (<0). + ENOMEM select was unable to allocate memory for internal tables. + */ + if (Errno != m_preverror || m_errcnt++ % 10000 == 0) + { + LogError(NULL, "select", Errno, StrError(Errno)); +DEB( fprintf(stderr, "m_maxsock: %d\n", m_maxsock); + fprintf(stderr, "%s\n", Errno == EINVAL ? "EINVAL" : + Errno == EINTR ? "EINTR" : + Errno == EBADF ? "EBADF" : + Errno == ENOMEM ? "ENOMEM" : ""); + // test bad fd + for (SOCKET i = 0; i <= m_maxsock; i++) + { + bool t = false; + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + if (FD_ISSET(i, &m_rfds)) + { + FD_SET(i, &rfds); + t = true; + } + if (FD_ISSET(i, &m_wfds)) + { + FD_SET(i, &wfds); + t = true; + } + if (FD_ISSET(i, &m_efds)) + { + FD_SET(i, &efds); + t = true; + } + if (t && m_sockets.find(i) == m_sockets.end()) + { + fprintf(stderr, "Bad fd in fd_set: %d\n", i); + } + } +) // DEB + m_preverror = Errno; + } + /// \todo rebuild fd_set's from active sockets list (m_sockets) here + } + else + if (!n) + { + m_preverror = -1; + } + else + if (n > 0) + { + for (socket_v::iterator it2 = m_fds.begin(); it2 != m_fds.end() && n; it2++) + { + SOCKET i = *it2; + if (FD_ISSET(i, &rfds)) + { + socket_m::iterator itmp = m_sockets.find(i); + if (itmp != m_sockets.end()) // found + { + Socket *p = itmp -> second; + // new SSL negotiate method +#ifdef HAVE_OPENSSL + if (p -> IsSSLNegotiate()) + { + p -> SSLNegotiate(); + } + else +#endif + { + p -> OnRead(); + } + } + else + { + LogError(NULL, "GetSocket/handler/1", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + n--; + } + if (FD_ISSET(i, &wfds)) + { + socket_m::iterator itmp = m_sockets.find(i); + if (itmp != m_sockets.end()) // found + { + Socket *p = itmp -> second; + // new SSL negotiate method +#ifdef HAVE_OPENSSL + if (p -> IsSSLNegotiate()) + { + p -> SSLNegotiate(); + } + else +#endif + { + p -> OnWrite(); + } + } + else + { + LogError(NULL, "GetSocket/handler/2", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + n--; + } + if (FD_ISSET(i, &efds)) + { + socket_m::iterator itmp = m_sockets.find(i); + if (itmp != m_sockets.end()) // found + { + Socket *p = itmp -> second; + p -> OnException(); + } + else + { + LogError(NULL, "GetSocket/handler/3", (int)i, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + n--; + } + } // m_fds loop + m_preverror = -1; + } // if (n > 0) + + // check CallOnConnect - EVENT + if (m_fds_callonconnect.size()) + { + socket_v tmp = m_fds_callonconnect; + for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++) + { + Socket *p = NULL; + { + socket_m::iterator itmp = m_sockets.find(*it); + if (itmp != m_sockets.end()) // found + { + p = itmp -> second; + } + else + { + LogError(NULL, "GetSocket/handler/4", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + } + if (p) + { +// if (p -> CallOnConnect() && p -> Ready() ) + { + p -> SetConnected(); // moved here from inside if (tcp) check below +#ifdef HAVE_OPENSSL + if (p -> IsSSL()) // SSL Enabled socket + p -> OnSSLConnect(); + else +#endif +#ifdef ENABLE_SOCKS4 + if (p -> Socks4()) + p -> OnSocks4Connect(); + else +#endif + { + TcpSocket *tcp = dynamic_cast(p); + if (tcp) + { + if (tcp -> GetOutputLength()) + { + p -> OnWrite(); + } + } +#ifdef ENABLE_RECONNECT + if (tcp && tcp -> IsReconnect()) + p -> OnReconnect(); + else +#endif + { +// LogError(p, "Calling OnConnect", 0, "Because CallOnConnect", LOG_LEVEL_INFO); + p -> OnConnect(); + } + } +// p -> SetCallOnConnect( false ); + AddList(p -> GetSocket(), LIST_CALLONCONNECT, false); + } + } + } + } +#ifdef ENABLE_DETACH + // check detach of socket if master handler - EVENT + if (!m_slave && m_fds_detach.size()) + { + // %! why not using tmp list here??!? + for (socket_v::iterator it = m_fds_detach.begin(); it != m_fds_detach.end(); it++) + { + Socket *p = NULL; + { + socket_m::iterator itmp = m_sockets.find(*it); + if (itmp != m_sockets.end()) // found + { + p = itmp -> second; + } + else + { + LogError(NULL, "GetSocket/handler/5", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + } + if (p) + { +// if (p -> IsDetach()) + { + Set(p -> GetSocket(), false, false, false); + // After DetachSocket(), all calls to Handler() will return a reference + // to the new slave SocketHandler running in the new thread. + p -> DetachSocket(); + // Adding the file descriptor to m_fds_erase will now also remove the + // socket from the detach queue - tnx knightmad + m_fds_erase.push_back(p -> GetSocket()); + } + } + } + } +#endif + // check Connecting - connection timeout - conditional event + if (m_fds_timeout.size()) + { + time_t tnow = time(NULL); + if (tnow != m_tlast) + { + socket_v tmp = m_fds_timeout; +DEB( fprintf(stderr, "Checking %d socket(s) for timeout\n", tmp.size());) + for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++) + { + Socket *p = NULL; + { + socket_m::iterator itmp = m_sockets.find(*it); + if (itmp != m_sockets.end()) // found + { + p = itmp -> second; + } + else + { + itmp = m_add.find(*it); + if (itmp != m_add.end()) + { + p = itmp -> second; + } + else + { + LogError(NULL, "GetSocket/handler/6", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + } + } + if (p) + { + if (p -> Timeout(tnow)) + { + StreamSocket *scp = dynamic_cast(p); + if (scp && scp -> Connecting()) + p -> OnConnectTimeout(); + else + p -> OnTimeout(); + p -> SetTimeout(0); + } + } + } + m_tlast = tnow; + } // tnow != tlast + } + // check retry client connect - EVENT + if (m_fds_retry.size()) + { + socket_v tmp = m_fds_retry; + for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++) + { + Socket *p = NULL; + { + socket_m::iterator itmp = m_sockets.find(*it); + if (itmp != m_sockets.end()) // found + { + p = itmp -> second; + } + else + { + LogError(NULL, "GetSocket/handler/7", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + } + if (p) + { +// if (p -> RetryClientConnect()) + { + TcpSocket *tcp = dynamic_cast(p); + SOCKET nn = *it; //(*it3).first; + tcp -> SetRetryClientConnect(false); +DEB( fprintf(stderr, "Close() before retry client connect\n");) + p -> Close(); // removes from m_fds_retry + std::auto_ptr ad = p -> GetClientRemoteAddress(); + if (ad.get()) + { + tcp -> Open(*ad); + } + else + { + LogError(p, "RetryClientConnect", 0, "no address", LOG_LEVEL_ERROR); + } + Add(p); + m_fds_erase.push_back(nn); + } + } + } + } + // check close and delete - conditional event + if (m_fds_close.size()) + { + socket_v tmp = m_fds_close; +DEB( fprintf(stderr, "m_fds_close.size() == %d\n", (int)m_fds_close.size());) + for (socket_v::iterator it = tmp.begin(); it != tmp.end(); it++) + { + Socket *p = NULL; + { + socket_m::iterator itmp = m_sockets.find(*it); + if (itmp != m_sockets.end()) // found + { + p = itmp -> second; + } + else + { + itmp = m_add.find(*it); + if (itmp != m_add.end()) + { + p = itmp -> second; + } + else + { + LogError(NULL, "GetSocket/handler/8", (int)*it, "Did not find expected socket using file descriptor", LOG_LEVEL_WARNING); + } + } + } + if (p) + { +// if (p -> CloseAndDelete() ) + { + TcpSocket *tcp = dynamic_cast(p); + // new graceful tcp - flush and close timeout 5s + if (tcp && p -> IsConnected() && tcp -> GetFlushBeforeClose() && +#ifdef HAVE_OPENSSL + !tcp -> IsSSL() && +#endif + p -> TimeSinceClose() < 5) + { +DEB( fprintf(stderr, " close(1)\n");) + if (tcp -> GetOutputLength()) + { + LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Sending all data before closing", LOG_LEVEL_INFO); + } + else // shutdown write when output buffer is empty + if (!(tcp -> GetShutdown() & SHUT_WR)) + { + SOCKET nn = *it; + if (nn != INVALID_SOCKET && shutdown(nn, SHUT_WR) == -1) + { + LogError(p, "graceful shutdown", Errno, StrError(Errno), LOG_LEVEL_ERROR); + } + tcp -> SetShutdown(SHUT_WR); + } + } + else +#ifdef ENABLE_RECONNECT + if (tcp && p -> IsConnected() && tcp -> Reconnect()) + { + SOCKET nn = *it; //(*it3).first; +DEB( fprintf(stderr, " close(2) fd %d\n", nn);) + p -> SetCloseAndDelete(false); + tcp -> SetIsReconnect(); + p -> SetConnected(false); +DEB( fprintf(stderr, "Close() before reconnect\n");) + p -> Close(); // dispose of old file descriptor (Open creates a new) + p -> OnDisconnect(); + std::auto_ptr ad = p -> GetClientRemoteAddress(); + if (ad.get()) + { + tcp -> Open(*ad); + } + else + { + LogError(p, "Reconnect", 0, "no address", LOG_LEVEL_ERROR); + } + tcp -> ResetConnectionRetries(); + Add(p); + m_fds_erase.push_back(nn); + } + else +#endif + { + SOCKET nn = *it; //(*it3).first; +DEB( fprintf(stderr, " close(3) fd %d GetSocket() %d\n", nn, p -> GetSocket());) + if (tcp && p -> IsConnected() && tcp -> GetOutputLength()) + { + LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Closing socket while data still left to send", LOG_LEVEL_WARNING); + } +#ifdef ENABLE_POOL + if (p -> Retain() && !p -> Lost()) + { + PoolSocket *p2 = new PoolSocket(*this, p); + p2 -> SetDeleteByHandler(); + Add(p2); + // + p -> SetCloseAndDelete(false); // added - remove from m_fds_close + } + else +#endif // ENABLE_POOL + { + Set(p -> GetSocket(),false,false,false); +DEB( fprintf(stderr, "Close() before OnDelete\n");) + p -> Close(); + } + p -> OnDelete(); + if (p -> DeleteByHandler()) + { + p -> SetErasedByHandler(); + } + m_fds_erase.push_back(nn); + } + } + } + } + } + + // check erased sockets + bool check_max_fd = false; + while (m_fds_erase.size()) + { + socket_v::iterator it = m_fds_erase.begin(); + SOCKET nn = *it; +#ifdef ENABLE_DETACH + { + for (socket_v::iterator it = m_fds_detach.begin(); it != m_fds_detach.end(); it++) + { + if (*it == nn) + { + m_fds_detach.erase(it); + break; + } + } + } +#endif + { + for (socket_v::iterator it = m_fds.begin(); it != m_fds.end(); it++) + { + if (*it == nn) + { + m_fds.erase(it); + break; + } + } + } + { + socket_m::iterator it = m_sockets.find(nn); + if (it != m_sockets.end()) + { + Socket *p = it -> second; + /* Sometimes a SocketThread class can finish its run before the master + sockethandler gets here. In that case, the SocketThread has set the + 'ErasedByHandler' flag on the socket which will make us end up with a + double delete on the socket instance. + The fix is to make sure that the master sockethandler only can delete + non-detached sockets, and a slave sockethandler only can delete + detach sockets. */ + if (p -> ErasedByHandler() +#ifdef ENABLE_DETACH + && !(m_slave ^ p -> IsDetached()) +#endif + ) + { +#ifdef ENABLE_TRIGGERS + bool again = false; + do + { + again = false; + for (std::map::iterator it = m_trigger_src.begin(); it != m_trigger_src.end(); it++) + { + int id = it -> first; + Socket *src = it -> second; + if (src == p) + { + for (std::map::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++) + { + Socket *dst = it -> first; + if (Valid(dst)) + { + dst -> OnCancelled(id); + } + } + m_trigger_src.erase(m_trigger_src.find(id)); + m_trigger_dst.erase(m_trigger_dst.find(id)); + again = true; + break; + } + } + } while (again); +#endif + delete p; + } + m_sockets.erase(it); + } + } + m_fds_erase.erase(it); + check_max_fd = true; + } + // calculate max file descriptor for select() call + if (check_max_fd) + { + m_maxsock = 0; + for (socket_v::iterator it = m_fds.begin(); it != m_fds.end(); it++) + { + SOCKET s = *it; + m_maxsock = s > m_maxsock ? s : m_maxsock; + } + } + // remove Add's that fizzed + while (m_delete.size()) + { + std::list::iterator it = m_delete.begin(); + Socket *p = *it; + p -> OnDelete(); + m_delete.erase(it); + if (p -> DeleteByHandler() +#ifdef ENABLE_DETACH + && !(m_slave ^ p -> IsDetached()) +#endif + ) + { + p -> SetErasedByHandler(); +#ifdef ENABLE_TRIGGERS + bool again = false; + do + { + again = false; + for (std::map::iterator it = m_trigger_src.begin(); it != m_trigger_src.end(); it++) + { + int id = it -> first; + Socket *src = it -> second; + if (src == p) + { + for (std::map::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++) + { + Socket *dst = it -> first; + if (Valid(dst)) + { + dst -> OnCancelled(id); + } + } + m_trigger_src.erase(m_trigger_src.find(id)); + m_trigger_dst.erase(m_trigger_dst.find(id)); + again = true; + break; + } + } + } while (again); +#endif + delete p; + } + } + return n; +} + + +#ifdef ENABLE_RESOLVER +bool SocketHandler::Resolving(Socket *p0) +{ + std::map::iterator it = m_resolve_q.find(p0); + return it != m_resolve_q.end(); +} +#endif + + +bool SocketHandler::Valid(Socket *p0) +{ + for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++) + { + Socket *p = it -> second; + if (p0 == p) + return true; + } + return false; +} + + +bool SocketHandler::OkToAccept(Socket *) +{ + return true; +} + + +size_t SocketHandler::GetCount() +{ +/* +printf(" m_sockets : %d\n", m_sockets.size()); +printf(" m_add : %d\n", m_add.size()); +printf(" m_delete : %d\n", m_delete.size()); +*/ + return m_sockets.size() + m_add.size() + m_delete.size(); +} + + +#ifdef ENABLE_SOCKS4 +void SocketHandler::SetSocks4Host(ipaddr_t a) +{ + m_socks4_host = a; +} + + +void SocketHandler::SetSocks4Host(const std::string& host) +{ + Utility::u2ip(host, m_socks4_host); +} + + +void SocketHandler::SetSocks4Port(port_t port) +{ + m_socks4_port = port; +} + + +void SocketHandler::SetSocks4Userid(const std::string& id) +{ + m_socks4_userid = id; +} +#endif + + +#ifdef ENABLE_RESOLVER +int SocketHandler::Resolve(Socket *p,const std::string& host,port_t port) +{ + // check cache + ResolvSocket *resolv = new ResolvSocket(*this, p, host, port); + resolv -> SetId(++m_resolv_id); + resolv -> SetDeleteByHandler(); + ipaddr_t local; + Utility::u2ip("127.0.0.1", local); + if (!resolv -> Open(local, m_resolver_port)) + { + LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL); + } + Add(resolv); + m_resolve_q[p] = true; +DEB( fprintf(stderr, " *** Resolve '%s:%d' id#%d m_resolve_q size: %d p: %p\n", host.c_str(), port, resolv -> GetId(), m_resolve_q.size(), p);) + return resolv -> GetId(); +} + + +#ifdef ENABLE_IPV6 +int SocketHandler::Resolve6(Socket *p,const std::string& host,port_t port) +{ + // check cache + ResolvSocket *resolv = new ResolvSocket(*this, p, host, port, true); + resolv -> SetId(++m_resolv_id); + resolv -> SetDeleteByHandler(); + ipaddr_t local; + Utility::u2ip("127.0.0.1", local); + if (!resolv -> Open(local, m_resolver_port)) + { + LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL); + } + Add(resolv); + m_resolve_q[p] = true; + return resolv -> GetId(); +} +#endif + + +int SocketHandler::Resolve(Socket *p,ipaddr_t a) +{ + // check cache + ResolvSocket *resolv = new ResolvSocket(*this, p, a); + resolv -> SetId(++m_resolv_id); + resolv -> SetDeleteByHandler(); + ipaddr_t local; + Utility::u2ip("127.0.0.1", local); + if (!resolv -> Open(local, m_resolver_port)) + { + LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL); + } + Add(resolv); + m_resolve_q[p] = true; + return resolv -> GetId(); +} + + +#ifdef ENABLE_IPV6 +int SocketHandler::Resolve(Socket *p,in6_addr& a) +{ + // check cache + ResolvSocket *resolv = new ResolvSocket(*this, p, a); + resolv -> SetId(++m_resolv_id); + resolv -> SetDeleteByHandler(); + ipaddr_t local; + Utility::u2ip("127.0.0.1", local); + if (!resolv -> Open(local, m_resolver_port)) + { + LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL); + } + Add(resolv); + m_resolve_q[p] = true; + return resolv -> GetId(); +} +#endif + + +void SocketHandler::EnableResolver(port_t port) +{ + if (!m_resolver) + { + m_resolver_port = port; + m_resolver = new ResolvServer(port); + } +} + + +bool SocketHandler::ResolverReady() +{ + return m_resolver ? m_resolver -> Ready() : false; +} +#endif // ENABLE_RESOLVER + + +#ifdef ENABLE_SOCKS4 +void SocketHandler::SetSocks4TryDirect(bool x) +{ + m_bTryDirect = x; +} + + +ipaddr_t SocketHandler::GetSocks4Host() +{ + return m_socks4_host; +} + + +port_t SocketHandler::GetSocks4Port() +{ + return m_socks4_port; +} + + +const std::string& SocketHandler::GetSocks4Userid() +{ + return m_socks4_userid; +} + + +bool SocketHandler::Socks4TryDirect() +{ + return m_bTryDirect; +} +#endif + + +#ifdef ENABLE_RESOLVER +bool SocketHandler::ResolverEnabled() +{ + return m_resolver ? true : false; +} + + +port_t SocketHandler::GetResolverPort() +{ + return m_resolver_port; +} +#endif // ENABLE_RESOLVER + + +#ifdef ENABLE_POOL +ISocketHandler::PoolSocket *SocketHandler::FindConnection(int type,const std::string& protocol,SocketAddress& ad) +{ + for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end() && m_sockets.size(); it++) + { + PoolSocket *pools = dynamic_cast(it -> second); + if (pools) + { + if (pools -> GetSocketType() == type && + pools -> GetSocketProtocol() == protocol && +// %! pools -> GetClientRemoteAddress() && + *pools -> GetClientRemoteAddress() == ad) + { + m_sockets.erase(it); + pools -> SetRetain(); // avoid Close in Socket destructor + return pools; // Caller is responsible that this socket is deleted + } + } + } + return NULL; +} + + +void SocketHandler::EnablePool(bool x) +{ + m_b_enable_pool = x; +} + + +bool SocketHandler::PoolEnabled() +{ + return m_b_enable_pool; +} +#endif + + +void SocketHandler::Remove(Socket *p) +{ +#ifdef ENABLE_RESOLVER + std::map::iterator it4 = m_resolve_q.find(p); + if (it4 != m_resolve_q.end()) + m_resolve_q.erase(it4); +#endif + if (p -> ErasedByHandler()) + { + return; + } + for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); it++) + { + if (it -> second == p) + { + LogError(p, "Remove", -1, "Socket destructor called while still in use", LOG_LEVEL_WARNING); + m_sockets.erase(it); + return; + } + } + for (socket_m::iterator it2 = m_add.begin(); it2 != m_add.end(); it2++) + { + if ((*it2).second == p) + { + LogError(p, "Remove", -2, "Socket destructor called while still in use", LOG_LEVEL_WARNING); + m_add.erase(it2); + return; + } + } + for (std::list::iterator it3 = m_delete.begin(); it3 != m_delete.end(); it3++) + { + if (*it3 == p) + { + LogError(p, "Remove", -3, "Socket destructor called while still in use", LOG_LEVEL_WARNING); + m_delete.erase(it3); + return; + } + } +} + + +void SocketHandler::CheckSanity() +{ + CheckList(m_fds, "active sockets"); // active sockets + CheckList(m_fds_erase, "sockets to be erased"); // should always be empty anyway + CheckList(m_fds_callonconnect, "checklist CallOnConnect"); +#ifdef ENABLE_DETACH + CheckList(m_fds_detach, "checklist Detach"); +#endif + CheckList(m_fds_timeout, "checklist Timeout"); + CheckList(m_fds_retry, "checklist retry client connect"); + CheckList(m_fds_close, "checklist close and delete"); +} + + +void SocketHandler::CheckList(socket_v& ref,const std::string& listname) +{ + for (socket_v::iterator it = ref.begin(); it != ref.end(); it++) + { + SOCKET s = *it; + if (m_sockets.find(s) != m_sockets.end()) + continue; + if (m_add.find(s) != m_add.end()) + continue; + bool found = false; + for (std::list::iterator it = m_delete.begin(); it != m_delete.end(); it++) + { + Socket *p = *it; + if (p -> GetSocket() == s) + { + found = true; + break; + } + } + if (!found) + { + fprintf(stderr, "CheckList failed for \"%s\": fd %d\n", listname.c_str(), s); + } + } +} + + +void SocketHandler::AddList(SOCKET s,list_t which_one,bool add) +{ + if (s == INVALID_SOCKET) + { +DEB( fprintf(stderr, "AddList: invalid_socket\n");) + return; + } + socket_v& ref = + (which_one == LIST_CALLONCONNECT) ? m_fds_callonconnect : +#ifdef ENABLE_DETACH + (which_one == LIST_DETACH) ? m_fds_detach : +#endif + (which_one == LIST_TIMEOUT) ? m_fds_timeout : + (which_one == LIST_RETRY) ? m_fds_retry : + (which_one == LIST_CLOSE) ? m_fds_close : m_fds_close; + if (add) + { +#ifdef ENABLE_DETACH +DEB( fprintf(stderr, "AddList; %5d: %s: %s\n", s, (which_one == LIST_CALLONCONNECT) ? "CallOnConnect" : + (which_one == LIST_DETACH) ? "Detach" : + (which_one == LIST_TIMEOUT) ? "Timeout" : + (which_one == LIST_RETRY) ? "Retry" : + (which_one == LIST_CLOSE) ? "Close" : "", + add ? "Add" : "Remove");) +#else +DEB( fprintf(stderr, "AddList; %5d: %s: %s\n", s, (which_one == LIST_CALLONCONNECT) ? "CallOnConnect" : + (which_one == LIST_TIMEOUT) ? "Timeout" : + (which_one == LIST_RETRY) ? "Retry" : + (which_one == LIST_CLOSE) ? "Close" : "", + add ? "Add" : "Remove");) +#endif + } + if (add) + { + for (socket_v::iterator it = ref.begin(); it != ref.end(); it++) + { + if (*it == s) // already there + { + return; + } + } + ref.push_back(s); + return; + } + // remove + for (socket_v::iterator it = ref.begin(); it != ref.end(); it++) + { + if (*it == s) + { + ref.erase(it); + break; + } + } +//DEB( fprintf(stderr, "/AddList\n");) +} + + +#ifdef ENABLE_TRIGGERS +int SocketHandler::TriggerID(Socket *src) +{ + int id = m_next_trigger_id++; + m_trigger_src[id] = src; + return id; +} + + +bool SocketHandler::Subscribe(int id, Socket *dst) +{ + if (m_trigger_src.find(id) != m_trigger_src.end()) + { + std::map::iterator it = m_trigger_dst[id].find(dst); + if (it != m_trigger_dst[id].end()) + { + m_trigger_dst[id][dst] = true; + return true; + } + LogError(dst, "Subscribe", id, "Already subscribed", LOG_LEVEL_INFO); + return false; + } + LogError(dst, "Subscribe", id, "Trigger id not found", LOG_LEVEL_INFO); + return false; +} + + +bool SocketHandler::Unsubscribe(int id, Socket *dst) +{ + if (m_trigger_src.find(id) != m_trigger_src.end()) + { + std::map::iterator it = m_trigger_dst[id].find(dst); + if (it != m_trigger_dst[id].end()) + { + m_trigger_dst[id].erase(it); + return true; + } + LogError(dst, "Unsubscribe", id, "Not subscribed", LOG_LEVEL_INFO); + return false; + } + LogError(dst, "Unsubscribe", id, "Trigger id not found", LOG_LEVEL_INFO); + return false; +} + + +void SocketHandler::Trigger(int id, Socket::TriggerData& data, bool erase) +{ + if (m_trigger_src.find(id) != m_trigger_src.end()) + { + data.SetSource( m_trigger_src[id] ); + for (std::map::iterator it = m_trigger_dst[id].begin(); it != m_trigger_dst[id].end(); it++) + { + Socket *dst = it -> first; + if (Valid(dst)) + { + dst -> OnTrigger(id, data); + } + } + if (erase) + { + m_trigger_src.erase(m_trigger_src.find(id)); + m_trigger_dst.erase(m_trigger_dst.find(id)); + } + } + else + { + LogError(NULL, "Trigger", id, "Trigger id not found", LOG_LEVEL_INFO); + } +} +#endif // ENABLE_TRIGGERS + + +#ifdef SOCKETS_NAMESPACE +} +#endif + -- cgit v1.2.3