diff options
author | Christian Pointner <equinox@spreadspace.org> | 2016-05-25 22:54:21 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2016-05-25 22:54:21 +0200 |
commit | 0c4e68d5afb47b2e378cf782f289c328c05a11d3 (patch) | |
tree | bb761f3e6509bd52fcecf58ec4a6c5d3dfa4e37f /src/streamer.c | |
parent | updated copyright info (diff) |
major cleanup and refactoring
Diffstat (limited to 'src/streamer.c')
-rw-r--r-- | src/streamer.c | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/src/streamer.c b/src/streamer.c index 4475562..1d49fcb 100644 --- a/src/streamer.c +++ b/src/streamer.c @@ -145,28 +145,25 @@ static void add_fd(streamer_t* streamer, int fd) g_signal_emit_by_name(G_OBJECT(streamer->sink_), "add", fd, NULL); } +/* static void remove_fd(streamer_t* streamer, int fd) { log_printf(INFO, "removing fd %d from fdsink", fd); g_signal_emit_by_name(G_OBJECT(streamer->sink_), "remove-flush", fd, NULL); } +*/ -struct client_struct { - int fd_; - streamer_t* streamer_; -}; - -static gpointer client_thread_func(gpointer data) +void client_thread_func(gpointer data, gpointer user_data) { - struct client_struct *client = (struct client_struct*)data; + streamer_t *streamer = (streamer_t*)user_data; + int fd = *((int*)user_data); char buf; int nlcnt = 0; for(;;) { - int len = recv(client->fd_, &buf, 1, 0); + int len = recv(fd, &buf, 1, 0); if(len!=1) { - free(client); - return NULL; + return; } if(buf == '\n') nlcnt++; @@ -179,10 +176,9 @@ static gpointer client_thread_func(gpointer data) char* answer = "HTTP/1.0 200 OK\n\n"; int written = 0; for(;;) { - int len = send(client->fd_, &answer[written], strlen(answer) - written, 0); + int len = send(fd, &answer[written], strlen(answer) - written, 0); if(len < 0) { - free(client); - return NULL; + return; } written+=len; @@ -190,9 +186,7 @@ static gpointer client_thread_func(gpointer data) break; } - add_fd(client->streamer_, client->fd_); - free(client); - return NULL; + add_fd(streamer, fd); } static gpointer streamer_thread_func(gpointer data) @@ -200,25 +194,25 @@ static gpointer streamer_thread_func(gpointer data) streamer_t *streamer = (streamer_t*)data; log_printf(NOTICE, "streamer thread started"); - GstBuffer* buf = NULL; struct sockaddr_in remote_addr; memset (&remote_addr, 0, sizeof(remote_addr)); for(;;) { - int alen=sizeof(remote_addr); + socklen_t alen=sizeof(remote_addr); int new_client = accept(streamer->fd_, (struct sockaddr *)&remote_addr, &alen); if(new_client==-1) { log_printf(ERROR, "accept() call failed"); break; } log_printf(INFO, "streamer: new connection %s:%d (fd=%d)", inet_ntoa(remote_addr.sin_addr), ntohs(remote_addr.sin_port), new_client); - struct client_struct* client = malloc(sizeof(struct client_struct)); - if(!client) { + + int *client_fd = malloc(sizeof(int)); + if (!new_client) { log_printf(ERROR, "streamer: memory error"); break; } - client->fd_ = new_client; - client->streamer_ = streamer; - g_thread_create(client_thread_func, client, FALSE, NULL); + + *client_fd = new_client; + g_thread_pool_push(streamer->client_pool_, client_fd, NULL); } log_printf(NOTICE, "streamer thread stopped"); @@ -231,12 +225,18 @@ int streamer_start(streamer_t* streamer) if(!streamer) return -1; - streamer->thread_ = g_thread_create(streamer_thread_func, streamer, TRUE, NULL); + streamer->thread_ = g_thread_new("streamer", streamer_thread_func, streamer); if(!streamer->thread_) { log_printf(ERROR, "streamer thread could not be started"); return -1; } + streamer->client_pool_ = g_thread_pool_new (client_thread_func, streamer, 8, TRUE, NULL); + if(!streamer->client_pool_) { + log_printf(ERROR, "streamer thread could not be started"); + return -1; + } + return 0; } @@ -247,6 +247,8 @@ void streamer_stop(streamer_t* streamer) shutdown(streamer->fd_, SHUT_RDWR); + g_thread_pool_free(streamer->client_pool_, TRUE, TRUE); + if(streamer->thread_) { log_printf(NOTICE, "waiting for streamer thread to stop"); g_thread_join(streamer->thread_); |