Fix segmentation fault on telepresence

This commit is contained in:
bossiel 2014-12-08 11:12:26 +00:00
parent 818f03c753
commit 583d9babc2
6 changed files with 128 additions and 88 deletions

View File

@ -69,73 +69,82 @@ extern const tsk_object_def_t *tdav_session_audio_dtmfe_def_t;
static int tdav_session_audio_rtp_cb(const void* callback_data, const struct trtp_rtp_packet_s* packet)
{
tdav_session_audio_t* audio = (tdav_session_audio_t*)callback_data;
tmedia_codec_t* codec = tsk_null;
tdav_session_av_t* base = (tdav_session_av_t*)callback_data;
int ret = -1;
if(!audio || !packet || !packet->header){
if (!audio || !packet || !packet->header) {
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
goto bail;
}
if(audio->is_started && base->consumer && base->consumer->is_started){
if (audio->is_started && base->consumer && base->consumer->is_started) {
tsk_size_t out_size = 0;
// Find the codec to use to decode the RTP payload
if(!audio->decoder.codec || audio->decoder.payload_type != packet->header->payload_type){
if (!audio->decoder.codec || audio->decoder.payload_type != packet->header->payload_type) {
tsk_istr_t format;
TSK_OBJECT_SAFE_FREE(audio->decoder.codec);
tsk_itoa(packet->header->payload_type, &format);
if(!(audio->decoder.codec = tmedia_codec_find_by_format(TMEDIA_SESSION(audio)->neg_codecs, format)) || !audio->decoder.codec->plugin || !audio->decoder.codec->plugin->decode){
if (!(audio->decoder.codec = tmedia_codec_find_by_format(TMEDIA_SESSION(audio)->neg_codecs, format)) || !audio->decoder.codec->plugin || !audio->decoder.codec->plugin->decode){
TSK_DEBUG_ERROR("%s is not a valid payload for this session", format);
return -2;
ret = -2;
goto bail;
}
audio->decoder.payload_type = packet->header->payload_type;
}
// ref() the codec to be able to use it short time after stop(SAFE_FREE(codec))
if (!(codec = tsk_object_ref(TSK_OBJECT(audio->decoder.codec)))) {
TSK_DEBUG_ERROR("Failed to get decoder codec");
goto bail;
}
// Open codec if not already done
if(!TMEDIA_CODEC(audio->decoder.codec)->opened){
int ret;
if (!TMEDIA_CODEC(codec)->opened) {
tsk_safeobj_lock(base);
if((ret = tmedia_codec_open(audio->decoder.codec))){
if ((ret = tmedia_codec_open(codec))) {
tsk_safeobj_unlock(base);
TSK_DEBUG_ERROR("Failed to open [%s] codec", audio->decoder.codec->plugin->desc);
TSK_DEBUG_ERROR("Failed to open [%s] codec", codec->plugin->desc);
TSK_OBJECT_SAFE_FREE(audio->decoder.codec);
return ret;
goto bail;
}
tsk_safeobj_unlock(base);
}
// Decode data
out_size = audio->decoder.codec->plugin->decode(audio->decoder.codec, packet->payload.data, packet->payload.size, &audio->decoder.buffer, &audio->decoder.buffer_size, packet->header);
if(out_size){
out_size = codec->plugin->decode(codec, packet->payload.data, packet->payload.size, &audio->decoder.buffer, &audio->decoder.buffer_size, packet->header);
if (out_size && audio->is_started) { // check "is_started" again ...to be sure stop() not called by another thread
void* buffer = audio->decoder.buffer;
tsk_size_t size = out_size;
// resample if needed
if((base->consumer->audio.out.rate && base->consumer->audio.out.rate != audio->decoder.codec->in.rate) || (base->consumer->audio.out.channels && base->consumer->audio.out.channels != TMEDIA_CODEC_AUDIO(audio->decoder.codec)->in.channels)){
if ((base->consumer->audio.out.rate && base->consumer->audio.out.rate != codec->in.rate) || (base->consumer->audio.out.channels && base->consumer->audio.out.channels != TMEDIA_CODEC_AUDIO(codec)->in.channels)) {
tsk_size_t resampler_result_size = 0;
int bytesPerSample = (base->consumer->audio.bits_per_sample >> 3);
if(!audio->decoder.resampler.instance){
if (!audio->decoder.resampler.instance) {
TSK_DEBUG_INFO("Create audio resampler(%s) for consumer: rate=%d->%d, channels=%d->%d, bytesPerSample=%d",
audio->decoder.codec->plugin->desc,
audio->decoder.codec->in.rate, base->consumer->audio.out.rate,
TMEDIA_CODEC_AUDIO(audio->decoder.codec)->in.channels, base->consumer->audio.out.channels,
codec->plugin->desc,
codec->in.rate, base->consumer->audio.out.rate,
TMEDIA_CODEC_AUDIO(codec)->in.channels, base->consumer->audio.out.channels,
bytesPerSample);
audio->decoder.resampler.instance = _tdav_session_audio_resampler_create(
bytesPerSample,
audio->decoder.codec->in.rate, base->consumer->audio.out.rate,
codec->in.rate, base->consumer->audio.out.rate,
base->consumer->audio.ptime,
TMEDIA_CODEC_AUDIO(audio->decoder.codec)->in.channels, base->consumer->audio.out.channels,
TMEDIA_CODEC_AUDIO(codec)->in.channels, base->consumer->audio.out.channels,
TDAV_AUDIO_RESAMPLER_DEFAULT_QUALITY,
&audio->decoder.resampler.buffer, &audio->decoder.resampler.buffer_size
);
}
if(!audio->decoder.resampler.instance){
if (!audio->decoder.resampler.instance) {
TSK_DEBUG_ERROR("No resampler to handle data");
return -5;
ret = -5;
goto bail;
}
if(!(resampler_result_size = tmedia_resampler_process(audio->decoder.resampler.instance, buffer, size/bytesPerSample, audio->decoder.resampler.buffer, audio->decoder.resampler.buffer_size/bytesPerSample))){
TSK_DEBUG_ERROR("Failed to process audio resampler input buffer");
return -6;
ret = -6;
goto bail;
}
buffer = audio->decoder.resampler.buffer;
@ -143,17 +152,23 @@ static int tdav_session_audio_rtp_cb(const void* callback_data, const struct trt
}
// adjust the gain
if(base->consumer->audio.gain){
if (base->consumer->audio.gain) {
_tdav_session_audio_apply_gain(buffer, size, base->consumer->audio.bits_per_sample, base->consumer->audio.gain);
}
// consume the frame
tmedia_consumer_consume(base->consumer, buffer, size, packet->header);
}
}
else{
else {
TSK_DEBUG_INFO("Session audio not ready");
}
return 0;
// everything is ok
ret = 0;
bail:
tsk_object_unref(TSK_OBJECT(codec));
return ret;
}
// Producer callback (From the producer to the network). Will encode() data before sending
@ -164,23 +179,23 @@ static int tdav_session_audio_producer_enc_cb(const void* callback_data, const v
tdav_session_audio_t* audio = (tdav_session_audio_t*)callback_data;
tdav_session_av_t* base = (tdav_session_av_t*)callback_data;
if(!audio){
if (!audio) {
TSK_DEBUG_ERROR("Null session");
return 0;
}
// do nothing if session is held
// when the session is held the end user will get feedback he also has possibilities to put the consumer and producer on pause
if(TMEDIA_SESSION(audio)->lo_held){
if (TMEDIA_SESSION(audio)->lo_held) {
return 0;
}
// get best negotiated codec if not already done
// the encoder codec could be null when session is renegotiated without re-starting (e.g. hold/resume)
if(!audio->encoder.codec){
if (!audio->encoder.codec) {
const tmedia_codec_t* codec;
tsk_safeobj_lock(base);
if(!(codec = tdav_session_av_get_best_neg_codec(base))){
if (!(codec = tdav_session_av_get_best_neg_codec(base))) {
TSK_DEBUG_ERROR("No codec matched");
tsk_safeobj_unlock(base);
return -2;
@ -189,14 +204,14 @@ static int tdav_session_audio_producer_enc_cb(const void* callback_data, const v
tsk_safeobj_unlock(base);
}
if(audio->is_started && base->rtp_manager && base->rtp_manager->is_started){
if (audio->is_started && base->rtp_manager && base->rtp_manager->is_started) {
/* encode */
tsk_size_t out_size = 0;
// Open codec if not already done
if(!audio->encoder.codec->opened){
if (!audio->encoder.codec->opened) {
tsk_safeobj_lock(base);
if((ret = tmedia_codec_open(audio->encoder.codec))){
if ((ret = tmedia_codec_open(audio->encoder.codec))) {
tsk_safeobj_unlock(base);
TSK_DEBUG_ERROR("Failed to open [%s] codec", audio->encoder.codec->plugin->desc);
return -4;
@ -204,8 +219,8 @@ static int tdav_session_audio_producer_enc_cb(const void* callback_data, const v
tsk_safeobj_unlock(base);
}
// check if we're sending DTMF or not
if(audio->is_sending_dtmf_events){
if(base->rtp_manager){
if (audio->is_sending_dtmf_events) {
if (base->rtp_manager) {
// increment the timestamp
base->rtp_manager->rtp.timestamp += TMEDIA_CODEC_PCM_FRAME_SIZE_AUDIO_ENCODING(audio->encoder.codec)/*duration*/;
}

View File

@ -906,7 +906,7 @@ static void* TSK_STDCALL run(void* self)
tsk_list_item_t *curr;
tnet_transport_t *transport = self;
TSK_DEBUG_INFO("Transport::run() - enter");
TSK_DEBUG_INFO("Transport::run(%s) - enter", transport->description);
/* create main thread */
if((ret = tsk_thread_create(transport->mainThreadId, tnet_transport_mainthread, transport))){ /* More important than "tsk_runnable_start" ==> start it first. */
@ -962,7 +962,6 @@ static tsk_object_t* tnet_transport_dtor(tsk_object_t * self)
TSK_OBJECT_SAFE_FREE(transport->master);
TSK_OBJECT_SAFE_FREE(transport->context);
TSK_OBJECT_SAFE_FREE(transport->natt_ctx);
TSK_FREE(transport->description);
TSK_FREE(transport->local_ip);
TSK_FREE(transport->local_host);
@ -971,6 +970,9 @@ static tsk_object_t* tnet_transport_dtor(tsk_object_t * self)
TSK_FREE(transport->tls.pbk);
TSK_FREE(transport->tls.pvk);
_tnet_transport_ssl_deinit(transport); // openssl contexts
TSK_DEBUG_INFO("*** Transport (%s) destroyed ***", transport->description);
TSK_FREE(transport->description);
}
return self;

View File

@ -333,10 +333,10 @@ static transport_socket_xt* getSocket(transport_context_t *context, tnet_fd_t fd
tsk_size_t i;
transport_socket_xt* ret = 0;
if(context){
if (context) {
tsk_safeobj_lock(context);
for(i=0; i<context->count; i++){
if(context->sockets[i]->fd == fd){
for (i=0; i<context->count; i++) {
if (context->sockets[i]->fd == fd) {
ret = context->sockets[i];
break;
}
@ -352,21 +352,21 @@ static int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *tr
{
transport_context_t *context;
if(TNET_SOCKET_TYPE_IS_TLS(type) || TNET_SOCKET_TYPE_IS_WSS(type)){
if (TNET_SOCKET_TYPE_IS_TLS(type) || TNET_SOCKET_TYPE_IS_WSS(type)) {
#if !HAVE_OPENSSL
TSK_DEBUG_ERROR("Cannot create TLS socket: OpenSSL missing");
return -2;
#endif
}
if((context = transport ? transport->context : tsk_null)){
if ((context = transport ? transport->context : tsk_null)) {
transport_socket_xt *sock = tsk_calloc(1, sizeof(transport_socket_xt));
sock->fd = fd;
sock->type = type;
sock->owner = take_ownership ? 1 : 0;
if((TNET_SOCKET_TYPE_IS_TLS(sock->type) || TNET_SOCKET_TYPE_IS_WSS(sock->type)) && transport->tls.enabled){
if(tlsHandle){
if ((TNET_SOCKET_TYPE_IS_TLS(sock->type) || TNET_SOCKET_TYPE_IS_WSS(sock->type)) && transport->tls.enabled) {
if (tlsHandle) {
sock->tlshandle = tsk_object_ref(tlsHandle);
}
else{
@ -388,7 +388,7 @@ static int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *tr
return 0;
}
else{
else {
TSK_DEBUG_ERROR("Context is Null.");
return -1;
}
@ -401,15 +401,15 @@ static int removeSocket(int index, transport_context_t *context)
tsk_safeobj_lock(context);
if(index < (int)context->count){
if (index < (int)context->count) {
/* Close the socket if we are the owner. */
if(context->sockets[index]->owner){
if (context->sockets[index]->owner) {
tnet_sockfd_close(&(context->sockets[index]->fd));
}
/* Free tls context */
if(context->sockets[index]->tlshandle){
if (context->sockets[index]->tlshandle) {
TSK_OBJECT_SAFE_FREE(context->sockets[index]->tlshandle);
}
// Free socket
@ -418,7 +418,7 @@ static int removeSocket(int index, transport_context_t *context)
// Close event
WSACloseEvent(context->events[index]);
for(i=index ; i<context->count-1; i++){
for (i=index ; i<context->count-1; i++) {
context->sockets[i] = context->sockets[i+1];
context->events[i] = context->events[i+1];
}
@ -558,15 +558,14 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
TSK_DEBUG_INFO("Starting [%s] server with IP {%s} on port {%d} with type {%d}...", transport->description, transport->master->ip, transport->master->port, transport->master->type);
while(TSK_RUNNABLE(transport)->running || TSK_RUNNABLE(transport)->started)
{
while (TSK_RUNNABLE(transport)->running || TSK_RUNNABLE(transport)->started) {
/* Wait for multiple events */
if((evt = WSAWaitForMultipleEvents(context->count, context->events, FALSE, WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED){
if ((evt = WSAWaitForMultipleEvents(context->count, context->events, FALSE, WSA_INFINITE, FALSE)) == WSA_WAIT_FAILED) {
TNET_PRINT_LAST_ERROR("WSAWaitForMultipleEvents have failed.");
goto bail;
}
if(!TSK_RUNNABLE(transport)->running && !TSK_RUNNABLE(transport)->started){
if (!TSK_RUNNABLE(transport)->running && !TSK_RUNNABLE(transport)->started) {
goto bail;
}
@ -576,12 +575,12 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
/* Get active event and socket */
index = (evt - WSA_WAIT_EVENT_0);
active_event = context->events[index];
if(!(active_socket = context->sockets[index])){
if (!(active_socket = context->sockets[index])) {
goto done;
}
/* Get the network events flags */
if (WSAEnumNetworkEvents(active_socket->fd, active_event, &networkEvents) == SOCKET_ERROR){
if (WSAEnumNetworkEvents(active_socket->fd, active_event, &networkEvents) == SOCKET_ERROR) {
TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, active_socket->fd);
TNET_PRINT_LAST_ERROR("WSAEnumNetworkEvents have failed.");
@ -590,13 +589,12 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
}
/*================== FD_ACCEPT ==================*/
if(networkEvents.lNetworkEvents & FD_ACCEPT)
{
if (networkEvents.lNetworkEvents & FD_ACCEPT) {
tnet_fd_t fd;
TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_ACCEPT", transport->description);
if(networkEvents.iErrorCode[FD_ACCEPT_BIT]){
if (networkEvents.iErrorCode[FD_ACCEPT_BIT]) {
TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, active_socket->fd);
TNET_PRINT_LAST_ERROR("ACCEPT FAILED.");
goto done;
@ -616,14 +614,14 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
}
}
}
if(WSAEventSelect(fd, context->events[context->count - 1], FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR){
if (WSAEventSelect(fd, context->events[context->count - 1], FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) {
tnet_transport_remove_socket(transport, &fd);
TNET_PRINT_LAST_ERROR("WSAEventSelect() have failed.");
goto done;
}
TSK_RUNNABLE_ENQUEUE(transport, event_accepted, transport->callback_data, fd);
}
else{
else {
TNET_PRINT_LAST_ERROR("ACCEPT FAILED.");
goto done;
}
@ -634,8 +632,7 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
}
/*================== FD_CONNECT ==================*/
if(networkEvents.lNetworkEvents & FD_CONNECT)
{
if (networkEvents.lNetworkEvents & FD_CONNECT) {
TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_CONNECT", transport->description);
if (networkEvents.iErrorCode[FD_CONNECT_BIT]) {
@ -653,8 +650,7 @@ void* TSK_STDCALL tnet_transport_mainthread(void *param)
/*================== FD_READ ==================*/
if(networkEvents.lNetworkEvents & FD_READ)
{
if (networkEvents.lNetworkEvents & FD_READ) {
DWORD readCount = 0;
WSABUF wsaBuffer;
@ -754,11 +750,10 @@ FD_READ_DONE:;
/*================== FD_WRITE ==================*/
if(networkEvents.lNetworkEvents & FD_WRITE)
{
if (networkEvents.lNetworkEvents & FD_WRITE) {
TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_WRITE", transport->description);
if(networkEvents.iErrorCode[FD_WRITE_BIT]){
if (networkEvents.iErrorCode[FD_WRITE_BIT]) {
TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, active_socket->fd);
TNET_PRINT_LAST_ERROR("WRITE FAILED.");
goto done;
@ -768,7 +763,7 @@ FD_READ_DONE:;
/*================== FD_CLOSE ==================*/
if(networkEvents.lNetworkEvents & FD_CLOSE){
if (networkEvents.lNetworkEvents & FD_CLOSE) {
TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_CLOSE", transport->description);
TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, active_socket->fd);
@ -786,7 +781,6 @@ FD_READ_DONE:;
done:
/* unlock context */
tsk_safeobj_unlock(context);
} /* while(transport->running) */
@ -820,8 +814,8 @@ static tsk_object_t* transport_context_ctor(tsk_object_t * self, va_list * app)
static tsk_object_t* transport_context_dtor(tsk_object_t * self)
{
transport_context_t *context = self;
if(context){
while(context->count){
if (context) {
while(context->count) {
removeSocket(0, context);
}
tsk_safeobj_deinit(context);

View File

@ -35,6 +35,8 @@
# include <windows.h>
#endif
static void* TSK_STDCALL __async_join(void* self);
/**@defgroup tsk_runnable_group Base class for runnable object.
*/
@ -196,27 +198,27 @@ int tsk_runnable_set_priority(tsk_runnable_t *self, int32_t priority)
int tsk_runnable_stop(tsk_runnable_t *self)
{
int ret = -1;
if(self){
if (self) {
tsk_thread_id_t id_curr_thread;
if(!self->initialized) {
if(!self->running){
if (!self->initialized) {
if (!self->running) {
/* already deinitialized */
return 0;
}
else{
else {
/* should never happen */
TSK_DEBUG_ERROR("Not initialized.");
return -2;
}
}
else if(!self->running) {
else if (!self->running) {
if(self->started){
if (self->started) {
tsk_size_t count = 0;
/* Thread is started but not running ==> Give it time.*/
while(++count <= 5){
while (++count <= 5) {
tsk_thread_sleep(count * 200);
if(self->running){
if (self->running) {
goto stop;
}
}
@ -233,10 +235,21 @@ stop:
// To avoid deadlock we don't join() the thread if this funcion is called from the "run()" function
// setting "self::running" to false is enough to exit the thread after the call to "TSK_RUNNABLE_RUN_BEGIN(self)"
id_curr_thread = tsk_thread_get_id();
if(tsk_thread_id_equals(&self->id_thread, &id_curr_thread)){
ret = tsk_thread_destroy(&(self->h_thread[0]));
if (tsk_thread_id_equals(&self->id_thread, &id_curr_thread)) {
tsk_runnable_t* copy = tsk_object_ref(TSK_OBJECT(self)); // "copy" will be null if this function is called in the "dtor()" because "refCount" is already equal to "zero".
TSK_DEBUG_INFO("tsk_thread_join(%s) called inside the thread(%lu) itself...delaying", copy ? "NOT null" : "null", id_curr_thread);
if (!copy || self->h_thread[1]) {
if (self->h_thread[1]) { // must never happen
TSK_DEBUG_ERROR("Join already delayed");
}
ret = tsk_thread_destroy(&(self->h_thread[0]));
tsk_object_unref(TSK_OBJECT(copy));
}
else {
ret = tsk_thread_create(&(self->h_thread[1]), __async_join, copy);
}
}
else if((ret = tsk_thread_join(&(self->h_thread[0])))){
else if ((ret = tsk_thread_join(&(self->h_thread[0])))) {
self->running = tsk_true;
TSK_DEBUG_ERROR("Failed to join the thread.");
return ret;
@ -248,13 +261,23 @@ stop:
return ret;
}
static void* TSK_STDCALL __async_join(void* arg)
{
tsk_runnable_t *self = (tsk_runnable_t *)arg;
if (self) {
tsk_thread_join(&self->h_thread[0]);
return tsk_object_unref(TSK_OBJECT(self));
}
return self;
}
//=================================================================================================
// Runnable object definition
//
static tsk_object_t* tsk_runnable_ctor(tsk_object_t * self, va_list * app)
{
tsk_runnable_t* runnable = (tsk_runnable_t*)self;
if(runnable){
if (runnable) {
}
return self;
@ -263,9 +286,13 @@ static tsk_object_t* tsk_runnable_ctor(tsk_object_t * self, va_list * app)
static tsk_object_t* tsk_runnable_dtor(tsk_object_t * self)
{
tsk_runnable_t* runnable = (tsk_runnable_t*)self;
if(runnable){
if (runnable) {
/* stops runnable object (if running or started) */
tsk_runnable_stop(runnable);
tsk_runnable_stop(runnable); // join(runnable->h_thread[0])
if (runnable->h_thread[1]) {
tsk_thread_destroy(&(runnable->h_thread[1])); // must not be join()
}
TSK_DEBUG_INFO("*** tsk_runnable_t destroyed ***");
}
return self;
}

View File

@ -56,7 +56,7 @@ typedef struct tsk_runnable_s
const tsk_object_def_t *objdef;
tsk_thread_handle_t* h_thread[1];
tsk_thread_handle_t* h_thread[2/*0=default,1=delayed*/];
tsk_runnable_func_run run;
tsk_thread_id_t id_thread; // no way to get this value from "h_thread" on WINXP
tsk_semaphore_handle_t *semaphore;

View File

@ -164,14 +164,16 @@ int tsk_semaphore_increment(tsk_semaphore_handle_t* handle)
int tsk_semaphore_decrement(tsk_semaphore_handle_t* handle)
{
int ret = EINVAL;
if(handle){
if (handle) {
#if TSK_UNDER_WINDOWS
# if TSK_UNDER_WINDOWS_RT
ret = (WaitForSingleObjectEx((SEMAPHORE_T)handle, INFINITE, TRUE) == WAIT_OBJECT_0) ? 0 : -1;
# else
ret = (WaitForSingleObject((SEMAPHORE_T)handle, INFINITE) == WAIT_OBJECT_0) ? 0 : -1;
#endif
if(ret) TSK_DEBUG_ERROR("sem_wait function failed: %d", ret);
if (ret) {
TSK_DEBUG_ERROR("sem_wait function failed: %d", ret);
}
#else
do {
ret = sem_wait((SEMAPHORE_T)GET_SEM(handle));