/*
 *  sydra
 *
 *  sydra is a toolbox which allows you to set up multiple bidirectional
 *  Video/Audio streams from external locations.
 *  sydra has been written to be used for the Elevate Festival in Graz
 *  Austria in order to involve external locations to present themselves
 *  at the festival.
 *  Sydra is based on GStreamer and is written in C.
 *
 *
 *  Copyright (C) 2014 Christian Pointner
 *
 *  This file is part of sydra.
 *
 *  sydra is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  any later version.
 *
 *  sydra is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with sydra. If not, see <http://www.gnu.org/licenses/>.
 *
 *  In addition, as a special exception, the copyright holders hereby
 *  grant permission for non-GPL-compatible GStreamer plugins to be used
 *  and distributed together with GStreamer and sydra.
 *  This permission goes above and beyond the permissions granted by the
 *  GPL license sydra is covered by.
 */

#include "datatypes.h"

/*
 * NOTE(review): the system header names below were reconstructed from the
 * symbols this file uses (errno, memcmp, recvfrom, sockaddr_in/in6, ntohs,
 * getnameinfo, GSocket, g_unix_fd_add, GStreamer macros) - confirm against
 * the project's build before merging.
 */
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <gio/gio.h>
#include <glib-unix.h>
#include <gst/gst.h>

#include "utils.h"
#include "log.h"

/* Wire format of a discovery datagram: MAGIC immediately followed by a command. */
#define UDP_PROTO_MAGIC "SYDRA:"
#define UDP_PROTO_MAGIC_LEN (sizeof(UDP_PROTO_MAGIC)-1)
#define UDP_PROTO_CMD_ADD_CLIENT "add_client\n"
#define UDP_PROTO_CMD_ADD_CLIENT_LEN (sizeof(UDP_PROTO_CMD_ADD_CLIENT)-1)
#define UDP_PROTO_CMD_RM_CLIENT "remove_client\n"
#define UDP_PROTO_CMD_RM_CLIENT_LEN (sizeof(UDP_PROTO_CMD_RM_CLIENT)-1)

/* Size of the receive buffer used for discovery datagrams. */
#define UDP_PROTO_RECV_BUF_LEN 2048

/*
 * Current wall-clock time expressed as a GstClockTime (nanoseconds).
 * Uses g_get_real_time() instead of the deprecated GTimeVal API; the clock
 * and the resulting value are the same as GST_TIMEVAL_TO_TIME() produced.
 */
static GstClockTime wallclock_now(void)
{
  return ((GstClockTime)g_get_real_time()) * GST_USECOND;
}

/* Separator used when logging "host<sep>port": ':' for IPv4, '.' otherwise. */
static char host_port_separator(const struct udp_client* client)
{
  return (client->host_.type_ == IPv4) ? ':' : '.';
}

/*
 * Build a udp_client describing the peer address `addr`.
 *
 * Fills in address family, port (host byte order) and the numeric host
 * string. Returns a newly allocated client owned by the caller (release it
 * with g_free()), or NULL if the address family is neither AF_INET nor
 * AF_INET6 or if getnameinfo() fails.
 */
static struct udp_client* udp_client_new(struct sockaddr_storage *addr, socklen_t addrlen)
{
  /* g_new0() aborts the program on allocation failure, so no NULL check is needed */
  struct udp_client* client = g_new0(struct udp_client, 1);

  switch(addr->ss_family) {
  case AF_INET:
    client->host_.type_ = IPv4;
    client->host_.port_ = ntohs(((struct sockaddr_in*)addr)->sin_port);
    break;
  case AF_INET6:
    client->host_.type_ = IPv6;
    client->host_.port_ = ntohs(((struct sockaddr_in6*)addr)->sin6_port);
    break;
  default:
    g_free(client);
    return NULL;
  }

  int errcode = getnameinfo((struct sockaddr *)addr, addrlen,
                            client->host_.addr_, sizeof(client->host_.addr_),
                            NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV);
  if(errcode != 0) {
    log_printf(WARNING, "getnameinfo() error: %s", gai_strerror(errcode));
    g_free(client);
    return NULL;
  }
  return client;
}

/*
 * Handle an "add_client" request from `addr`.
 *
 * If the peer is not yet in sink->clients_ it is announced to the sink
 * element through its "add" signal and appended to the list (the list takes
 * ownership of the client). If it is already known only its last_seen_
 * timestamp is refreshed.
 */
static void udpsink_add_client(struct sockaddr_storage *addr, socklen_t addrlen, struct udp_sink* sink)
{
  struct udp_client* client = udp_client_new(addr, addrlen);
  if(!client)
    return;
  client->last_seen_ = wallclock_now();

  GList* known = g_list_find_custom(sink->clients_, client, cmp_udp_client);
  gchar* name = gst_element_get_name(sink->udp_);
  if(!known) {
    log_printf(INFO, "adding host %s%c%d to list of receivers for element %s",
               client->host_.addr_, host_port_separator(client), client->host_.port_, name);
    g_signal_emit_by_name(G_OBJECT(sink->udp_), "add", client->host_.addr_, client->host_.port_, NULL);
    sink->clients_ = g_list_append(sink->clients_, client);
  } else {
    log_printf(DEBUG, "not adding host %s%c%d to list of receivers for element %s since it already exists",
               client->host_.addr_, host_port_separator(client), client->host_.port_, name);
    ((struct udp_client*)(known->data))->last_seen_ = client->last_seen_;
    g_free(client);   /* the temporary lookup key is no longer needed */
  }
  g_free(name);
}

