summaryrefslogtreecommitdiff
path: root/src/streamer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/streamer.c')
-rw-r--r--src/streamer.c50
1 files changed, 26 insertions, 24 deletions
diff --git a/src/streamer.c b/src/streamer.c
index 4475562..1d49fcb 100644
--- a/src/streamer.c
+++ b/src/streamer.c
@@ -145,28 +145,25 @@ static void add_fd(streamer_t* streamer, int fd)
g_signal_emit_by_name(G_OBJECT(streamer->sink_), "add", fd, NULL);
}
+/*
static void remove_fd(streamer_t* streamer, int fd)
{
log_printf(INFO, "removing fd %d from fdsink", fd);
g_signal_emit_by_name(G_OBJECT(streamer->sink_), "remove-flush", fd, NULL);
}
+*/
-struct client_struct {
- int fd_;
- streamer_t* streamer_;
-};
-
-static gpointer client_thread_func(gpointer data)
+void client_thread_func(gpointer data, gpointer user_data)
{
- struct client_struct *client = (struct client_struct*)data;
+ streamer_t *streamer = (streamer_t*)user_data;
+ int fd = *((int*)user_data);
char buf;
int nlcnt = 0;
for(;;) {
- int len = recv(client->fd_, &buf, 1, 0);
+ int len = recv(fd, &buf, 1, 0);
if(len!=1) {
- free(client);
- return NULL;
+ return;
}
if(buf == '\n') nlcnt++;
@@ -179,10 +176,9 @@ static gpointer client_thread_func(gpointer data)
char* answer = "HTTP/1.0 200 OK\n\n";
int written = 0;
for(;;) {
- int len = send(client->fd_, &answer[written], strlen(answer) - written, 0);
+ int len = send(fd, &answer[written], strlen(answer) - written, 0);
if(len < 0) {
- free(client);
- return NULL;
+ return;
}
written+=len;
@@ -190,9 +186,7 @@ static gpointer client_thread_func(gpointer data)
break;
}
- add_fd(client->streamer_, client->fd_);
- free(client);
- return NULL;
+ add_fd(streamer, fd);
}
static gpointer streamer_thread_func(gpointer data)
@@ -200,25 +194,25 @@ static gpointer streamer_thread_func(gpointer data)
streamer_t *streamer = (streamer_t*)data;
log_printf(NOTICE, "streamer thread started");
- GstBuffer* buf = NULL;
struct sockaddr_in remote_addr;
memset (&remote_addr, 0, sizeof(remote_addr));
for(;;) {
- int alen=sizeof(remote_addr);
+ socklen_t alen=sizeof(remote_addr);
int new_client = accept(streamer->fd_, (struct sockaddr *)&remote_addr, &alen);
if(new_client==-1) {
log_printf(ERROR, "accept() call failed");
break;
}
log_printf(INFO, "streamer: new connection %s:%d (fd=%d)", inet_ntoa(remote_addr.sin_addr), ntohs(remote_addr.sin_port), new_client);
- struct client_struct* client = malloc(sizeof(struct client_struct));
- if(!client) {
+
+ int *client_fd = malloc(sizeof(int));
+ if (!new_client) {
log_printf(ERROR, "streamer: memory error");
break;
}
- client->fd_ = new_client;
- client->streamer_ = streamer;
- g_thread_create(client_thread_func, client, FALSE, NULL);
+
+ *client_fd = new_client;
+ g_thread_pool_push(streamer->client_pool_, client_fd, NULL);
}
log_printf(NOTICE, "streamer thread stopped");
@@ -231,12 +225,18 @@ int streamer_start(streamer_t* streamer)
if(!streamer)
return -1;
- streamer->thread_ = g_thread_create(streamer_thread_func, streamer, TRUE, NULL);
+ streamer->thread_ = g_thread_new("streamer", streamer_thread_func, streamer);
if(!streamer->thread_) {
log_printf(ERROR, "streamer thread could not be started");
return -1;
}
+ streamer->client_pool_ = g_thread_pool_new (client_thread_func, streamer, 8, TRUE, NULL);
+ if(!streamer->client_pool_) {
+ log_printf(ERROR, "streamer thread could not be started");
+ return -1;
+ }
+
return 0;
}
@@ -247,6 +247,8 @@ void streamer_stop(streamer_t* streamer)
shutdown(streamer->fd_, SHUT_RDWR);
+ g_thread_pool_free(streamer->client_pool_, TRUE, TRUE);
+
if(streamer->thread_) {
log_printf(NOTICE, "waiting for streamer thread to stop");
g_thread_join(streamer->thread_);