From 47747b07b5d90d7e90541ed657a03a06a9a3b176 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 25 May 2015 01:21:11 +0200 Subject: try not to loose data when removing client... needs testing!!! --- src/clients.c | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ src/clients.h | 5 ++- 2 files changed, 109 insertions(+), 13 deletions(-) diff --git a/src/clients.c b/src/clients.c index 31a61bf..f59d575 100644 --- a/src/clients.c +++ b/src/clients.c @@ -97,6 +97,7 @@ static int handle_connect(client_t* c, int32_t buffer_size_) log_printf(INFO, "successfully added client %d", c->fd_[0]); c->state_ = CONNECTED; + c->fd_state_[1] = ESTABLISHED; return 0; } @@ -119,6 +120,7 @@ int clients_add(clients_t* list, int fd, const tcp_endpoint_t remote_end, const } element->state_ = CONNECTING; element->fd_[0] = fd; + element->fd_state_[0] = ESTABLISHED; element->fd_[1] = socket(remote_end.addr_.ss_family, SOCK_STREAM, 0); if(element->fd_[1] < 0) { log_printf(INFO, "Error on socket(): %s, not adding client %d", strerror(errno), element->fd_[0]); @@ -126,6 +128,7 @@ int clients_add(clients_t* list, int fd, const tcp_endpoint_t remote_end, const free(element); return -1; } + element->fd_state_[1] = ESTABLISHING; int on = 1; if(setsockopt(element->fd_[0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) || @@ -215,6 +218,7 @@ void clients_print(clients_t* list) switch(c->state_) { case CONNECTING: state = '>'; break; case CONNECTED: state = 'c'; break; + case CLOSING: state = '-'; break; } log_printf(NOTICE, "[%c] client #%d/%d: %lld bytes received, %lld bytes sent", state, c->fd_[0], c->fd_[1], c->transferred_[0], c->transferred_[1]); } @@ -230,14 +234,20 @@ void clients_read_fds(clients_t* list, fd_set* set, int* max_fd) slist_element_t* tmp = list->list_.first_; while(tmp) { client_t* c = (client_t*)tmp->data_; - if(c && c->state_ == CONNECTED) { - if(c->write_buf_offset_[1] < c->write_buf_[1].length_) { + if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) { + if(c->fd_state_[0] != CLOSE_PENDING && c->fd_state_[0] != CLOSE_LINGER + && c->write_buf_offset_[1] < c->write_buf_[1].length_) { FD_SET(c->fd_[0], set); *max_fd = *max_fd > c->fd_[0] ? *max_fd : c->fd_[0]; + + log_printf(DEBUG, "adding %d for READ", c->fd_[0]); } - if(c->write_buf_offset_[0] < c->write_buf_[0].length_) { + if(c->fd_state_[1] != CLOSE_PENDING && c->fd_state_[1] != CLOSE_LINGER + && c->write_buf_offset_[0] < c->write_buf_[0].length_) { FD_SET(c->fd_[1], set); *max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1]; + + log_printf(DEBUG, "adding %d for READ", c->fd_[1]); } } tmp = tmp->next_; @@ -252,23 +262,76 @@ void clients_write_fds(clients_t* list, fd_set* set, int* max_fd) slist_element_t* tmp = list->list_.first_; while(tmp) { client_t* c = (client_t*)tmp->data_; - if(c && c->state_ == CONNECTED) { - if(c->write_buf_offset_[0]) { + if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) { + if((c->fd_state_[0] == ESTABLISHED || c->fd_state_[0] == CLOSE_PENDING || c->fd_state_[0] == FIN_PENDING) + && c->write_buf_offset_[0]) { FD_SET(c->fd_[0], set); *max_fd = *max_fd > c->fd_[0] ? *max_fd : c->fd_[0]; + + log_printf(DEBUG, "adding %d for WRITE", c->fd_[0]); } - if(c->write_buf_offset_[1]) { + if((c->fd_state_[1] == ESTABLISHED || c->fd_state_[1] == CLOSE_PENDING || c->fd_state_[1] == FIN_PENDING) + && c->write_buf_offset_[1]) { FD_SET(c->fd_[1], set); *max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1]; + + log_printf(DEBUG, "adding %d for WRITE", c->fd_[1]); } } else if(c && c->state_ == CONNECTING) { FD_SET(c->fd_[1], set); *max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1]; + + log_printf(DEBUG, "adding %d for WRITE (connecting)", c->fd_[1]); } tmp = tmp->next_; } } +static char* client_fd_state_to_string(client_fd_state_t s) +{ + switch(s) { + case ESTABLISHING: return "establishing"; + case ESTABLISHED: return "established"; + case FIN_PENDING: return "FIN pending"; + case FIN_LINGER: return "FIN linger"; + case CLOSE_PENDING: return "CLOSE pending"; + case CLOSE_LINGER: return "CLOSE linger"; + } +} + +static int client_handle_close(client_t* c, int in, int out) +{ + log_printf(DEBUG, "client %d: recv(%d) returned 0, %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[in], + c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]), + c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1])); + + c->state_ = CLOSING; + c->fd_state_[in] = (c->write_buf_offset_[in]) ? CLOSE_PENDING : CLOSE_LINGER; + + switch(c->fd_state_[out]) { + case ESTABLISHING: return 1; // this will never be reached - remove client for now > assert() this in future?? + case ESTABLISHED: { + if(c->write_buf_offset_[out]) { + c->fd_state_[out] = FIN_PENDING; + } else { + c->fd_state_[out] = FIN_LINGER; + shutdown(c->fd_[out], SHUT_WR); + } + break; + } + case FIN_PENDING: log_printf(ERROR, "client %d: socket %d is already in FIN_PENDING state???", c->fd_[0], c->fd_[out]); break; + case FIN_LINGER: log_printf(ERROR, "client %d: socket %d is already in FIN_LINGER state???", c->fd_[0], c->fd_[out]); break; + case CLOSE_PENDING: log_printf(DEBUG, "client %d: socket %d has data pending - removal postponed", c->fd_[0], c->fd_[out]); break; + case CLOSE_LINGER: log_printf(INFO, "client %d: both connections are closed now - removing it", c->fd_[0]); return 1; + } + + log_printf(DEBUG, "client %d is now %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], + c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]), + c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1])); + + return 0; +} + int clients_read(clients_t* list, fd_set* set) { if(!list) @@ -278,7 +341,7 @@ int clients_read(clients_t* list, fd_set* set) while(tmp) { client_t* c = (client_t*)tmp->data_; tmp = tmp->next_; - if(c && c->state_ == CONNECTED) { + if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) { int i; for(i=0; i<2; ++i) { int in, out; @@ -288,16 +351,19 @@ int clients_read(clients_t* list, fd_set* set) } else continue; + log_printf(DEBUG, "calling recv(%d)", c->fd_[in]); int len = recv(c->fd_[in], &(c->write_buf_[out].buf_[c->write_buf_offset_[out]]), c->write_buf_[out].length_ - c->write_buf_offset_[out], 0); if(len < 0) { + // TODO: the other socket might still have data pending.... log_printf(INFO, "Error on recv(): %s, removing client %d", strerror(errno), c->fd_[0]); slist_remove(&(list->list_), c); break; } else if(!len) { - log_printf(INFO, "client %d closed connection, removing it", c->fd_[0]); - slist_remove(&(list->list_), c); - break; + if(client_handle_close(c, in, out)) { + slist_remove(&(list->list_), c); + break; + } } else c->write_buf_offset_[out] += len; @@ -308,6 +374,26 @@ int clients_read(clients_t* list, fd_set* set) return 0; } +static int client_handle_pending(client_t* c, int i) +{ + log_printf(DEBUG, "client %d: cleared write buffer[%d], %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[i], + c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]), + c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1])); + + if(c->fd_state_[i] == CLOSE_PENDING) { + c->fd_state_[i] = CLOSE_LINGER; + } else { + c->fd_state_[i] = FIN_LINGER; + } + shutdown(c->fd_[i], SHUT_WR); + + log_printf(DEBUG, "client %d is now %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[i], + c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]), + c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1])); + + return 0; +} + int clients_write(clients_t* list, fd_set* set) { if(!list) @@ -317,12 +403,15 @@ int clients_write(clients_t* list, fd_set* set) while(tmp) { client_t* c = (client_t*)tmp->data_; tmp = tmp->next_; - if(c && c->state_ == CONNECTED) { + if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) { int i; for(i=0; i<2; ++i) { if(FD_ISSET(c->fd_[i], set)) { + log_printf(DEBUG, "calling send(%d)", c->fd_[i]); + int len = send(c->fd_[i], c->write_buf_[i].buf_, c->write_buf_offset_[i], 0); if(len < 0) { + // TODO: the other socket might still have data pending.... log_printf(INFO, "Error on send(): %s, removing client %d", strerror(errno), c->fd_[0]); slist_remove(&(list->list_), c); break; @@ -333,8 +422,12 @@ int clients_write(clients_t* list, fd_set* set) memmove(c->write_buf_[i].buf_, &c->write_buf_[i].buf_[len], c->write_buf_offset_[i] - len); c->write_buf_offset_[i] -= len; } - else + else { c->write_buf_offset_[i] = 0; + if(c->fd_state_[i] == CLOSE_PENDING || c->fd_state_[i] == FIN_PENDING) + if(client_handle_pending(c, i)) + slist_remove(&(list->list_), c); + } } } } diff --git a/src/clients.h b/src/clients.h index 0c3206a..ce0f371 100644 --- a/src/clients.h +++ b/src/clients.h @@ -35,11 +35,14 @@ #define BUFFER_LENGTH 102400 -enum client_state_enum { CONNECTING, CONNECTED }; +enum client_state_enum { CONNECTING, CONNECTED, CLOSING }; typedef enum client_state_enum client_state_t; +enum client_fd_state_enum { ESTABLISHING, ESTABLISHED, FIN_PENDING, FIN_LINGER, CLOSE_PENDING, CLOSE_LINGER }; +typedef enum client_fd_state_enum client_fd_state_t; typedef struct { int fd_[2]; + client_fd_state_t fd_state_[2]; buffer_t write_buf_[2]; u_int32_t write_buf_offset_[2]; client_state_t state_; -- cgit v1.2.3