/*
 * Handle a "remove_client" request from `addr`.
 *
 * If the peer is in sink->clients_ the sink element is told to drop it via
 * its "remove" signal and the list entry (including its data) is freed.
 * Unknown peers only produce a warning.
 */
static void udpsink_remove_client(struct sockaddr_storage *addr, socklen_t addrlen, struct udp_sink* sink)
{
  struct udp_client* client = udp_client_new(addr, addrlen);
  if(!client)
    return;

  GList* removee = g_list_find_custom(sink->clients_, client, cmp_udp_client);
  gchar* name = gst_element_get_name(sink->udp_);
  if(!removee) {
    log_printf(WARNING, "client %s%c%d not found in client list for %s - nothing removed",
               client->host_.addr_, host_port_separator(client), client->host_.port_, name);
  } else {
    g_signal_emit_by_name(G_OBJECT(sink->udp_), "remove", client->host_.addr_, client->host_.port_, NULL);
    log_printf(INFO, "client %s%c%d removed from list of receivers for element %s (request)",
               client->host_.addr_, host_port_separator(client), client->host_.port_, name);
    g_free(removee->data);
    /* we already hold the link, so unlink it directly instead of searching the list again */
    sink->clients_ = g_list_delete_link(sink->clients_, removee);
  }
  g_free(name);
  g_free(client);
}

/*
 * Main-loop callback: a discovery datagram is readable on `fd`.
 *
 * `user_data` is the struct udp_sink the descriptor was registered for.
 * Returns TRUE to keep the callback installed and FALSE to remove it; FALSE
 * is only returned when the sink is missing or recvfrom() fails with a
 * non-transient error.
 */
static gboolean on_udp_desc_ready(gint fd, GIOCondition cond, gpointer user_data)
{
  (void)cond;

  struct udp_sink* sink = (struct udp_sink*)user_data;
  if(!sink || !(sink->udp_)) {
    log_printf(WARNING, "File descriptor %d is ready for reading - but supplied element is NULL removing callback", fd);
    return FALSE;
  }

  struct sockaddr_storage addr;
  socklen_t addrlen = sizeof(addr);
  guint8 buf[UDP_PROTO_RECV_BUF_LEN];
  ssize_t bytes = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&addr, &addrlen);
  if(bytes < 0) {
    /* transient conditions: keep the callback and wait for the next wakeup */
    if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
      return TRUE;
    log_printf(WARNING, "Error while receiving UDP data on fd %d, will remove callback", fd);
    return FALSE;
  }

  /*
   * A return value of 0 is a valid empty datagram, not an error. It must not
   * remove the callback (otherwise any peer could disable client discovery
   * with a single empty packet); it simply fails the magic check below.
   */
  size_t len = (size_t)bytes;
  if(len < UDP_PROTO_MAGIC_LEN || memcmp(UDP_PROTO_MAGIC, buf, UDP_PROTO_MAGIC_LEN)) {
    log_printf(DEBUG, "client discovery: ignoring invalid incoming packet");
    return TRUE;
  }

  /* commands must match exactly: no trailing bytes are accepted */
  const guint8* cmd = &(buf[UDP_PROTO_MAGIC_LEN]);
  if(len == (UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_ADD_CLIENT_LEN) &&
     !memcmp(UDP_PROTO_CMD_ADD_CLIENT, cmd, UDP_PROTO_CMD_ADD_CLIENT_LEN)) {
    udpsink_add_client(&addr, addrlen, sink);
  } else if(len == (UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_RM_CLIENT_LEN) &&
            !memcmp(UDP_PROTO_CMD_RM_CLIENT, cmd, UDP_PROTO_CMD_RM_CLIENT_LEN)) {
    udpsink_remove_client(&addr, addrlen, sink);
  } else {
    log_printf(DEBUG, "client discovery: ignoring invalid command");
  }
  return TRUE;
}

/*
 * Drop every client of `sink` whose last_seen_ is older than `timeout`
 * seconds: the sink element gets a "remove" signal and the list entry is
 * freed.
 */
