/* * gstdvbbackend * * gstdvbbackend is a small programm which captures a given set of dvb * channels from one dvb device and provides the streams via minimal http. * * * Copyright (C) 2011-2016 Christian Pointner * * This file is part of gstdvbbackend. * * gstdvbbackend is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. * * gstdvbbackend is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with gstdvbbackend. If not, see . * * In addition, as a special exception, the copyright holders hereby * grant permission for non-GPL-compatible GStreamer plugins to be used * and distributed together with GStreamer and gstdvbbackend. * This permission goes above and beyond the permissions granted by the * GPL license gstdvbbackend is covered by. */ #include #include #include #include #include #include #include #include #include #include #include #include "streamer.h" #include "datatypes.h" #include "log.h" static void added_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); log_printf(INFO, "fdsink: successfully added client %d (sink has now %d fds)", fd, num_fds); } static void removed_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); log_printf(INFO, "fdsink: successfully removed client %d (sink has now %d fds)", fd, num_fds); } static void fdremoved_cb(GstElement* sink, gint fd, gpointer data) { gint num_fds; g_object_get(G_OBJECT(sink), "num_fds", &num_fds, NULL); log_printf(INFO, "fdsink: successfully removed fd %d (sink has now %d fds)", fd, num_fds); } int init_server(const char* host, const char* port) { struct addrinfo hints, *res; res = NULL; memset (&hints, 0, sizeof (hints)); hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; hints.ai_family = AF_UNSPEC; int errcode = getaddrinfo(host, port, &hints, &res); if (errcode != 0) { log_printf(ERROR, "Error resolving address (%s:%s): %s", (host) ? host : "*", (port) ? port : "0", gai_strerror(errcode)); return -1; } if(!res) { log_printf(ERROR, "getaddrinfo returned no address for %s:%s", (host) ? host : "*", (port) ? port : "0"); return -1; } struct sockaddr_in local_addr; memset((char *) &local_addr, 0, sizeof(local_addr)); memcpy(&local_addr, res->ai_addr, res->ai_addrlen); freeaddrinfo(res); int server = socket(AF_INET, SOCK_STREAM, 0); if(server < 0) { log_printf(ERROR, "streamer: socket() call failed"); return -1; } int on = 1; if (setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { log_printf(ERROR, "streamer: setsockopt(SO_REUSEADDR) failed"); return -1; } if(bind(server, (struct sockaddr *)&local_addr, sizeof(local_addr))==-1) { log_printf(ERROR, "streamer: bind() call failed"); return -1; } if(listen(server, 5)==-1) { log_printf(ERROR, "streamer: listen() call failed"); return -1; } return server; } int streamer_init(streamer_t* streamer, GMainLoop *loop, const char* host, const char* port) { if(!streamer) return -1; streamer->fd_ = init_server(host, port); if(streamer->fd_ < 0) { return streamer->fd_; } streamer->loop_ = loop; streamer->sink_ = gst_element_factory_make("multifdsink", "streamer"); if(!streamer->sink_) { log_printf(ERROR, "the streamer object could not be created. Exiting."); return -1; } // TODO: how the heck should we get the right value? 3 means keyframe... g_object_set(G_OBJECT(streamer->sink_), "recover-policy", 3, NULL); g_signal_connect(G_OBJECT(streamer->sink_), "client-added", G_CALLBACK(added_cb), streamer); g_signal_connect(G_OBJECT(streamer->sink_), "client-removed", G_CALLBACK(removed_cb), streamer); g_signal_connect(G_OBJECT(streamer->sink_), "client-fd-removed", G_CALLBACK(fdremoved_cb), streamer); streamer->thread_ = NULL; return 0; } static void add_fd(streamer_t* streamer, int fd) { log_printf(INFO, "adding fd %d to fdsink", fd); g_signal_emit_by_name(G_OBJECT(streamer->sink_), "add", fd, NULL); } /* static void remove_fd(streamer_t* streamer, int fd) { log_printf(INFO, "removing fd %d from fdsink", fd); g_signal_emit_by_name(G_OBJECT(streamer->sink_), "remove-flush", fd, NULL); } */ void client_thread_func(gpointer data, gpointer user_data) { streamer_t *streamer = (streamer_t*)user_data; int fd = *((int*)user_data); char buf; int nlcnt = 0; for(;;) { int len = recv(fd, &buf, 1, 0); if(len!=1) { return; } if(buf == '\n') nlcnt++; else if(buf != '\r') nlcnt = 0; if(nlcnt >= 2) break; } char* answer = "HTTP/1.0 200 OK\n\n"; int written = 0; for(;;) { int len = send(fd, &answer[written], strlen(answer) - written, 0); if(len < 0) { return; } written+=len; if(written >= strlen(answer)) break; } add_fd(streamer, fd); } static gpointer streamer_thread_func(gpointer data) { streamer_t *streamer = (streamer_t*)data; log_printf(NOTICE, "streamer thread started"); struct sockaddr_in remote_addr; memset (&remote_addr, 0, sizeof(remote_addr)); for(;;) { socklen_t alen=sizeof(remote_addr); int new_client = accept(streamer->fd_, (struct sockaddr *)&remote_addr, &alen); if(new_client==-1) { log_printf(ERROR, "accept() call failed"); break; } log_printf(INFO, "streamer: new connection %s:%d (fd=%d)", inet_ntoa(remote_addr.sin_addr), ntohs(remote_addr.sin_port), new_client); int *client_fd = malloc(sizeof(int)); if (!new_client) { log_printf(ERROR, "streamer: memory error"); break; } *client_fd = new_client; g_thread_pool_push(streamer->client_pool_, client_fd, NULL); } log_printf(NOTICE, "streamer thread stopped"); g_main_loop_quit(streamer->loop_); return NULL; } int streamer_start(streamer_t* streamer) { if(!streamer) return -1; streamer->thread_ = g_thread_new("streamer", streamer_thread_func, streamer); if(!streamer->thread_) { log_printf(ERROR, "streamer thread could not be started"); return -1; } streamer->client_pool_ = g_thread_pool_new (client_thread_func, streamer, 8, TRUE, NULL); if(!streamer->client_pool_) { log_printf(ERROR, "streamer thread could not be started"); return -1; } return 0; } void streamer_stop(streamer_t* streamer) { if(!streamer) return; shutdown(streamer->fd_, SHUT_RDWR); g_thread_pool_free(streamer->client_pool_, TRUE, TRUE); if(streamer->thread_) { log_printf(NOTICE, "waiting for streamer thread to stop"); g_thread_join(streamer->thread_); } close(streamer->fd_); }