Begin adding full support for GOOG-REMB\nFix iOS 'broken pipe' issue when UDP sockets are invalidated by the OS

This commit is contained in:
bossiel 2015-08-12 20:36:14 +00:00
parent 295de8efda
commit 7908865936
23 changed files with 1665 additions and 1306 deletions

View File

@ -58,7 +58,15 @@ typedef struct tdav_session_av_s
tmedia_srtp_type_t srtp_type;
tmedia_srtp_mode_t srtp_mode;
struct {
uint64_t count_last_time;
uint64_t count;
} bytes_in;
struct {
uint64_t count_last_time;
uint64_t count;
} bytes_out;
uint64_t time_last_frame_loss_report; // from jb
int32_t bandwidth_max_upload_kbps;
int32_t bandwidth_max_download_kbps;
int32_t fps;

View File

@ -164,6 +164,8 @@ static int tdav_codec_vp8_set(tmedia_codec_t* self, const tmedia_param_t* param)
else {
TMEDIA_CODEC(vp8)->bandwidth_max_upload = max_bw_new;
}
vp8->encoder.cfg.rc_target_bitrate = TSK_CLAMP(0, vp8->encoder.cfg.rc_target_bitrate, TMEDIA_CODEC(vp8)->bandwidth_max_upload);
TSK_DEBUG_INFO("New target bitrate = %d kbps", vp8->encoder.cfg.rc_target_bitrate);
reconf = tsk_true;
}
else if (tsk_striequals(param->key, "bandwidth-max-upload")) {

View File

@ -765,6 +765,9 @@ int tdav_session_av_stop(tdav_session_av_t* self)
}
}
self->bytes_in.count_last_time = self->bytes_out.count_last_time = 0;
self->bytes_in.count = self->bytes_out.count = 0;
return ret;
}

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,5 @@
/*
* Copyright (C) 2010-2011 Mamadou Diop.
*
* Contact: Mamadou Diop <diopmamadou(at)doubango.org>
* Copyright (C) 2010-2015 Mamadou DIOP
*
* This file is part of Open Source Doubango Framework.
*
@ -25,8 +23,5 @@
* http://tools.ietf.org/html/draft-ietf-mmusic-ice-19
* http://tools.ietf.org/html/draft-ietf-mmusic-ice-tcp-08
*
* @author Mamadou Diop <diopmamadou(at)doubango.org>
*
*/
#include "tnet_ice.h"

View File

@ -1,7 +1,5 @@
/*
* Copyright (C) 2010-2011 Mamadou Diop.
*
* Contact: Mamadou Diop <diopmamadou(at)doubango.org>
* Copyright (C) 2010-2015 Mamadou DIOP
*
* This file is part of Open Source Doubango Framework.
*
@ -25,9 +23,6 @@
* http://tools.ietf.org/html/draft-ietf-mmusic-ice-19
* http://tools.ietf.org/html/draft-ietf-mmusic-ice-tcp-08
*
* @author Mamadou Diop <diopmamadou(at)doubango.org>
*
*/
#ifndef TNET_ICE_H
#define TNET_ICE_H

View File