static void check_client_timeout(struct udp_sink* sink, int timeout)
{
  GstClockTime deadline = wallclock_now() - (timeout * GST_SECOND);

  GList *c = sink->clients_;
  while(c != NULL) {
    /* remember the successor first - the current link may be deleted below */
    GList *next = c->next;
    struct udp_client* client = (struct udp_client*)c->data;
    if(client->last_seen_ < deadline) {
      g_signal_emit_by_name(G_OBJECT(sink->udp_), "remove", client->host_.addr_, client->host_.port_, NULL);
      gchar* name = gst_element_get_name(sink->udp_);
      log_printf(INFO, "client %s%c%d removed from list of receivers for element %s (timeout)",
                 client->host_.addr_, host_port_separator(client), client->host_.port_, name);
      g_free(name);
      g_free(c->data);
      sink->clients_ = g_list_delete_link(sink->clients_, c);
    }
    c = next;
  }
}

/*
 * Timer callback: run the timeout check on all four sinks.
 * `user_data` is the struct udp_sinks. Always returns TRUE so the timer
 * keeps firing.
 */
static gboolean check_all_client_timeouts(gpointer user_data)
{
  struct udp_sinks* sinks = (struct udp_sinks*)user_data;
  int timeout = sinks->client_timeout_;

  check_client_timeout(&(sinks->rtp_video_), timeout);
  check_client_timeout(&(sinks->rtcp_video_), timeout);
  check_client_timeout(&(sinks->rtp_audio_), timeout);
  check_client_timeout(&(sinks->rtcp_audio_), timeout);
  return TRUE;
}

/*
 * Install on_udp_desc_ready() on the socket that the sink element exposes
 * through the object property `prop`; `name` is only used for logging.
 *
 * Returns FALSE if the descriptor could not be added to the main loop.
 * If the element exposes no socket for `prop`, nothing is installed, a
 * warning is logged and TRUE is returned, so a missing socket does not fail
 * the whole setup (previously a useless watch on fd -1 was installed and
 * TRUE returned as well).
 */
static gboolean attach_udpsink(struct udp_sink* sink, const char* name, const char* prop)
{
  GSocket *sock = NULL;
  g_object_get(G_OBJECT(sink->udp_), prop, &sock, NULL);
  if(!sock) {
    log_printf(WARNING, "no socket available for %s - not installing client discovery callback", name);
    return TRUE;
  }

  int fd = g_socket_get_fd(sock);
  /* g_object_get() handed us our own reference; we only needed it to read the fd */
  g_object_unref(sock);

  guint id = g_unix_fd_add(fd, G_IO_IN, on_udp_desc_ready, sink);
  if(id == 0) {
    log_printf(ERROR, "Error while adding %s file descriptor (%d) to main loop", name, fd);
    return FALSE;
  }
  log_printf(DEBUG, "successfully installed callback for %s (fd: %d) for reading (id: %u)", name, fd, id);
  return TRUE;
}

/*
 * Install client discovery callbacks on the IPv4 and IPv6 sockets of all
 * four sinks and, if sinks->client_timeout_ is positive, a once-per-second
 * timer that expires silent clients.
 *
 * Returns FALSE if `sinks` is NULL or any installation step fails.
 */
gboolean attach_udpsinks(struct udp_sinks *sinks)
{
  if(!sinks)
    return FALSE;

  if(!attach_udpsink(&(sinks->rtp_video_), "RTP(video) IPv4", "used-socket") ||
     !attach_udpsink(&(sinks->rtp_video_), "RTP(video) IPv6", "used-socket-v6") ||
     !attach_udpsink(&(sinks->rtcp_video_), "RTCP(video) IPv4", "used-socket") ||
     !attach_udpsink(&(sinks->rtcp_video_), "RTCP(video) IPv6", "used-socket-v6") ||
     !attach_udpsink(&(sinks->rtp_audio_), "RTP(audio) IPv4", "used-socket") ||
     !attach_udpsink(&(sinks->rtp_audio_), "RTP(audio) IPv6", "used-socket-v6") ||
     !attach_udpsink(&(sinks->rtcp_audio_), "RTCP(audio) IPv4", "used-socket") ||
     !attach_udpsink(&(sinks->rtcp_audio_), "RTCP(audio) IPv6", "used-socket-v6")) {
    return FALSE;
  }

  if(sinks->client_timeout_ > 0) {
    if(!g_timeout_add_seconds(1, check_all_client_timeouts, sinks))
      return FALSE;
  }
  return TRUE;
}

/*
 * Timer callback placeholder: only logs that keep-alives are not
 * implemented. Always returns TRUE so the timer keeps firing.
 */
static gboolean send_keepalives(gpointer user_data)
{
  (void)user_data;
  log_printf(WARNING, "sending keep alives is not yet supporet!");
  return TRUE;
}

/*
 * If sources->keepalive_interval_ is positive, call send_keepalives() once
 * immediately and then every keepalive_interval_ seconds.
 *
 * Returns FALSE if `sources` is NULL or the timer could not be installed.
 */
gboolean attach_udpsources(struct udp_sources *sources)
{
  if(!sources)
    return FALSE;

  if(sources->keepalive_interval_ > 0) {
    send_keepalives(sources);
    if(!g_timeout_add_seconds(sources->keepalive_interval_, send_keepalives, sources))
      return FALSE;
  }
  return TRUE;
}