close up some race conditions fit for the indy 500

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@7732 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2008-02-25 16:35:19 +00:00
parent d275a8dd3d
commit f565f7710a
6 changed files with 154 additions and 74 deletions

View File

@ -90,33 +90,20 @@ typedef enum {
struct switch_core_session {
switch_size_t id;
char name[80];
switch_session_flag_t flags;
int thread_running;
switch_memory_pool_t *pool;
switch_channel_t *channel;
switch_thread_t *thread;
const switch_endpoint_interface_t *endpoint_interface;
switch_size_t id;
switch_session_flag_t flags;
int thread_running;
switch_channel_t *channel;
switch_io_event_hooks_t event_hooks;
switch_codec_t *read_codec;
switch_codec_t *write_codec;
switch_codec_t *video_read_codec;
switch_codec_t *video_write_codec;
switch_buffer_t *raw_write_buffer;
switch_frame_t raw_write_frame;
switch_frame_t enc_write_frame;
uint8_t raw_write_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
uint8_t enc_write_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_buffer_t *raw_read_buffer;
switch_frame_t raw_read_frame;
switch_frame_t enc_read_frame;
uint8_t raw_read_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
uint8_t enc_read_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_audio_resampler_t *read_resampler;
switch_audio_resampler_t *write_resampler;
@ -138,6 +125,18 @@ struct switch_core_session {
switch_media_bug_t *bugs;
switch_app_log_t *app_log;
uint32_t stack_count;
switch_buffer_t *raw_write_buffer;
switch_frame_t raw_write_frame;
switch_frame_t enc_write_frame;
uint8_t raw_write_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
uint8_t enc_write_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_buffer_t *raw_read_buffer;
switch_frame_t raw_read_frame;
switch_frame_t enc_read_frame;
uint8_t raw_read_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
uint8_t enc_read_buf[SWITCH_RECOMMENDED_BUFFER_SIZE];
};
struct switch_media_bug {

View File

@ -705,16 +705,19 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
{
switch_channel_t *channel = switch_core_session_get_channel(session);
private_object_t *tech_pvt = switch_core_session_get_private(session);
switch_status_t status;
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_mutex_lock(tech_pvt->flag_mutex);
if (switch_channel_get_state(channel) >= CS_HANGUP || !tech_pvt) {
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
if (msg->message_id == SWITCH_MESSAGE_INDICATE_ANSWER || msg->message_id == SWITCH_MESSAGE_INDICATE_PROGRESS) {
const char *var;
if ((var = switch_channel_get_variable(channel, SOFIA_SECURE_MEDIA_VARIABLE)) && switch_true(var)) {
switch_set_flag_locked(tech_pvt, TFLAG_SECURE);
switch_set_flag(tech_pvt, TFLAG_SECURE);
}
}
@ -786,7 +789,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending media re-direct:\n%s\n", msg->string_arg);
tech_pvt->local_sdp_str = switch_core_session_strdup(session, msg->string_arg);
switch_set_flag_locked(tech_pvt, TFLAG_SENT_UPDATE);
switch_set_flag(tech_pvt, TFLAG_SENT_UPDATE);
sofia_glue_do_invite(session);
}
break;
@ -801,7 +804,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
sofia_glue_tech_prepare_codecs(tech_pvt);
if ((status = sofia_glue_tech_choose_port(tech_pvt, 0)) != SWITCH_STATUS_SUCCESS) {
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
return status;
goto end;
}
}
sofia_glue_set_local_sdp(tech_pvt, NULL, 0, NULL, 1);
@ -811,7 +814,8 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
tech_pvt->read_frame.datalen = 0;
while (switch_test_flag(tech_pvt, TFLAG_IO) && switch_channel_get_state(channel) < CS_HANGUP && !switch_rtp_ready(tech_pvt->rtp_session)) {
if (++count > 1000) {
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
if (!switch_rtp_ready(tech_pvt->rtp_session)) {
switch_yield(1000);
@ -824,7 +828,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
case SWITCH_MESSAGE_INDICATE_HOLD:
{
switch_set_flag_locked(tech_pvt, TFLAG_SIP_HOLD);
switch_set_flag(tech_pvt, TFLAG_SIP_HOLD);
sofia_glue_do_invite(session);
}
break;
@ -846,7 +850,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
private_object_t *a_tech_pvt = switch_core_session_get_private(a_session);
private_object_t *b_tech_pvt = switch_core_session_get_private(b_session);
switch_set_flag_locked(a_tech_pvt, TFLAG_REINVITE);
switch_set_flag(a_tech_pvt, TFLAG_REINVITE);
a_tech_pvt->remote_sdp_audio_ip = switch_core_session_strdup(a_session, b_tech_pvt->remote_sdp_audio_ip);
a_tech_pvt->remote_sdp_audio_port = b_tech_pvt->remote_sdp_audio_port;
a_tech_pvt->local_sdp_audio_ip = switch_core_session_strdup(a_session, b_tech_pvt->local_sdp_audio_ip);
@ -859,7 +863,8 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
}
msg->pointer_arg = NULL;
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
}
/*
@ -881,7 +886,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
if (msg->string_arg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Re-directing to %s\n", msg->string_arg);
nua_respond(tech_pvt->nh, SIP_302_MOVED_TEMPORARILY, SIPTAG_CONTACT_STR(msg->string_arg), TAG_END());
switch_set_flag_locked(tech_pvt, TFLAG_BYE);
switch_set_flag(tech_pvt, TFLAG_BYE);
}
break;
@ -958,7 +963,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
nua_respond(tech_pvt->nh, code, reason, TAG_IF(to_uri, SIPTAG_CONTACT_STR(to_uri)),
SIPTAG_SUPPORTED_STR(NULL), SIPTAG_ACCEPT_STR(NULL),
TAG_IF(!switch_strlen_zero(max_forwards), SIPTAG_MAX_FORWARDS_STR(max_forwards)), TAG_END());
switch_set_flag_locked(tech_pvt, TFLAG_BYE);
switch_set_flag(tech_pvt, TFLAG_BYE);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Responding with %d %s\n", code, reason);
@ -976,7 +981,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
} else {
nua_respond(tech_pvt->nh, code, reason, SIPTAG_CONTACT_STR(tech_pvt->reply_contact), TAG_END());
}
switch_set_flag_locked(tech_pvt, TFLAG_BYE);
switch_set_flag(tech_pvt, TFLAG_BYE);
}
}
@ -984,18 +989,18 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
case SWITCH_MESSAGE_INDICATE_RINGING:
if (!switch_channel_test_flag(channel, CF_RING_READY) &&
!switch_channel_test_flag(channel, CF_EARLY_MEDIA) && !switch_channel_test_flag(channel, CF_ANSWERED)) {
nua_respond(tech_pvt->nh, SIP_180_RINGING, SIPTAG_CONTACT_STR(tech_pvt->profile->url), TAG_END());
nua_respond(tech_pvt->nh, SIP_180_RINGING, SIPTAG_CONTACT_STR(tech_pvt->reply_contact), TAG_END());
switch_channel_mark_ring_ready(channel);
}
break;
case SWITCH_MESSAGE_INDICATE_ANSWER:
return sofia_answer_channel(session);
status = sofia_answer_channel(session);
break;
case SWITCH_MESSAGE_INDICATE_PROGRESS:
{
if (!switch_test_flag(tech_pvt, TFLAG_ANS) && !switch_test_flag(tech_pvt, TFLAG_EARLY_MEDIA)) {
switch_set_flag_locked(tech_pvt, TFLAG_EARLY_MEDIA);
switch_set_flag(tech_pvt, TFLAG_EARLY_MEDIA);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Asked to send early media by %s\n", msg->from);
/* Transmit 183 Progress with SDP */
@ -1007,7 +1012,8 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
if (switch_channel_test_flag(channel, CF_PROXY_MEDIA)) {
sofia_glue_tech_patch_sdp(tech_pvt);
if (sofia_glue_activate_rtp(tech_pvt, 0) != SWITCH_STATUS_SUCCESS) {
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
}
} else {
@ -1022,14 +1028,15 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
if (sofia_glue_tech_media(tech_pvt, r_sdp) != SWITCH_STATUS_SUCCESS) {
switch_channel_set_variable(channel, SWITCH_ENDPOINT_DISPOSITION_VARIABLE, "CODEC NEGOTIATION ERROR");
//nua_respond(tech_pvt->nh, SIP_488_NOT_ACCEPTABLE, TAG_END());
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
}
}
if ((status = sofia_glue_tech_choose_port(tech_pvt, 0)) != SWITCH_STATUS_SUCCESS) {
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
return status;
goto end;
}
sofia_glue_set_local_sdp(tech_pvt, NULL, 0, NULL, 0);
if (sofia_glue_activate_rtp(tech_pvt, 0) != SWITCH_STATUS_SUCCESS) {
@ -1052,7 +1059,14 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
break;
}
return SWITCH_STATUS_SUCCESS;
end:
//xxxbot
switch_mutex_unlock(tech_pvt->flag_mutex);
return status;
}
static switch_status_t sofia_receive_event(switch_core_session_t *session, switch_event_t *event)

View File

@ -222,6 +222,10 @@ void sofia_event_callback(nua_event_t event,
nua_event_name(event), status, phrase, session ? switch_channel_get_name(channel) : "n/a");
}
if (tech_pvt) {
switch_mutex_lock(tech_pvt->flag_mutex);
}
if ((profile->pflags & PFLAG_AUTH_ALL) && tech_pvt && tech_pvt->key && sip) {
sip_authorization_t const *authorization = NULL;
@ -343,10 +347,15 @@ void sofia_event_callback(nua_event_t event,
}
done:
if (gateway) {
sofia_reg_release_gateway(gateway);
}
if (tech_pvt) {
switch_mutex_unlock(tech_pvt->flag_mutex);
}
if (session) {
switch_core_session_rwunlock(session);
}

View File

@ -1424,16 +1424,20 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f
switch_assert(tech_pvt != NULL);
switch_mutex_lock(tech_pvt->flag_mutex);
if (switch_channel_test_flag(tech_pvt->channel, CF_PROXY_MODE)) {
return SWITCH_STATUS_SUCCESS;
status = SWITCH_STATUS_SUCCESS;
goto end;
}
if (switch_rtp_ready(tech_pvt->rtp_session) && !switch_test_flag(tech_pvt, TFLAG_REINVITE)) {
return SWITCH_STATUS_SUCCESS;
status = SWITCH_STATUS_SUCCESS;
goto end;
}
if ((status = sofia_glue_tech_set_codec(tech_pvt, 0)) != SWITCH_STATUS_SUCCESS) {
return status;
goto end;
}
bw = tech_pvt->read_codec.implementation->bits_per_second;
@ -1503,7 +1507,7 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f
if (switch_channel_test_flag(tech_pvt->channel, CF_PROXY_MEDIA)) {
if ((status = sofia_glue_tech_proxy_remote_addr(tech_pvt)) != SWITCH_STATUS_SUCCESS) {
return status;
goto end;
}
flags = (switch_rtp_flag_t) (SWITCH_RTP_FLAG_PROXY_MEDIA | SWITCH_RTP_FLAG_AUTOADJ | SWITCH_RTP_FLAG_DATAWAIT);
timer_name = NULL;
@ -1543,12 +1547,12 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f
}
tech_pvt->ssrc = switch_rtp_get_ssrc(tech_pvt->rtp_session);
switch_set_flag_locked(tech_pvt, TFLAG_RTP);
switch_set_flag_locked(tech_pvt, TFLAG_IO);
switch_set_flag(tech_pvt, TFLAG_RTP);
switch_set_flag(tech_pvt, TFLAG_IO);
if ((vad_in && inb) || (vad_out && !inb)) {
switch_rtp_enable_vad(tech_pvt->rtp_session, tech_pvt->session, &tech_pvt->read_codec, SWITCH_VAD_FLAG_TALKING);
switch_set_flag_locked(tech_pvt, TFLAG_VAD);
switch_set_flag(tech_pvt, TFLAG_VAD);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AUDIO RTP Engage VAD for %s ( %s %s )\n",
switch_channel_get_name(switch_core_session_get_channel(tech_pvt->session)), vad_in ? "in" : "", vad_out ? "out" : "");
}
@ -1651,11 +1655,18 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTP REPORTS ERROR: [%s]\n", switch_str_nil(err));
switch_channel_hangup(tech_pvt->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_clear_flag_locked(tech_pvt, TFLAG_IO);
return SWITCH_STATUS_FALSE;
status = SWITCH_STATUS_FALSE;
goto end;
}
switch_set_flag_locked(tech_pvt, TFLAG_IO);
return SWITCH_STATUS_SUCCESS;
switch_set_flag(tech_pvt, TFLAG_IO);
status = SWITCH_STATUS_SUCCESS;
end:
switch_mutex_unlock(tech_pvt->flag_mutex);
return status;
}
switch_status_t sofia_glue_tech_media(private_object_t *tech_pvt, const char *r_sdp)

View File

@ -343,6 +343,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_receive_message(switch_core_
switch_assert(session != NULL);
if ((status = switch_core_session_read_lock(session)) != SWITCH_STATUS_SUCCESS) {
return status;
}
if (session->endpoint_interface->io_routines->receive_message) {
status = session->endpoint_interface->io_routines->receive_message(session, message);
}
@ -356,6 +360,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_receive_message(switch_core_
}
switch_core_session_kill_channel(session, SWITCH_SIG_BREAK);
switch_core_session_rwunlock(session);
return status;
}
@ -831,8 +837,6 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request(const switch
switch_queue_create(&session->message_queue, SWITCH_MESSAGE_QUEUE_LEN, session->pool);
switch_queue_create(&session->event_queue, SWITCH_EVENT_QUEUE_LEN, session->pool);
switch_queue_create(&session->private_event_queue, SWITCH_EVENT_QUEUE_LEN, session->pool);
switch_snprintf(session->name, sizeof(session->name), "%"SWITCH_SIZE_T_FMT, session->id);
switch_mutex_lock(runtime.throttle_mutex);
session->id = session_manager.session_id++;
switch_core_hash_insert(session_manager.session_table, session->uuid_str, session);

View File

@ -242,9 +242,18 @@ static void handle_ice(switch_rtp_t *rtp_session, void *data, switch_size_t len)
unsigned char buf[512] = { 0 };
switch_size_t cpylen = len;
if (!switch_rtp_ready(rtp_session)) {
return;
}
READ_INC(rtp_session);
WRITE_INC(rtp_session);
if (!switch_rtp_ready(rtp_session)) {
goto end;
}
if (cpylen > 512) {
cpylen = 512;
}
@ -288,6 +297,8 @@ static void handle_ice(switch_rtp_t *rtp_session, void *data, switch_size_t len)
switch_socket_sendto(rtp_session->sock, rtp_session->from_addr, 0, (void *) rpacket, &bytes);
}
end:
READ_DEC(rtp_session);
WRITE_DEC(rtp_session);
}
@ -401,8 +412,19 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s
int x;
#endif
WRITE_INC(rtp_session);
READ_INC(rtp_session);
if (rtp_session->ready != 1) {
if (!switch_rtp_ready(rtp_session)) {
return SWITCH_STATUS_FALSE;
}
WRITE_INC(rtp_session);
READ_INC(rtp_session);
if (!switch_rtp_ready(rtp_session)) {
goto done;
}
}
*err = NULL;
@ -482,8 +504,10 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s
switch_socket_close(old_sock);
}
WRITE_DEC(rtp_session);
READ_DEC(rtp_session);
if (rtp_session->ready != 1) {
WRITE_DEC(rtp_session);
READ_DEC(rtp_session);
}
return status;
}
@ -635,7 +659,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session
rtp_session->pool = pool;
rtp_session->te = 101;
rtp_session->ready = 1;
switch_mutex_init(&rtp_session->flag_mutex, SWITCH_MUTEX_NESTED, pool);
switch_mutex_init(&rtp_session->read_mutex, SWITCH_MUTEX_NESTED, pool);
@ -697,6 +721,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session
}
}
rtp_session->ready = 1;
*new_rtp_session = rtp_session;
return SWITCH_STATUS_SUCCESS;
@ -724,7 +749,7 @@ SWITCH_DECLARE(switch_rtp_t *) switch_rtp_new(const char *rx_host,
rtp_session = NULL;
goto end;
}
if (switch_rtp_set_local_address(rtp_session, rx_host, rx_port, err) != SWITCH_STATUS_SUCCESS) {
rtp_session = NULL;
}
@ -793,31 +818,41 @@ SWITCH_DECLARE(void) switch_rtp_kill_socket(switch_rtp_t *rtp_session)
SWITCH_DECLARE(uint8_t) switch_rtp_ready(switch_rtp_t *rtp_session)
{
return (rtp_session != NULL &&
switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) && rtp_session->sock && rtp_session->remote_addr && rtp_session->ready == 2) ? 1 : 0;
uint8_t ret;
if (!rtp_session || !rtp_session->flag_mutex) {
return 0;
}
switch_mutex_lock(rtp_session->flag_mutex);
ret = (rtp_session != NULL &&
switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) && rtp_session->sock && rtp_session->remote_addr && rtp_session->ready == 2) ? 1 : 0;
switch_mutex_unlock(rtp_session->flag_mutex);
return ret;
}
SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session)
{
void *pop;
switch_socket_t *sock;
int sanity = 0;
switch_mutex_lock((*rtp_session)->flag_mutex);
if (!rtp_session || !*rtp_session || !(*rtp_session)->ready) {
switch_mutex_unlock((*rtp_session)->flag_mutex);
return;
}
READ_INC((*rtp_session));
WRITE_INC((*rtp_session));
(*rtp_session)->ready = 0;
while((*rtp_session)->reading || (*rtp_session)->writing) {
switch_yield(10000);
if (++sanity > 1000) {
break;
}
}
READ_DEC((*rtp_session));
WRITE_DEC((*rtp_session));
switch_rtp_kill_socket(*rtp_session);
@ -1027,6 +1062,10 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
stfu_frame_t *jb_frame;
int ret = -1;
if (!switch_rtp_ready(rtp_session)) {
return -1;
}
if (!rtp_session->timer.interval) {
rtp_session->last_time = switch_time_now();
}
@ -1307,18 +1346,22 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
switch_yield(5000);
}
*payload_type = (switch_payload_t) rtp_session->recv_msg.header.pt;
if (switch_rtp_ready(rtp_session)) {
*payload_type = (switch_payload_t) rtp_session->recv_msg.header.pt;
if (*payload_type == SWITCH_RTP_CNG_PAYLOAD) {
*flags |= SFF_CNG;
if (*payload_type == SWITCH_RTP_CNG_PAYLOAD) {
*flags |= SFF_CNG;
}
if (bytes > 0) {
do_2833(rtp_session);
}
ret = (int) bytes;
} else {
ret = -1;
}
if (bytes > 0) {
do_2833(rtp_session);
}
ret = (int) bytes;
end:
READ_DEC(rtp_session);