/* * sydra * * sydra is a toolbox which allows you to set up multiple bidirectional * Video/Audio streams from external locations. * sydra has been written to be used for the Elevate Festival in Graz * Austria in order to involve external locations to present themselves * at the festival. * * * Copyright (C) 2014 Christian Pointner * * This file is part of sydra. * * sydra is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. * * sydra is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with sydra. If not, see . * * In addition, as a special exception, the copyright holders hereby * grant permission for non-GPL-compatible GStreamer plugins to be used * and distributed together with GStreamer and sydra. * This permission goes above and beyond the permissions granted by the * GPL license sydra is covered by. */ #include "datatypes.h" #include #include #include #include #include #include "utils.h" #include "log.h" #define UDP_PROTO_MAGIC "SYDRA:" #define UDP_PROTO_MAGIC_LEN (sizeof(UDP_PROTO_MAGIC)-1) #define UDP_PROTO_CMD_ADD_CLIENT "add_client\n" #define UDP_PROTO_CMD_ADD_CLIENT_LEN (sizeof(UDP_PROTO_CMD_ADD_CLIENT)-1) #define UDP_PROTO_CMD_RM_CLIENT "remove_client\n" #define UDP_PROTO_CMD_RM_CLIENT_LEN (sizeof(UDP_PROTO_CMD_RM_CLIENT)-1) static struct udp_client* udp_client_new(struct sockaddr_storage *addr, socklen_t addrlen) { struct udp_client* client = g_new(struct udp_client, 1); if(!client) return NULL; switch(addr->ss_family) { case AF_INET: client->host_.type_ = IPv4; client->host_.port_ = ntohs(((struct sockaddr_in*)addr)->sin_port); break; case AF_INET6: client->host_.type_ = IPv6; client->host_.port_ = ntohs(((struct sockaddr_in6*)addr)->sin6_port); break; default: g_free(client); return NULL; } int errcode = getnameinfo((struct sockaddr *)addr, addrlen, client->host_.addr_, sizeof(client->host_.addr_), NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV); if (errcode != 0) { log_printf(WARNING, "getnameinfo() error: %s", gai_strerror(errcode)); g_free(client); return NULL; } return client; } static void udpsink_add_client(struct sockaddr_storage *addr, socklen_t addrlen, struct udp_sink* sink) { struct udp_client* client = udp_client_new(addr, addrlen); if(!client) return; GTimeVal now; g_get_current_time(&now); client->last_seen_ = GST_TIMEVAL_TO_TIME(now); GList* entry = g_list_find_custom(sink->clients_, client, cmp_udp_client); gchar* name = gst_element_get_name(sink->udp_); if(!entry) { log_printf(INFO, "adding host %s%c%d to list of receivers for element %s", client->host_.addr_, client->host_.type_ == IPv4 ? ':' : '.', client->host_.port_, name); g_signal_emit_by_name(G_OBJECT(sink->udp_), "add", client->host_.addr_, client->host_.port_, NULL); sink->clients_ = g_list_append(sink->clients_, client); } else { log_printf(DEBUG, "not adding host %s%c%d to list of receivers for element %s since it already exists", client->host_.addr_, client->host_.type_ == IPv4 ? ':' : '.', client->host_.port_, name); ((struct udp_client*)(entry->data))->last_seen_ = client->last_seen_; g_free(client); } g_free(name); } static void udpsink_remove_client(struct sockaddr_storage *addr, socklen_t addrlen, struct udp_sink* sink) { struct udp_client* client = udp_client_new(addr, addrlen); if(!client) return; GList* removee = g_list_find_custom(sink->clients_, client, cmp_udp_client); gchar* name = gst_element_get_name(sink->udp_); if(!removee) { log_printf(WARNING, "client %s%c%d not found in client list for %s - nothing removed", client->host_.addr_, client->host_.type_ == IPv4 ? ':' : '.', client->host_.port_, name, name); } else { g_signal_emit_by_name(G_OBJECT(sink->udp_), "remove", client->host_.addr_, client->host_.port_, NULL); log_printf(INFO, "client %s%c%d removed from list of receivers for element %s (request)", client->host_.addr_, client->host_.type_ == IPv4 ? ':' : '.', client->host_.port_, name, name); gpointer rdata = removee->data; sink->clients_ = g_list_remove(sink->clients_, removee->data); g_free(rdata); } g_free(name); g_free(client); } static gboolean on_udpsink_ready(gint fd, GIOCondition cond, gpointer user_data) { struct udp_sink* sink = (struct udp_sink*)user_data; if(!sink || !(sink->udp_)) { log_printf(WARNING, "File descriptor %d is ready for reading but the supplied element is NULL - removing callback", fd); return FALSE; } struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); u_int8_t buf[2048]; ssize_t bytes = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&addr, &addrlen); if(bytes < 1) { if(errno == EINTR) return TRUE; log_printf(WARNING, "Error while receiving UDP data on fd %d, will remove callback", fd); return FALSE; } if(bytes < UDP_PROTO_MAGIC_LEN || memcmp(UDP_PROTO_MAGIC, buf, UDP_PROTO_MAGIC_LEN)) { log_printf(DEBUG, "client discovery: ignoring invalid incoming packet"); return TRUE; } u_int8_t* cmd = &(buf[UDP_PROTO_MAGIC_LEN]); if(bytes == (UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_ADD_CLIENT_LEN) && !memcmp(UDP_PROTO_CMD_ADD_CLIENT, cmd, UDP_PROTO_CMD_ADD_CLIENT_LEN)) { udpsink_add_client(&addr, addrlen, sink); } else if(bytes == (UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_RM_CLIENT_LEN) && !memcmp(UDP_PROTO_CMD_RM_CLIENT, cmd, UDP_PROTO_CMD_RM_CLIENT_LEN)) { udpsink_remove_client(&addr, addrlen, sink); } else { log_printf(DEBUG, "client discovery: ignoring invalid command"); } return TRUE; } static void check_client_timeout(struct udp_sink* sink, int timeout) { GTimeVal now; g_get_current_time(&now); GstClockTime timeout_time = GST_TIMEVAL_TO_TIME(now) - (timeout * GST_SECOND); GList *c = sink->clients_; while(c != NULL) { GList *next = c->next; struct udp_client* client = (struct udp_client*)c->data; if(client->last_seen_ < timeout_time) { g_signal_emit_by_name(G_OBJECT(sink->udp_), "remove", client->host_.addr_, client->host_.port_, NULL); gchar* name = gst_element_get_name(sink->udp_); log_printf(INFO, "client %s%c%d removed from list of receivers for element %s (timeout)", client->host_.addr_, client->host_.type_ == IPv4 ? ':' : '.', client->host_.port_, name, name); g_free(name); g_free(c->data); sink->clients_ = g_list_delete_link(sink->clients_, c); } c = next; } } static gboolean check_all_client_timeouts(gpointer user_data) { struct udp_sinks* sinks = (struct udp_sinks*)user_data; check_client_timeout(&(sinks->rtp_video_), sinks->client_timeout_); check_client_timeout(&(sinks->rtcp_video_), sinks->client_timeout_); check_client_timeout(&(sinks->rtp_audio_), sinks->client_timeout_); check_client_timeout(&(sinks->rtcp_audio_), sinks->client_timeout_); return TRUE; } static gboolean attach_udpsink(struct udp_sink* sink, const char* name, const char* prop) { GSocket *sock; g_object_get(G_OBJECT(sink->udp_), prop, &sock, NULL); int fd = g_socket_get_fd(sock); guint id = g_unix_fd_add(fd, G_IO_IN, on_udpsink_ready, sink); if(id <= 0) { log_printf(ERROR, "Error while adding %s file descriptor (%d) to main loop", name, fd); return FALSE; } log_printf(DEBUG, "successfully installed callback for %s (fd: %d) for reading (id: %d)", name, fd, id); return TRUE; } gboolean attach_udpsinks(struct udp_sinks *sinks) { if(!sinks) return FALSE; log_printf(NOTICE, "enabling automatic client handling"); if(!attach_udpsink(&(sinks->rtp_video_), "RTP(video) IPv4", "used-socket") || !attach_udpsink(&(sinks->rtp_video_), "RTP(video) IPv6", "used-socket-v6") || !attach_udpsink(&(sinks->rtcp_video_), "RTCP(video) IPv4", "used-socket") || !attach_udpsink(&(sinks->rtcp_video_), "RTCP(video) IPv6", "used-socket-v6") || !attach_udpsink(&(sinks->rtp_audio_), "RTP(audio) IPv4", "used-socket") || !attach_udpsink(&(sinks->rtp_audio_), "RTP(audio) IPv6", "used-socket-v6") || !attach_udpsink(&(sinks->rtcp_audio_), "RTCP(audio) IPv4", "used-socket") || !attach_udpsink(&(sinks->rtcp_audio_), "RTCP(audio) IPv6", "used-socket-v6")) { return FALSE; } if(sinks->client_timeout_ > 0) { if(!g_timeout_add_seconds(1, check_all_client_timeouts, sinks)) return FALSE; } return TRUE; } static gboolean send_udpsrc_buf(GstElement* source, const char* name, GInetAddress* remote, guint port, u_int8_t* buf, size_t buflen) { GSocket *sock; g_object_get(G_OBJECT(source), "used-socket", &sock, NULL); int fd = g_socket_get_fd(sock); struct sockaddr_storage addr; socklen_t addrlen = sizeof(struct sockaddr_storage); memset(&addr, 0, addrlen); switch(g_inet_address_get_family(remote)) { case G_SOCKET_FAMILY_IPV4: { addrlen = sizeof(struct sockaddr_in); size_t len = g_inet_address_get_native_size(remote); if(len > sizeof(struct in_addr)) len = sizeof(struct in_addr); memcpy(&(((struct sockaddr_in*)(&addr))->sin_addr), g_inet_address_to_bytes(remote), len); ((struct sockaddr_in*)&addr)->sin_port = htons(port); break; } case G_SOCKET_FAMILY_IPV6: { addrlen = sizeof(struct sockaddr_in6); size_t len = g_inet_address_get_native_size(remote); if(len > sizeof(struct in6_addr)) len = sizeof(struct in6_addr); memcpy(&(((struct sockaddr_in6*)&addr)->sin6_addr), g_inet_address_to_bytes(remote), len); ((struct sockaddr_in6*)&addr)->sin6_port = htons(port); break; } default: log_printf(ERROR, "remote address has unsupported protocol family - disabling keep alives"); return FALSE; } log_printf(DEBUG, "sending cmd out on to %s %d - %s (fd=%d)", g_inet_address_to_string(remote), port, name, fd); sendto(fd, buf, buflen, 0, (struct sockaddr *)&(addr), addrlen); return TRUE; } static void send_remove(GstElement* source, const char* name, GInetAddress* remote, guint port) { if(!remote) return; u_int8_t buf[] = UDP_PROTO_MAGIC UDP_PROTO_CMD_RM_CLIENT; size_t buflen = UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_RM_CLIENT_LEN; send_udpsrc_buf(source, name, remote, port, buf, buflen); } static gboolean send_keepalive(GstElement* source, const char* name, GInetAddress* remote, guint port) { if(!remote) { log_printf(WARNING, "keepalives are enabled but no remote address is set... disabling keep alives"); return FALSE; } u_int8_t buf[] = UDP_PROTO_MAGIC UDP_PROTO_CMD_ADD_CLIENT; size_t buflen = UDP_PROTO_MAGIC_LEN+UDP_PROTO_CMD_ADD_CLIENT_LEN; return send_udpsrc_buf(source, name, remote, port, buf, buflen); } static gboolean send_keepalives(gpointer user_data) { struct udp_sources* sources = (struct udp_sources*)user_data; guint port = sources->port_base_; if(!send_keepalive(sources->rtp_video_, "RTP(video)", sources->remote_addr_, port++) || !send_keepalive(sources->rtcp_video_, "RTCP(video)", sources->remote_addr_, port++) || !send_keepalive(sources->rtp_audio_, "RTP(audio)", sources->remote_addr_, port++) || !send_keepalive(sources->rtcp_audio_, "RTCP(audio)", sources->remote_addr_, port++)) { return FALSE; } return TRUE; } void on_rtphost_resolved(GObject* resolver, GAsyncResult *res, gpointer user_data) { struct udp_sources* sources = (struct udp_sources*)user_data; GError *error = NULL; GList* addrs = g_resolver_lookup_by_name_finish ((GResolver *)resolver, res, &error); if(!addrs) { log_printf(ERROR, "Resolving of remote RTP host failed: %s", error ? error->message : "unknown error"); g_error_free(error); return; } GList *entry; guint idx; log_printf(DEBUG, "Resolving of remote RTP host succeded:"); for (entry = addrs, idx = 1; entry != NULL; entry = entry->next, ++idx) { GInetAddress* addr = (GInetAddress*)entry->data; if(!sources->remote_addr_) { sources->remote_addr_ = g_inet_address_new_from_bytes(g_inet_address_to_bytes(addr), g_inet_address_get_family(addr)); log_printf(DEBUG, " result %d: %s (selected)", idx, g_inet_address_to_string(addr)); } else { log_printf(DEBUG, " result %d: %s", idx, g_inet_address_to_string(addr)); } } g_resolver_free_addresses(addrs); log_printf(INFO, "Set address for remote RTP host: %s", g_inet_address_to_string(sources->remote_addr_)); if(send_keepalives(sources)) g_timeout_add_seconds(sources->keepalive_interval_, send_keepalives, sources); } gboolean attach_udpsources(struct udp_sources *sources, gchar* rtphost) { if(!sources) return FALSE; if(sources->keepalive_interval_ > 0 && rtphost) { GResolver* resolver = g_resolver_get_default(); g_resolver_lookup_by_name_async(resolver, rtphost, NULL, on_rtphost_resolved, sources); } return TRUE; } void detach_udpsources(struct udp_sources *sources) { guint port = sources->port_base_; send_remove(sources->rtp_video_, "RTP(video)", sources->remote_addr_, port++); send_remove(sources->rtcp_video_, "RTCP(video)", sources->remote_addr_, port++); send_remove(sources->rtp_audio_, "RTP(audio)", sources->remote_addr_, port++); send_remove(sources->rtcp_audio_, "RTCP(audio)", sources->remote_addr_, port++); }