/* * gstdvbbackend * * gstdvbbackend is a small programm which captures a given set of dvb * channels from one dvb device and provides the streams via minimal http. * * * Copyright (C) 2011 Christian Pointner * * This file is part of gstdvbbackend. * * gstdvbbackend is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. * * gstdvbbackend is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with gstdvbbackend. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include "streamer.h" #include "datatypes.h" #include "log.h" static void added_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; // g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); // log_printf(INFO, "fdsink: successfully added client %d (sink has now %d fds)", fd, num_fds); log_printf(INFO, "fdsink: successfully added client %d", fd); } static void removed_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; // g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); // log_printf(INFO, "fdsink: successfully removed client %d (sink has now %d fds)", fd, num_fds); log_printf(INFO, "fdsink: successfully removed client %d", fd); } static void fdremoved_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; // g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); // log_printf(INFO, "fdsink: successfully removed fd %d (sink has now %d fds)", fd, num_fds); log_printf(INFO, "fdsink: successfully removed fd %d", fd); } int init_server(const char* host, const char* port) { int server = socket(AF_INET, SOCK_STREAM, 0); if(server < 0) { log_printf(ERROR, "streamer: socket() call failed"); return -1; } // TODO: resolve host and port struct sockaddr_in local_addr; memset((char *) &local_addr, 0, sizeof(local_addr)); local_addr.sin_family = AF_INET; local_addr.sin_port = htons(9001); local_addr.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; if (setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { log_printf(ERROR, "streamer: setsockopt(SO_REUSEADDR) failed"); return -1; } if(bind(server, (struct sockaddr *)&local_addr, sizeof(local_addr))==-1) { log_printf(ERROR, "streamer: bind() call failed"); return -1; } if(listen(server, 5)==-1) { log_printf(ERROR, "streamer: listen() call failed"); return -1; } return server; } int streamer_init(streamer_t* streamer, GMainLoop *loop, const char* host, const char* port) { if(!streamer) return -1; streamer->fd_ = init_server(host, port); if(streamer->fd_ < 0) { return streamer->fd_; } streamer->loop_ = loop; streamer->sink_ = gst_element_factory_make("multifdsink", "streamer"); if(!streamer->sink_) { log_printf(ERROR, "the streamer object could not be created. Exiting."); return -1; } // TODO: how the heck should we get the right value? 3 means keyframe... g_object_set(G_OBJECT(streamer->sink_), "recover-policy", 3, NULL); g_signal_connect(G_OBJECT(streamer->sink_), "client-added", G_CALLBACK(added_cb), streamer); g_signal_connect(G_OBJECT(streamer->sink_), "client-removed", G_CALLBACK(removed_cb), streamer); g_signal_connect(G_OBJECT(streamer->sink_), "client-fd-removed", G_CALLBACK(fdremoved_cb), streamer); streamer->thread_ = NULL; return 0; } static void add_fd(streamer_t* streamer, int fd) { log_printf(INFO, "adding fd %d to fdsink", fd); g_signal_emit_by_name(G_OBJECT(streamer->sink_), "add", fd, NULL); } static void remove_fd(streamer_t* streamer, int fd) { log_printf(INFO, "removing fd %d from fdsink", fd); g_signal_emit_by_name(G_OBJECT(streamer->sink_), "remove-flush", fd, NULL); } struct client_struct { int fd_; streamer_t* streamer_; }; static gpointer client_thread_func(gpointer data) { struct client_struct *client = (struct client_struct*)data; char buf; int nlcnt = 0; for(;;) { int len = recv(client->fd_, &buf, 1, 0); if(len!=1) { free(client); return NULL; } if(buf == '\n') nlcnt++; else nlcnt = 0; if(nlcnt >= 2) break; } char* answer = "HTTP/1.0 200 OK\n\n"; int written = 0; for(;;) { written = send(client->fd_, &answer[written], sizeof(answer) - written, 0); if(written == sizeof(answer)) break; } add_fd(client->streamer_, client->fd_); free(client); return NULL; } static gpointer streamer_thread_func(gpointer data) { streamer_t *streamer = (streamer_t*)data; log_printf(NOTICE, "streamer thread started"); GstBuffer* buf = NULL; struct sockaddr_in remote_addr; memset (&remote_addr, 0, sizeof(remote_addr)); for(;;) { int alen=sizeof(remote_addr); int new_client = accept(streamer->fd_, (struct sockaddr *)&remote_addr, &alen); if(new_client==-1) { log_printf(ERROR, "accept() call failed"); break; } log_printf(INFO, "streamer: new connection %s:%d (fd=%d)", inet_ntoa(remote_addr.sin_addr), ntohs(remote_addr.sin_port), new_client); struct client_struct* client = malloc(sizeof(struct client_struct)); if(!client) { log_printf(ERROR, "streamer: memory error"); break; } client->fd_ = new_client; client->streamer_ = streamer; g_thread_create(client_thread_func, client, FALSE, NULL); } log_printf(NOTICE, "streamer thread stopped"); g_main_loop_quit(streamer->loop_); return NULL; } int streamer_start(streamer_t* streamer) { if(!streamer) return -1; streamer->thread_ = g_thread_create(streamer_thread_func, streamer, TRUE, NULL); if(!streamer->thread_) { log_printf(ERROR, "streamer thread could not be started"); return -1; } return 0; } void streamer_stop(streamer_t* streamer) { if(!streamer) return; shutdown(streamer->fd_, SHUT_RDWR); if(streamer->thread_) { log_printf(NOTICE, "waiting for streamer thread to stop"); g_thread_join(streamer->thread_); } close(streamer->fd_); }