@ -997,6 +997,26 @@ int tnet_ice_ctx_send_turn_rtcp(struct tnet_ice_ctx_s* self, const void* data, t
: _tnet_ice_ctx_send_turn_raw(self, self->turn.ss_nominated_rtcp, self->turn.peer_id_rtcp, data, size);
}
int tnet_ice_ctx_turn_get_bytes_count(const struct tnet_ice_ctx_s* self, uint64_t* bytes_in, uint64_t* bytes_out)
{
int ret;
if (!self) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
ret = tnet_turn_session_get_bytes_count(self->turn.ss_nominated_rtp, bytes_in, bytes_out);
if (ret == 0 && !self->use_rtcpmux) {
uint64_t _bytes_in, _bytes_out;
ret = tnet_turn_session_get_bytes_count(self->turn.ss_nominated_rtcp, &_bytes_in, &_bytes_out);
if (ret == 0) {
if (bytes_in) *bytes_in += _bytes_in;
if (bytes_out) *bytes_out += _bytes_out;
}
}
return ret;
}
const char* tnet_ice_ctx_get_ufrag(const struct tnet_ice_ctx_s* self)
{
return (self && self->ufrag) ? self->ufrag : tsk_null;

View File

@ -1,7 +1,5 @@
/*
* Copyright (C) 2012 Doubango Telecom <http://www.doubango.org>.
*
* Contact: Mamadou Diop <diopmamadou(at)doubango[dot]org>
* Copyright (C) 2012-2015 Doubango Telecom <http://www.doubango.org>.
*
* This file is part of Open Source Doubango Framework.
*
@ -22,7 +20,6 @@
/**@file tnet_ice_ctx.h
* @brief Interactive Connectivity Establishment (ICE) implementation as per RFC 5245.
* @author Mamadou Diop <diopmamadou(at)doubango[dot]org>
*/
#ifndef TNET_ICE_CTX_H
@ -100,6 +97,7 @@ TINYNET_API int tnet_ice_ctx_recv_stun_message(struct tnet_ice_ctx_s* self, cons
TINYNET_API int tnet_ice_ctx_send_turn_rtp(struct tnet_ice_ctx_s* self, const void* data, tsk_size_t size);
TINYNET_API int tnet_ice_ctx_send_turn_rtcp(struct tnet_ice_ctx_s* self, const void* data, tsk_size_t size);
TINYNET_API int tnet_ice_ctx_turn_get_bytes_count(const struct tnet_ice_ctx_s* self, uint64_t* bytes_in, uint64_t* bytes_out);
TINYNET_API const char* tnet_ice_ctx_get_ufrag(const struct tnet_ice_ctx_s* self);
TINYNET_API const char* tnet_ice_ctx_get_pwd(const struct tnet_ice_ctx_s* self);

View File

@ -251,6 +251,42 @@ int tnet_socket_send_stream(tnet_socket_t* self, const void* data, tsk_size_t si
return (int)tnet_sockfd_send(self->fd, data, size, 0);
}
/**@ingroup tnet_socket_group
* @retval Zero if succeed and nonzero error code otherwise.
*/
int tnet_socket_handle_brokenpipe(tnet_socket_t* self)
{
int ret;
tnet_fd_t fd_old, fd_new;
if (!self || !TNET_SOCKET_TYPE_IS_DGRAM(self->type)) { // Must be UDP
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
fd_old = self->fd;
fd_new = TNET_INVALID_FD;
// close old fd
ret = tnet_sockfd_close(&self->fd);
// try to create an fd binding to the same address
if ((ret = tnet_sockfd_init(self->ip, self->port, self->type, &fd_new)) != 0) {
TNET_PRINT_LAST_ERROR("Find to bind to %s:%d", self->ip, self->port);
// TODO: Create completly new socket?
return ret;
}
#if TNET_UNDER_IPHONE || TNET_UNDER_IPHONE_SIMULATOR
/* disable SIGPIPE signal */
{
int yes = 1;
if (setsockopt(fd_new, SOL_SOCKET, SO_NOSIGPIPE, (char*)&yes, sizeof(int))){
TNET_PRINT_LAST_ERROR("setsockopt(%d, SO_NOSIGPIPE) have failed", fd_new);
}
}
#endif /* TNET_UNDER_IPHONE || TNET_UNDER_IPHONE_SIMULATOR */
TSK_DEBUG_INFO("Broken pipe result for {%s:%d}: %d -> %d", self->ip, self->port, fd_old, fd_new);
self->fd = fd_new;
return 0;
}
/**@ingroup tnet_socket_group
* Closes a socket.
* @param sock The socket to close.

View File

@ -190,6 +190,7 @@ typedef tsk_list_t tnet_sockets_L_t; /**< List of @ref tnet_socket_t elements. *
TINYNET_API tnet_socket_t* tnet_socket_create_2(const char*host, tnet_port_t port, tnet_socket_type_t type, tsk_bool_t nonblocking, tsk_bool_t bindsocket);
TINYNET_API tnet_socket_t* tnet_socket_create(const char* host, tnet_port_t port, tnet_socket_type_t type);
TINYNET_API int tnet_socket_send_stream(tnet_socket_t* self, const void* data, tsk_size_t size);
TINYNET_API int tnet_socket_handle_brokenpipe(tnet_socket_t* self);
TINYNET_GEXTERN const tsk_object_def_t *tnet_socket_def_t;

View File

@ -723,6 +723,17 @@ tnet_fd_t tnet_transport_get_master_fd(const tnet_transport_handle_t *handle)
return ((const tnet_transport_t *)handle)->master ? ((const tnet_transport_t *)handle)->master->fd : TNET_INVALID_FD;
}
int tnet_transport_get_bytes_count(const tnet_transport_handle_t *handle, uint64_t* bytes_in, uint64_t* bytes_out)
{
if (!handle){
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
if (bytes_in) *bytes_in = ((const tnet_transport_t *)handle)->bytes_in;
if (bytes_out) *bytes_out = ((const tnet_transport_t *)handle)->bytes_out;
return 0;
}
/**
* Connects a socket.
* @param handle The transport to use to connect() the socket. The new socket will be managed by this transport.

View File

@ -41,8 +41,6 @@ TNET_BEGIN_DECLS
#define TNET_TRANSPORT_CB_F(callback) ((tnet_transport_cb_f)callback)
typedef void tnet_transport_handle_t;
typedef enum tnet_transport_event_type_e
{
event_data,
@ -51,6 +49,7 @@ typedef enum tnet_transport_event_type_e
event_removed,
event_connected,
event_accepted,
event_brokenpipe, // iOS: UDP sockets closed, to be restored now that the app is on foreground
event_dtls_handshake_started,
event_dtls_handshake_succeed,
@ -121,6 +120,7 @@ TINYNET_API int tnet_transport_dtls_get_handshakingdata(tnet_transport_handle_t*
TINYNET_API tnet_socket_type_t tnet_transport_get_type(const tnet_transport_handle_t *handle);
TINYNET_API tnet_fd_t tnet_transport_get_master_fd(const tnet_transport_handle_t *handle);
TINYNET_API int tnet_transport_get_bytes_count(const tnet_transport_handle_t *handle, uint64_t* bytes_in, uint64_t* bytes_out);
TINYNET_API int tnet_transport_shutdown(tnet_transport_handle_t* handle);
typedef struct tnet_transport_s
@ -137,6 +137,9 @@ typedef struct tnet_transport_s
tsk_object_t *context;
tsk_bool_t prepared;
uint64_t bytes_out;
uint64_t bytes_in;
//unsigned connected:1;
void* mainThreadId[1];

View File

@ -490,7 +490,7 @@ bail:
tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_t from, const struct sockaddr *to, const void* buf, tsk_size_t size)
{
tnet_transport_t *transport = (tnet_transport_t*)handle;
int numberOfBytesSent = 0;
int numberOfBytesSent = 0, ret;
if (!transport) {
TSK_DEBUG_ERROR("Invalid server handle");
@ -502,9 +502,17 @@ tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_
goto bail;
}
if ((numberOfBytesSent = (int)sendto(from, buf, size, 0, to, tnet_get_sockaddr_size(to))) < size) {
TNET_PRINT_LAST_ERROR("sendto have failed");
goto bail;
while (numberOfBytesSent < size && (ret = (int)sendto(from, buf, size, 0, to, tnet_get_sockaddr_size(to))) >= 0) {
numberOfBytesSent += ret;
}
if (numberOfBytesSent < size) {
if (tnet_geterrno() == TNET_ERROR_BROKENPIPE) {
TSK_DEBUG_INFO("UDP socket with fd=%d returned EPIPE...alerting the sender with 'event_brokenpipe' event", from);
TSK_RUNNABLE_ENQUEUE(transport, event_brokenpipe, transport->callback_data, from);
}
else {
TNET_PRINT_LAST_ERROR("sendto(fd=%d) have failed", from);
}
}
bail:

View File

@ -237,6 +237,7 @@ tsk_size_t tnet_transport_send(const tnet_transport_handle_t *handle, tnet_fd_t
}
bail:
transport->bytes_out += numberOfBytesSent;
return numberOfBytesSent;
}
@ -261,6 +262,7 @@ tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_
}
bail:
transport->bytes_out += numberOfBytesSent;
return numberOfBytesSent;
}
@ -811,6 +813,7 @@ void *tnet_transport_mainthread(void *param)
}
if(len > 0){
transport->bytes_in += len;
e = tnet_transport_event_create(event_data, transport->callback_data, active_socket->fd);
e->data = buffer, buffer = tsk_null;
e->size = len;

View File

@ -285,6 +285,7 @@ try_again:
}
bail:
transport->bytes_out += sent;
return sent;
}
@ -312,6 +313,7 @@ tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_
}
bail:
transport->bytes_out += numberOfBytesSent;
return numberOfBytesSent;
}
@ -739,6 +741,7 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
else
{
tnet_transport_event_t* e = tnet_transport_event_create(event_data, transport->callback_data, active_socket->fd);
transport->bytes_in += wsaBuffer.len;
e->data = wsaBuffer.buf;
e->size = wsaBuffer.len;
e->remote_addr = remote_addr;

View File

@ -77,6 +77,8 @@ typedef char tnet_ip_t[INET6_ADDRSTRLEN];
typedef uint8_t tnet_mac_address[6];
typedef unsigned char tnet_fingerprint_t[TNET_FINGERPRINT_MAX + 1];
typedef void tnet_transport_handle_t;
typedef tsk_list_t tnet_interfaces_L_t; /**< List of @ref tnet_interface_t elements*/
typedef tsk_list_t tnet_addresses_L_t; /**< List of @ref tnet_address_t elements*/
@ -132,6 +134,7 @@ static const char* TNET_DTLS_HASH_NAMES[TNET_DTLS_HASH_TYPE_MAX] =
# define TNET_ERROR_INTR WSAEINTR
# define TNET_ERROR_ISCONN WSAEISCONN
# define TNET_ERROR_EAGAIN TNET_ERROR_WOULDBLOCK /* WinSock FIX */
# define TNET_ERROR_BROKENPIPE WSAECONNABORTED
# if (TNET_UNDER_WINDOWS_RT || TNET_UNDER_WINDOWS_CE) /* gai_strerrorA() links against FormatMessageA which is not allowed on the store */
# if !defined (WC_ERR_INVALID_CHARS)
# define WC_ERR_INVALID_CHARS 0
@ -163,6 +166,7 @@ static const char* TNET_DTLS_HASH_NAMES[TNET_DTLS_HASH_TYPE_MAX] =
# define TNET_ERROR_INTR EINTR
# define TNET_ERROR_ISCONN EISCONN
# define TNET_ERROR_EAGAIN EAGAIN
# define TNET_ERROR_BROKENPIPE EPIPE
# define tnet_gai_strerror gai_strerror
#endif
#define TNET_INVALID_FD TNET_INVALID_SOCKET

View File

@ -1804,14 +1804,14 @@ int tnet_sockfd_sendto(tnet_fd_t fd, const struct sockaddr *to, const void* buf,
#endif
if (ret <= 0) {
if (tnet_geterrno() == TNET_ERROR_WOULDBLOCK) {
TSK_DEBUG_INFO("SendUdp() - WouldBlock. Retrying...");
TSK_DEBUG_INFO("SendUdp(fd=%d) - WouldBlock. Retrying...", fd);
if (try_guard--) {
tsk_thread_sleep(10);
goto try_again;
}
}
else {
TNET_PRINT_LAST_ERROR("sendto() failed");
TNET_PRINT_LAST_ERROR("sendto(fd=%d) failed", fd);
}
goto bail;
}

View File

@ -780,6 +780,15 @@ int tnet_turn_session_get_req_transport(const struct tnet_turn_session_s* pc_sel
return 0;
}
int tnet_turn_session_get_bytes_count(const struct tnet_turn_session_s* pc_self, uint64_t* bytes_in, uint64_t* bytes_out)
{
if (!pc_self) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
return tnet_transport_get_bytes_count(pc_self->p_transport, bytes_in, bytes_out);
}
int tnet_turn_session_createpermission(struct tnet_turn_session_s* p_self, const char* pc_peer_addr, uint16_t u_peer_port, tnet_turn_peer_id_t* pu_id)
{
int ret = 0;
@ -2022,6 +2031,22 @@ static int _tnet_turn_session_transport_layer_process_cb(const tnet_transport_ev
switch(e->type){
case event_data:
break;
case event_brokenpipe:
tsk_safeobj_lock(p_ss);
if (p_ss->p_lcl_sock && e->local_fd == p_ss->p_lcl_sock->fd) {
tnet_fd_t broken_fd = e->local_fd;
tsk_bool_t registered_fd = !!tnet_transport_have_socket(p_ss->p_transport, broken_fd);
if (registered_fd) {
tnet_transport_remove_socket(p_ss->p_transport, &broken_fd);
}
if (tnet_socket_handle_brokenpipe(p_ss->p_lcl_sock) == 0) {
if (registered_fd) {
tnet_transport_add_socket(p_ss->p_transport, p_ss->p_lcl_sock->fd, p_ss->p_lcl_sock->type, tsk_false/* do not take ownership */, tsk_true/* only Meaningful for tls*/, tsk_null);
}
}
}
tsk_safeobj_unlock(p_ss);
return 0;
case event_connected:
if (p_ss->p_lcl_sock && p_ss->p_lcl_sock->fd == e->local_fd) {
tsk_safeobj_lock(p_ss);

View File

@ -87,6 +87,7 @@ TINYNET_API int tnet_turn_session_get_socket_local(struct tnet_turn_session_s* p
TINYNET_API int tnet_turn_session_get_state_createperm(const struct tnet_turn_session_s* pc_self, tnet_turn_peer_id_t u_peer_id, enum tnet_stun_state_e *pe_state);
TINYNET_API int tnet_turn_session_get_state_connbind(const struct tnet_turn_session_s* pc_self, tnet_turn_peer_id_t u_peer_id, enum tnet_stun_state_e *pe_state);
TINYNET_API int tnet_turn_session_get_req_transport(const struct tnet_turn_session_s* pc_self, enum tnet_turn_transport_e *pe_transport);
TINYNET_API int tnet_turn_session_get_bytes_count(const struct tnet_turn_session_s* pc_self, uint64_t* bytes_in, uint64_t* bytes_out);
TINYNET_API int tnet_turn_session_createpermission(struct tnet_turn_session_s* p_self, const char* pc_peer_addr, uint16_t u_peer_port, tnet_turn_peer_id_t* pu_peer_id);
TINYNET_API int tnet_turn_session_deletepermission(struct tnet_turn_session_s* p_self, tnet_turn_peer_id_t u_peer_id);
TINYNET_API int tnet_turn_session_chanbind(struct tnet_turn_session_s* p_self, tnet_turn_peer_id_t u_peer_id);

View File

@ -41,6 +41,7 @@ TRTP_BEGIN_DECLS
struct trtp_rtcp_packet_s;
struct trtp_rtp_packet_s;
struct tnet_ice_ctx_s;
struct tnet_transport_s;
typedef int (*trtp_rtcp_cb_f)(const void* callback_data, const struct trtp_rtcp_packet_s* packet);
@ -60,6 +61,10 @@ int trtp_rtcp_session_signal_pkt_loss(struct trtp_rtcp_session_s* self, uint32_t
int trtp_rtcp_session_signal_frame_corrupted(struct trtp_rtcp_session_s* self, uint32_t ssrc_media);
int trtp_rtcp_session_signal_jb_error(struct trtp_rtcp_session_s* self, uint32_t ssrc_media);
tnet_fd_t trtp_rtcp_session_get_local_fd(const struct trtp_rtcp_session_s* self);
int trtp_rtcp_session_set_local_fd(struct trtp_rtcp_session_s* self, tnet_fd_t local_fd);
int trtp_rtcp_session_set_net_transport(struct trtp_rtcp_session_s* self, struct tnet_transport_s* transport);
TRTP_END_DECLS
#endif /* TINYMEDIA_RTCP_SESSION_H */

View File

@ -106,6 +106,7 @@ typedef struct trtp_manager_s
struct{
void* ptr;
tsk_size_t size;
tsk_size_t index;
} serial_buffer;
} rtp;
@ -211,6 +212,7 @@ TINYRTP_API int trtp_manager_set_proxy_info(trtp_manager_t* self, enum tnet_prox
TINYRTP_API int trtp_manager_start(trtp_manager_t* self);
TINYRTP_API tsk_size_t trtp_manager_send_rtp(trtp_manager_t* self, const void* data, tsk_size_t size, uint32_t duration, tsk_bool_t marker, tsk_bool_t last_packet);
TINYRTP_API tsk_size_t trtp_manager_send_rtp_packet(trtp_manager_t* self, const struct trtp_rtp_packet_s* packet, tsk_bool_t bypass_encrypt);
TINYRTP_API int trtp_manager_get_bytes_count(trtp_manager_t* self, uint64_t* bytes_in, uint64_t* bytes_out);
TINYRTP_API tsk_size_t trtp_manager_send_rtp_raw(trtp_manager_t* self, const void* data, tsk_size_t size);
TINYRTP_API int trtp_manager_set_app_bandwidth_max(trtp_manager_t* self, int32_t bw_upload_kbps, int32_t bw_download_kbps);
TINYRTP_API int trtp_manager_signal_pkt_loss(trtp_manager_t* self, uint32_t ssrc_media, const uint16_t* seq_nums, tsk_size_t count);

View File

@ -38,6 +38,7 @@
#include "ice/tnet_ice_ctx.h"
#include "turn/tnet_turn_session.h"
#include "tnet_transport.h"
#include "tnet_utils.h"
@ -268,8 +269,9 @@ typedef struct trtp_rtcp_session_s
tsk_bool_t is_started;
tnet_fd_t local_fd;
struct tnet_transport_s* transport; // not starter -> do not stop
const struct sockaddr * remote_addr;
struct tnet_ice_ctx_s* ice_ctx;
struct tnet_ice_ctx_s* ice_ctx; // not starter -> do not stop
tsk_bool_t is_ice_turn_active;
const void* callback_data;
@ -355,8 +357,9 @@ static tsk_object_t* trtp_rtcp_session_dtor(tsk_object_t * self)
TSK_OBJECT_SAFE_FREE(session->sources);
TSK_OBJECT_SAFE_FREE(session->source_local);
TSK_OBJECT_SAFE_FREE(session->sdes);
TSK_OBJECT_SAFE_FREE(session->ice_ctx);
TSK_OBJECT_SAFE_FREE(session->ice_ctx); // not starter -> do not stop
TSK_FREE(session->cname);
TSK_OBJECT_SAFE_FREE(session->transport); // not starter -> do not stop
// release the handle for the global timer manager
tsk_timer_mgr_global_unref(&session->timer.handle_global);
@ -765,6 +768,36 @@ int trtp_rtcp_session_signal_jb_error(struct trtp_rtcp_session_s* self, uint32_t
return trtp_rtcp_session_signal_frame_corrupted(self, ssrc_media);
}
tnet_fd_t trtp_rtcp_session_get_local_fd(const struct trtp_rtcp_session_s* self)
{
if (!self) {
TSK_DEBUG_ERROR("Invalid parameter");
return TNET_INVALID_FD;
}
return self->local_fd;
}
int trtp_rtcp_session_set_local_fd(struct trtp_rtcp_session_s* self, tnet_fd_t local_fd)
{
if (!self) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
self->local_fd = local_fd;
return 0;
}
int trtp_rtcp_session_set_net_transport(struct trtp_rtcp_session_s* self, struct tnet_transport_s* transport)
{
if (!self) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
TSK_OBJECT_SAFE_FREE(self->transport);
self->transport = tsk_object_ref(transport);
return 0;
}
static tsk_bool_t _trtp_rtcp_session_have_source(trtp_rtcp_session_t* self, uint32_t ssrc)
{
tsk_list_item_t* item;
@ -925,9 +958,9 @@ static tsk_size_t _trtp_rtcp_session_send_raw(trtp_rtcp_session_t* self, const v
ret = (tnet_ice_ctx_send_turn_rtcp(self->ice_ctx, data, size) == 0) ? size : 0; // returns #0 if ok
}
else {
if (tnet_sockfd_sendto(self->local_fd, self->remote_addr, data, size) == size){ // returns number of sent bytes
ret = size;
}
ret = self->transport
? tnet_transport_sendto(self->transport, self->local_fd, self->remote_addr, data, size)
: tnet_sockfd_sendto(self->local_fd, self->remote_addr, data, size);
}
return ret;
}

View File

@ -93,6 +93,38 @@ static int _trtp_transport_layer_cb(const tnet_transport_event_t* e)
{
return _trtp_manager_recv_data(manager, e->data, e->size, e->local_fd, &e->remote_addr);
}
case event_brokenpipe:
{
tsk_safeobj_lock(manager);
tnet_fd_t broken_fd = e->local_fd;
tnet_socket_t* socket = tsk_null;
tsk_bool_t is_rtcp_socket = tsk_false;
if (manager->transport && manager->transport->master && manager->transport->master->fd == broken_fd) {
socket = manager->transport->master;
}
else if (manager->rtcp.local_socket && manager->rtcp.local_socket->fd == broken_fd) {
socket = manager->rtcp.local_socket;
is_rtcp_socket = tsk_true;
}
if (socket) {
tsk_bool_t registered_fd = !!tnet_transport_have_socket(manager->transport, broken_fd);
if (registered_fd) {
tnet_transport_remove_socket(manager->transport, &broken_fd); // broken_fd=-1
broken_fd = e->local_fd; // restore
}
if (tnet_socket_handle_brokenpipe(socket) == 0) {
if (registered_fd) {
tnet_transport_add_socket(manager->transport, socket->fd, socket->type, tsk_false/* do not take ownership */, tsk_true/* only Meaningful for tls*/, tsk_null);
}
if (manager->rtcp.session && trtp_rtcp_session_get_local_fd(manager->rtcp.session) == broken_fd) {
trtp_rtcp_session_set_local_fd(manager->rtcp.session, socket->fd);
}
}
}
tsk_safeobj_unlock(manager);
return 0;
}
#if HAVE_SRTP
/* DTLS - SRTP events */
case event_dtls_handshake_succeed:
@ -457,8 +489,15 @@ static int _trtp_manager_recv_data(const trtp_manager_t* self, const uint8_t* da
err_status_t status;
if(self->srtp_ctx_neg_remote){
if((status = srtp_unprotect(self->srtp_ctx_neg_remote->rtp.session, (void*)data_ptr, (int*)&data_size)) != err_status_ok){
TSK_DEBUG_ERROR("srtp_unprotect(RTP) failed with error code=%d, seq_num=%u", (int)status, (data_size > 4 ? tnet_ntohs_2(&data_ptr[2]) : 0x0000));
return -1;
if (status == err_status_replay_fail) {
// replay (because of RTCP-NACK nothing to worry about)
TSK_DEBUG_INFO("srtp_unprotect(RTP) returned 'err_status_replay_fail'");
return 0;
}
else {
TSK_DEBUG_ERROR("srtp_unprotect(RTP) failed with error code=%d, seq_num=%u", (int)status, (data_size > 4 ? tnet_ntohs_2(&data_ptr[2]) : 0x0000));
return -1;
}
}
}
#endif
@ -1463,6 +1502,7 @@ int trtp_manager_start(trtp_manager_t* self)
if(self->rtcp.session){
ret = trtp_rtcp_session_set_callback(self->rtcp.session, self->rtcp.cb.fun, self->rtcp.cb.usrdata);
ret = trtp_rtcp_session_set_app_bandwidth_max(self->rtcp.session, self->app_bw_max_upload, self->app_bw_max_download);
ret = trtp_rtcp_session_set_net_transport(self->rtcp.session, self->transport);
if((ret = trtp_rtcp_session_start(self->rtcp.session, local_rtcp_fd, (const struct sockaddr *)&self->rtcp.remote_addr))){
TSK_DEBUG_ERROR("Failed to start RTCP session");
goto bail;
@ -1568,6 +1608,9 @@ tsk_size_t trtp_manager_send_rtp_packet(trtp_manager_t* self, const struct trtp_
}
tsk_safeobj_lock(self);
// reset index
self->rtp.serial_buffer.index = 0;
/* check if transport is started */
if(!self->is_started || !self->transport || !self->transport->master){
@ -1598,7 +1641,7 @@ tsk_size_t trtp_manager_send_rtp_packet(trtp_manager_t* self, const struct trtp_
}
/* serialize and send over the network */
if((ret = (int)trtp_rtp_packet_serialize_to(packet, self->rtp.serial_buffer.ptr, xsize))){
if ((ret = (int)trtp_rtp_packet_serialize_to(packet, self->rtp.serial_buffer.ptr, xsize))) {
void* data_ptr = self->rtp.serial_buffer.ptr;
int data_size = ret;
#if HAVE_SRTP
@ -1610,6 +1653,7 @@ tsk_size_t trtp_manager_send_rtp_packet(trtp_manager_t* self, const struct trtp_
}
}
#endif
self->rtp.serial_buffer.index = data_size; // update index
if (/* number of bytes sent */(ret = (int)trtp_manager_send_rtp_raw(self, data_ptr, data_size)) > 0) {
// forward packet to the RTCP session
if (self->rtcp.session) {
@ -1642,12 +1686,35 @@ tsk_size_t trtp_manager_send_rtp_raw(trtp_manager_t* self, const void* data, tsk
ret = (tnet_ice_ctx_send_turn_rtp(self->ice_ctx, data, size) == 0) ? size : 0; // returns #0 if ok
}
else {
#if 1
ret = tnet_transport_sendto(self->transport, self->transport->master->fd, (const struct sockaddr *)&self->rtp.remote_addr, data, size); // returns number of sent bytes
#else
ret = tnet_sockfd_sendto(self->transport->master->fd, (const struct sockaddr *)&self->rtp.remote_addr, data, size); // returns number of sent bytes
#endif
}
tsk_safeobj_unlock(self);
return ret;
}
int trtp_manager_get_bytes_count(trtp_manager_t* self, uint64_t* bytes_in, uint64_t* bytes_out)
{
if (!self) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
if (!self->is_started) {
TSK_DEBUG_INFO("trtp_manager_get_bytes_count() called before starting RTP manager... returning zeros");
if (bytes_in) *bytes_in = 0;
if (bytes_out) *bytes_out = 0;
return 0;
}
if (self->is_ice_turn_active) {
return tnet_ice_ctx_turn_get_bytes_count(self->ice_ctx, bytes_in, bytes_out);
}
return tnet_transport_get_bytes_count(self->transport, bytes_in, bytes_out);
}
int trtp_manager_set_app_bandwidth_max(trtp_manager_t* self, int32_t bw_upload_kbps, int32_t bw_download_kbps)
{
if(self){
@ -1660,6 +1727,7 @@ int trtp_manager_set_app_bandwidth_max(trtp_manager_t* self, int32_t bw_upload_k
}
return -1;
}
int trtp_manager_signal_pkt_loss(trtp_manager_t* self, uint32_t ssrc_media, const uint16_t* seq_nums, tsk_size_t count)
{
if(self && self->rtcp.session){
@ -1667,6 +1735,7 @@ int trtp_manager_signal_pkt_loss(trtp_manager_t* self, uint32_t ssrc_media, cons
}
return -1;
}
int trtp_manager_signal_frame_corrupted(trtp_manager_t* self, uint32_t ssrc_media)
{
if(self && self->rtcp.session){
@ -1713,6 +1782,7 @@ int trtp_manager_stop(trtp_manager_t* self)
// Stop the RTCP session first (will send BYE)
if(self->rtcp.session){
ret = trtp_rtcp_session_stop(self->rtcp.session);
ret = trtp_rtcp_session_set_net_transport(self->rtcp.session, tsk_null);
}
// Free transport to force next call to start() to create new one with new sockets