summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2015-05-25 01:21:11 +0200
committerChristian Pointner <equinox@spreadspace.org>2015-05-25 01:21:19 +0200
commit47747b07b5d90d7e90541ed657a03a06a9a3b176 (patch)
tree1110048e00ba3a155e9938034fd618a4b5b4a873
parentupdated changelog for release (diff)
try not to loose data when removing client... needs testing!!!
-rw-r--r--src/clients.c117
-rw-r--r--src/clients.h5
2 files changed, 109 insertions, 13 deletions
diff --git a/src/clients.c b/src/clients.c
index 31a61bf..f59d575 100644
--- a/src/clients.c
+++ b/src/clients.c
@@ -97,6 +97,7 @@ static int handle_connect(client_t* c, int32_t buffer_size_)
log_printf(INFO, "successfully added client %d", c->fd_[0]);
c->state_ = CONNECTED;
+ c->fd_state_[1] = ESTABLISHED;
return 0;
}
@@ -119,6 +120,7 @@ int clients_add(clients_t* list, int fd, const tcp_endpoint_t remote_end, const
}
element->state_ = CONNECTING;
element->fd_[0] = fd;
+ element->fd_state_[0] = ESTABLISHED;
element->fd_[1] = socket(remote_end.addr_.ss_family, SOCK_STREAM, 0);
if(element->fd_[1] < 0) {
log_printf(INFO, "Error on socket(): %s, not adding client %d", strerror(errno), element->fd_[0]);
@@ -126,6 +128,7 @@ int clients_add(clients_t* list, int fd, const tcp_endpoint_t remote_end, const
free(element);
return -1;
}
+ element->fd_state_[1] = ESTABLISHING;
int on = 1;
if(setsockopt(element->fd_[0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) ||
@@ -215,6 +218,7 @@ void clients_print(clients_t* list)
switch(c->state_) {
case CONNECTING: state = '>'; break;
case CONNECTED: state = 'c'; break;
+ case CLOSING: state = '-'; break;
}
log_printf(NOTICE, "[%c] client #%d/%d: %lld bytes received, %lld bytes sent", state, c->fd_[0], c->fd_[1], c->transferred_[0], c->transferred_[1]);
}
@@ -230,14 +234,20 @@ void clients_read_fds(clients_t* list, fd_set* set, int* max_fd)
slist_element_t* tmp = list->list_.first_;
while(tmp) {
client_t* c = (client_t*)tmp->data_;
- if(c && c->state_ == CONNECTED) {
- if(c->write_buf_offset_[1] < c->write_buf_[1].length_) {
+ if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) {
+ if(c->fd_state_[0] != CLOSE_PENDING && c->fd_state_[0] != CLOSE_LINGER
+ && c->write_buf_offset_[1] < c->write_buf_[1].length_) {
FD_SET(c->fd_[0], set);
*max_fd = *max_fd > c->fd_[0] ? *max_fd : c->fd_[0];
+
+ log_printf(DEBUG, "adding %d for READ", c->fd_[0]);
}
- if(c->write_buf_offset_[0] < c->write_buf_[0].length_) {
+ if(c->fd_state_[1] != CLOSE_PENDING && c->fd_state_[1] != CLOSE_LINGER
+ && c->write_buf_offset_[0] < c->write_buf_[0].length_) {
FD_SET(c->fd_[1], set);
*max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1];
+
+ log_printf(DEBUG, "adding %d for READ", c->fd_[1]);
}
}
tmp = tmp->next_;
@@ -252,23 +262,76 @@ void clients_write_fds(clients_t* list, fd_set* set, int* max_fd)
slist_element_t* tmp = list->list_.first_;
while(tmp) {
client_t* c = (client_t*)tmp->data_;
- if(c && c->state_ == CONNECTED) {
- if(c->write_buf_offset_[0]) {
+ if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) {
+ if((c->fd_state_[0] == ESTABLISHED || c->fd_state_[0] == CLOSE_PENDING || c->fd_state_[0] == FIN_PENDING)
+ && c->write_buf_offset_[0]) {
FD_SET(c->fd_[0], set);
*max_fd = *max_fd > c->fd_[0] ? *max_fd : c->fd_[0];
+
+ log_printf(DEBUG, "adding %d for WRITE", c->fd_[0]);
}
- if(c->write_buf_offset_[1]) {
+ if((c->fd_state_[1] == ESTABLISHED || c->fd_state_[1] == CLOSE_PENDING || c->fd_state_[1] == FIN_PENDING)
+ && c->write_buf_offset_[1]) {
FD_SET(c->fd_[1], set);
*max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1];
+
+ log_printf(DEBUG, "adding %d for WRITE", c->fd_[1]);
}
} else if(c && c->state_ == CONNECTING) {
FD_SET(c->fd_[1], set);
*max_fd = *max_fd > c->fd_[1] ? *max_fd : c->fd_[1];
+
+ log_printf(DEBUG, "adding %d for WRITE (connecting)", c->fd_[1]);
}
tmp = tmp->next_;
}
}
+static char* client_fd_state_to_string(client_fd_state_t s)
+{
+ switch(s) {
+ case ESTABLISHING: return "establishing";
+ case ESTABLISHED: return "established";
+ case FIN_PENDING: return "FIN pending";
+ case FIN_LINGER: return "FIN linger";
+ case CLOSE_PENDING: return "CLOSE pending";
+ case CLOSE_LINGER: return "CLOSE linger";
+ }
+}
+
+static int client_handle_close(client_t* c, int in, int out)
+{
+ log_printf(DEBUG, "client %d: recv(%d) returned 0, %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[in],
+ c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]),
+ c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1]));
+
+ c->state_ = CLOSING;
+ c->fd_state_[in] = (c->write_buf_offset_[in]) ? CLOSE_PENDING : CLOSE_LINGER;
+
+ switch(c->fd_state_[out]) {
+ case ESTABLISHING: return 1; // this will never be reached - remove client for now > assert() this in future??
+ case ESTABLISHED: {
+ if(c->write_buf_offset_[out]) {
+ c->fd_state_[out] = FIN_PENDING;
+ } else {
+ c->fd_state_[out] = FIN_LINGER;
+ shutdown(c->fd_[out], SHUT_WR);
+ }
+ break;
+ }
+ case FIN_PENDING: log_printf(ERROR, "client %d: socket %d is already in FIN_PENDING state???", c->fd_[0], c->fd_[out]); break;
+ case FIN_LINGER: log_printf(ERROR, "client %d: socket %d is already in FIN_LINGER state???", c->fd_[0], c->fd_[out]); break;
+ case CLOSE_PENDING: log_printf(DEBUG, "client %d: socket %d has data pending - removal postponed", c->fd_[0], c->fd_[out]); break;
+ case CLOSE_LINGER: log_printf(INFO, "client %d: both connections are closed now - removing it", c->fd_[0]); return 1;
+ }
+
+ log_printf(DEBUG, "client %d is now %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0],
+ c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]),
+ c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1]));
+
+ return 0;
+}
+
int clients_read(clients_t* list, fd_set* set)
{
if(!list)
@@ -278,7 +341,7 @@ int clients_read(clients_t* list, fd_set* set)
while(tmp) {
client_t* c = (client_t*)tmp->data_;
tmp = tmp->next_;
- if(c && c->state_ == CONNECTED) {
+ if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) {
int i;
for(i=0; i<2; ++i) {
int in, out;
@@ -288,16 +351,19 @@ int clients_read(clients_t* list, fd_set* set)
}
else continue;
+ log_printf(DEBUG, "calling recv(%d)", c->fd_[in]);
int len = recv(c->fd_[in], &(c->write_buf_[out].buf_[c->write_buf_offset_[out]]), c->write_buf_[out].length_ - c->write_buf_offset_[out], 0);
if(len < 0) {
+ // TODO: the other socket might still have data pending....
log_printf(INFO, "Error on recv(): %s, removing client %d", strerror(errno), c->fd_[0]);
slist_remove(&(list->list_), c);
break;
}
else if(!len) {
- log_printf(INFO, "client %d closed connection, removing it", c->fd_[0]);
- slist_remove(&(list->list_), c);
- break;
+ if(client_handle_close(c, in, out)) {
+ slist_remove(&(list->list_), c);
+ break;
+ }
}
else
c->write_buf_offset_[out] += len;
@@ -308,6 +374,26 @@ int clients_read(clients_t* list, fd_set* set)
return 0;
}
+static int client_handle_pending(client_t* c, int i)
+{
+ log_printf(DEBUG, "client %d: cleared write buffer[%d], %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[i],
+ c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]),
+ c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1]));
+
+ if(c->fd_state_[i] == CLOSE_PENDING) {
+ c->fd_state_[i] = CLOSE_LINGER;
+ } else {
+ c->fd_state_[i] = FIN_LINGER;
+ }
+ shutdown(c->fd_[i], SHUT_WR);
+
+ log_printf(DEBUG, "client %d is now %d: bytes=%d state=%s, %d: bytes=%d state=%s", c->fd_[0], c->fd_[i],
+ c->fd_[0], c->write_buf_offset_[0], client_fd_state_to_string(c->fd_state_[0]),
+ c->fd_[1], c->write_buf_offset_[1], client_fd_state_to_string(c->fd_state_[1]));
+
+ return 0;
+}
+
int clients_write(clients_t* list, fd_set* set)
{
if(!list)
@@ -317,12 +403,15 @@ int clients_write(clients_t* list, fd_set* set)
while(tmp) {
client_t* c = (client_t*)tmp->data_;
tmp = tmp->next_;
- if(c && c->state_ == CONNECTED) {
+ if(c && (c->state_ == CONNECTED || c->state_ == CLOSING)) {
int i;
for(i=0; i<2; ++i) {
if(FD_ISSET(c->fd_[i], set)) {
+ log_printf(DEBUG, "calling send(%d)", c->fd_[i]);
+
int len = send(c->fd_[i], c->write_buf_[i].buf_, c->write_buf_offset_[i], 0);
if(len < 0) {
+ // TODO: the other socket might still have data pending....
log_printf(INFO, "Error on send(): %s, removing client %d", strerror(errno), c->fd_[0]);
slist_remove(&(list->list_), c);
break;
@@ -333,8 +422,12 @@ int clients_write(clients_t* list, fd_set* set)
memmove(c->write_buf_[i].buf_, &c->write_buf_[i].buf_[len], c->write_buf_offset_[i] - len);
c->write_buf_offset_[i] -= len;
}
- else
+ else {
c->write_buf_offset_[i] = 0;
+ if(c->fd_state_[i] == CLOSE_PENDING || c->fd_state_[i] == FIN_PENDING)
+ if(client_handle_pending(c, i))
+ slist_remove(&(list->list_), c);
+ }
}
}
}
diff --git a/src/clients.h b/src/clients.h
index 0c3206a..ce0f371 100644
--- a/src/clients.h
+++ b/src/clients.h
@@ -35,11 +35,14 @@
#define BUFFER_LENGTH 102400
-enum client_state_enum { CONNECTING, CONNECTED };
+enum client_state_enum { CONNECTING, CONNECTED, CLOSING };
typedef enum client_state_enum client_state_t;
+enum client_fd_state_enum { ESTABLISHING, ESTABLISHED, FIN_PENDING, FIN_LINGER, CLOSE_PENDING, CLOSE_LINGER };
+typedef enum client_fd_state_enum client_fd_state_t;
typedef struct {
int fd_[2];
+ client_fd_state_t fd_state_[2];
buffer_t write_buf_[2];
u_int32_t write_buf_offset_[2];
client_state_t state_;