Fix issue 367

This commit is contained in:
bossiel 2014-04-13 21:43:54 +00:00
parent 471e3369bb
commit 58458f0d4e
12 changed files with 193 additions and 94 deletions

View File

@ -399,6 +399,24 @@ static const transport_socket_xt* getSocket(transport_context_t *context, tnet_f
return ret;
}
static const transport_socket_xt* getSocketByStream(transport_context_t *context, void* cf_stream)
{
tsk_size_t i;
transport_socket_xt* ret = tsk_null;
if (context) {
tsk_safeobj_lock(context);
for(i=0; i<context->count; i++) {
if (context->sockets[i]->cf_read_stream == cf_stream || context->sockets[i]->cf_write_stream == cf_stream) {
ret = context->sockets[i];
break;
}
}
tsk_safeobj_unlock(context);
}
return ret;
}
/*== Add new socket ==*/
int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *transport, tsk_bool_t take_ownership, tsk_bool_t is_client)
@ -621,17 +639,24 @@ void __CFReadStreamClientCallBack(CFReadStreamRef stream, CFStreamEventType even
// Extract the native socket
CFDataRef data = CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle);
if(!data) goto bail;
CFSocketNativeHandle fd;
CFDataGetBytes(data, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &fd);
CFRelease(data);
transport_socket_xt *sock = (transport_socket_xt *) getSocket(context, fd);
if(!sock) goto bail;
transport_socket_xt *sock = tsk_null;
if(data){
CFSocketNativeHandle fd;
CFDataGetBytes(data, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &fd);
CFRelease(data);
sock = (transport_socket_xt *) getSocket(context, fd);
} else if (eventType == kCFStreamEventErrorOccurred) { // this event returns null data
sock = (transport_socket_xt *) getSocketByStream(context, stream);
}
if(!sock) {
goto bail;
}
switch(eventType) {
case kCFStreamEventOpenCompleted:
{
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", fd);
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", sock->fd);
sock->readable = tsk_true;
if(sock->writable){
TSK_RUNNABLE_ENQUEUE(transport, event_connected, transport->callback_data, sock->fd);
@ -640,8 +665,8 @@ void __CFReadStreamClientCallBack(CFReadStreamRef stream, CFStreamEventType even
}
case kCFStreamEventEndEncountered:
{
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", fd);
TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, fd);
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", sock->fd);
TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, sock->fd);
removeSocket(sock, context);
break;
}
@ -658,7 +683,7 @@ void __CFReadStreamClientCallBack(CFReadStreamRef stream, CFStreamEventType even
CFIndex index = CFErrorGetCode(error);
CFRelease(error);
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> Error=%lu, fd=%d", index, fd);
TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> Error=%lu, fd=%d", index, sock->fd);
}
TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, sock->fd);
@ -688,24 +713,31 @@ void __CFWriteStreamClientCallBack(CFWriteStreamRef stream, CFStreamEventType ev
// Extract the native socket
CFDataRef data = CFWriteStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle);
if(!data) goto bail;
CFSocketNativeHandle fd;
CFDataGetBytes(data, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &fd);
CFRelease(data);
transport_socket_xt *sock = (transport_socket_xt *) getSocket(context, fd);
if(!sock) goto bail;
transport_socket_xt *sock = tsk_null;
if(data){
CFSocketNativeHandle fd;
CFDataGetBytes(data, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &fd);
CFRelease(data);
sock = (transport_socket_xt *) getSocket(context, fd);
} else if (eventType == kCFStreamEventErrorOccurred) { // this event returns null data
sock = (transport_socket_xt *) getSocketByStream(context, stream);
}
if(!sock) {
goto bail;
}
switch(eventType) {
case kCFStreamEventOpenCompleted:
{
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", fd);
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", sock->fd);
// still not connected, see kCFStreamEventCanAcceptBytes
break;
}
case kCFStreamEventCanAcceptBytes:
{
// To avoid blocking, call this function only if CFWriteStreamCanAcceptBytes returns true or after the streams client (set with CFWriteStreamSetClient) is notified of a kCFStreamEventCanAcceptBytes event.
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventCanAcceptBytes(fd=%d)", fd);
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventCanAcceptBytes(fd=%d)", sock->fd);
sock->writable = tsk_true;
if(sock->readable){
TSK_RUNNABLE_ENQUEUE(transport, event_connected, transport->callback_data, sock->fd);
@ -714,8 +746,8 @@ void __CFWriteStreamClientCallBack(CFWriteStreamRef stream, CFStreamEventType ev
}
case kCFStreamEventEndEncountered:
{
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", fd);
TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, fd);
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", sock->fd);
TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, sock->fd);
removeSocket(sock, context);
break;
}
@ -727,7 +759,7 @@ void __CFWriteStreamClientCallBack(CFWriteStreamRef stream, CFStreamEventType ev
CFIndex index = CFErrorGetCode(error);
CFRelease(error);
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> Error=%lu, fd=%d", index, fd);
TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> Error=%lu, fd=%d", index, sock->fd);
}
TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, sock->fd);
@ -893,8 +925,14 @@ int wrapSocket(tnet_transport_t *transport, transport_socket_xt *sock)
CFReadStreamScheduleWithRunLoop(sock->cf_read_stream, context->cf_run_loop, kCFRunLoopDefaultMode);
CFWriteStreamScheduleWithRunLoop(sock->cf_write_stream, context->cf_run_loop, kCFRunLoopDefaultMode);
CFReadStreamOpen(sock->cf_read_stream);
CFWriteStreamOpen(sock->cf_write_stream);
if (!CFReadStreamOpen(sock->cf_read_stream)) {
TSK_DEBUG_ERROR("CFReadStreamOpen(fd=%d) failed", sock->fd);
return -1;
}
if (!CFWriteStreamOpen(sock->cf_write_stream)) {
TSK_DEBUG_ERROR("CFWriteStreamOpen(fd=%d) failed", sock->fd);
return -1;
}
}
return 0;

View File

@ -212,7 +212,7 @@ int tsk_condwait_timedwait(tsk_condwait_handle_t* handle, uint64_t ms)
tsk_mutex_lock(condwait->mutex);
if((ret = pthread_cond_timedwait(condwait->pcond, (pthread_mutex_t*)condwait->mutex, &ts))){
if(ret == TIMED_OUT){
/* TSK_DEBUG_INFO("pthread_cond_timedwait function timedout: %d", ret); */
TSK_DEBUG_INFO("pthread_cond_timedwait function timedout: %d", ret);
}
else{
TSK_DEBUG_ERROR("pthread_cond_timedwait function failed: %d", ret);

View File

@ -50,6 +50,8 @@ typedef struct tsip_dialog_invite
tsk_bool_t refersub;
tsk_bool_t use_rtcp;
tsk_bool_t use_rtcpmux;
tsk_bool_t is_initial_iack_pending; // we're waiting for the initial incoming ACK (for the 200 OK) to ensure the session
tsk_bool_t is_cancelling; // whether we're cancelling the outgoing INVITE
uint32_t rseq;
tsip_timer_t timershutdown;

View File

@ -140,6 +140,8 @@ tsip_uri_t* tsip_transport_get_uri(const tsip_transport_t *self, int lr);
int tsip_transport_add_stream_peer_2(tsip_transport_t *self, tnet_fd_t local_fd, enum tnet_socket_type_e type, tsk_bool_t connected, const char* remote_host, tnet_port_t remote_port);
#define tsip_transport_add_stream_peer(self, local_fd, type, connected) tsip_transport_add_stream_peer_2((self), (local_fd), (type), (connected), tsk_null, 0)
#define tsip_transport_stream_peers_lock(self) tsk_list_lock((self)->stream_peers)
#define tsip_transport_stream_peers_unlock(self) tsk_list_unlock((self)->stream_peers)
tsip_transport_stream_peer_t* tsip_transport_find_stream_peer_by_local_fd(tsip_transport_t *self, tnet_fd_t local_fd);
tsip_transport_stream_peer_t* tsip_transport_pop_stream_peer_by_local_fd(tsip_transport_t *self, tnet_fd_t local_fd);
tsip_transport_stream_peer_t* tsip_transport_find_stream_peer_by_remote_ip(tsip_transport_t *self, const char* remote_ip, tnet_port_t remote_port, enum tnet_socket_type_e type);

View File

@ -1084,7 +1084,7 @@ int tsip_dialog_init(tsip_dialog_t *self, tsip_dialog_type_t type, const char* c
}
self->state = tsip_initial;
self->type = type;
self->type = type;
self->id = ++unique_id;
self->connected_fd = TNET_INVALID_FD;
if(!self->record_routes){

View File

@ -443,6 +443,11 @@ int tsip_dialog_invite_process_ro(tsip_dialog_invite_t *self, const tsip_message
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
if (self->is_cancelling) {
TSK_DEBUG_INFO("Cancelling the INVITE...ignore the incoming SDP");
return 0;
}
/* Parse SDP content */
if(TSIP_MESSAGE_HAS_CONTENT(message)){
@ -582,6 +587,9 @@ int x0000_Connected_2_Connected_X_iACK(va_list *app)
int ret;
// Nothing to do (in future will be used to ensure the session)
/* No longer waiting for the initial ACK */
self->is_initial_iack_pending = tsk_false;
/* Process remote offer */
if((ret = tsip_dialog_invite_process_ro(self, rACK))){

View File

@ -309,6 +309,8 @@ int c0000_Outgoing_2_Cancelling_X_oCANCEL(va_list *app)
{
tsip_dialog_invite_t *self = va_arg(*app, tsip_dialog_invite_t *);
self->is_cancelling = tsk_true;
/* Alert the user */
TSIP_DIALOG_SIGNAL(self, tsip_event_code_dialog_terminating, "Terminating dialog");

View File

@ -204,7 +204,7 @@ static tsk_bool_t _fsm_cond_prack_match(tsip_dialog_invite_t* self, tsip_message
static tsk_bool_t _fsm_cond_negociates_preconditions(tsip_dialog_invite_t* self, tsip_message_t* rPRACK)
{
//tsip_message_supported(self->last_iInvite, "precondition") || tsip_message_required(self->last_iInvite, "precondition")
if(tsip_message_required(self->last_iInvite, "precondition") || (self->msession_mgr && self->msession_mgr->qos.type == tmedia_qos_strength_mandatory)){
if(tsip_message_required(self->last_iInvite, "precondition") || (self->msession_mgr && self->msession_mgr->qos.strength == tmedia_qos_strength_mandatory)){
return tsk_true;
}
return tsk_false;
@ -219,6 +219,13 @@ static tsk_bool_t _fsm_cond_cannotresume(tsip_dialog_invite_t* self, tsip_messag
}
}
static tsk_bool_t _fsm_cond_initial_iack_pending(tsip_dialog_invite_t* self, tsip_message_t* rACK)
{
return self->is_initial_iack_pending;
}
/* Init FSM */
int tsip_dialog_invite_server_init(tsip_dialog_invite_t *self)
{
@ -265,6 +272,12 @@ int tsip_dialog_invite_server_init(tsip_dialog_invite_t *self)
TSK_FSM_ADD_ALWAYS(_fsm_state_Ringing, _fsm_action_reject, _fsm_state_Terminated, s0000_Ringing_2_Terminated_X_Reject, "s0000_Ringing_2_Terminated_X_Reject"),
// Ringing ->(iCANCEL) -> Terminated
TSK_FSM_ADD_ALWAYS(_fsm_state_Ringing, _fsm_action_iCANCEL, _fsm_state_Terminated, s0000_Ringing_2_Terminated_X_iCANCEL, "s0000_Ringing_2_Terminated_X_iCANCEL"),
/*=======================
* === FRESH CONNECTED ===
*/
// Fresh Connected [ACK is pending] ->(iCANCEL) -> Terminated
TSK_FSM_ADD(_fsm_state_Connected, _fsm_action_iCANCEL, _fsm_cond_initial_iack_pending, _fsm_state_Terminated, s0000_Ringing_2_Terminated_X_iCANCEL, "s0000_FreshConnected_2_Terminated_X_iCANCEL"),
/*=======================
* === ANY ===
@ -617,6 +630,9 @@ int s0000_Ringing_2_Connected_X_Accept(va_list *app)
/* send 2xx OK */
ret = send_RESPONSE(self, self->last_iInvite, 200, "OK", tsk_true);
/* say we're waiting for the incoming ACK */
self->is_initial_iack_pending = tsk_true;
/* do not start the session until we get the ACK message
* http://code.google.com/p/doubango/issues/detail?id=157

View File

@ -118,14 +118,14 @@ tsip_dialog_t* tsip_dialog_layer_find_by_callid(tsip_dialog_layer_t *self, const
else{
tsip_dialog_t *dialog = tsk_null;
tsk_list_item_t *item;
tsk_safeobj_lock(self);
//--tsk_safeobj_lock(self);
tsk_list_foreach(item, self->dialogs){
if(tsk_striequals(TSIP_DIALOG(item->data)->callid, callid)){
dialog = tsk_object_ref(item->data);
break;
}
}
tsk_safeobj_unlock(self);
//--tsk_safeobj_unlock(self);
return dialog;
}
}
@ -302,32 +302,38 @@ done:
return -1;
}
static void* TSK_STDCALL _tsip_dialog_signal_transport_error_async(void* dialog)
{
tsip_dialog_signal_transport_error(TSIP_DIALOG(dialog));
return tsk_null;
}
int tsip_dialog_layer_signal_stack_disconnected(tsip_dialog_layer_t *self)
{
tsk_list_item_t *item;
int dialogs_count;
// use copy for lock-free code and faster code. also fix issue 172 (https://code.google.com/p/idoubs/issues/detail?id=172)
tsip_dialogs_L_t *dialogs_copy;
tsip_dialog_t *dialog;
if(!self){
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
if (!(dialogs_copy = tsk_list_create())) {
TSK_DEBUG_ERROR("Failed to create list");
return -1;
}
tsk_safeobj_lock(self);
dialogs_count = tsk_list_count(self->dialogs, tsk_null, tsk_null);
again:
tsk_list_foreach(item, self->dialogs){
if(item->data){
// if "tsip_dialog_signal_transport_error()" removes the dialog, then
// "self->dialogs" will became unsafe while looping
tsip_dialog_signal_transport_error(TSIP_DIALOG(item->data));
if(--dialogs_count <= 0){ // guard against endless loops
break;
}
goto again;
}
}
tsk_safeobj_unlock(self);
tsk_list_pushback_list(dialogs_copy, self->dialogs);
tsk_safeobj_unlock(self);
tsk_list_foreach(item, dialogs_copy){
if((dialog = TSIP_DIALOG(item->data))){
tsip_dialog_signal_transport_error(dialog);
}
}
TSK_OBJECT_SAFE_FREE(dialogs_copy);
return 0;
}
@ -340,8 +346,10 @@ int tsip_dialog_layer_signal_peer_disconnected(tsip_dialog_layer_t *self, const
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
//!\ must not lock the entire layer
tsk_safeobj_lock(self);
// tsk_safeobj_lock(self);
tsk_list_lock(peer->dialogs_cids);
tsk_list_foreach(item, peer->dialogs_cids){
@ -354,10 +362,9 @@ int tsip_dialog_layer_signal_peer_disconnected(tsip_dialog_layer_t *self, const
TSK_DEBUG_WARN("Stream peer holds call-id='%s' but the dialog layer doesn't know it", TSK_STRING_STR(item->data));
}
}
tsk_list_unlock(peer->dialogs_cids);
tsk_safeobj_unlock(self);
// tsk_safeobj_unlock(self);
return 0;
}
@ -565,7 +572,7 @@ int tsip_dialog_layer_handle_incoming_msg(const tsip_dialog_layer_t *self, tsip_
TSIP_MESSAGE(message)->update = tsk_true; // update AoR and Via
if((dst = tsip_transac_dst_net_create(TSIP_STACK(self->stack)))){
if((transac = tsip_transac_layer_new(self->stack->layer_transac, isCT, message, dst))){
if((transac = tsip_transac_layer_new(self->stack->layer_transac, isCT, message, dst))){
ret = tsip_transac_start(transac, message);
TSK_OBJECT_SAFE_FREE(transac);
}

View File

@ -120,7 +120,7 @@ tsip_transport_t* tsip_transport_create(tsip_stack_t* stack, const char* host, t
return transport;
}
/* add Via header using the transport config
/* add Via header using the transport config
must be called after update_aor()
*/
int tsip_transport_addvia(const tsip_transport_t* self, const char *branch, tsip_message_t *msg)
@ -366,14 +366,26 @@ tsk_size_t tsip_transport_send_raw(const tsip_transport_t* self, const char* dst
tnet_fd_t fd;
TSK_DEBUG_INFO("Cannot find peer with remote IP/Port=%s/%d, connecting to the destination...", dst_ip, dst_port);
// connect to the destination
if((fd = tsip_transport_connectto_2(TSIP_TRANSPORT(self), dst_ip, dst_port)) == TNET_INVALID_FD){
// stream with the new "fd" will be added later, make sure that no other thread (e.g. network callback) will manipulate the peers
tsip_transport_stream_peers_lock(TSIP_TRANSPORT(self));
if((fd = tnet_transport_connectto_2(TSIP_TRANSPORT(self)->net_transport, dst_ip, dst_port)) == TNET_INVALID_FD){
TSK_DEBUG_ERROR("Failed to connect to %s/%d", dst_ip, dst_port);
tsip_transport_stream_peers_unlock(TSIP_TRANSPORT(self));
return 0;
}
// only clients will have connected fd == EVAL. For servers, it will be equal to master's fd
// connected fd value will be set to EVAL when "disconnected" event is received
if (TSIP_TRANSPORT(self)->connectedFD == TNET_INVALID_FD) {
TSIP_TRANSPORT(self)->connectedFD = fd;
}
if(tsip_transport_add_stream_peer_2(TSIP_TRANSPORT(self), fd, self->type, tsk_false, dst_ip, dst_port) != 0){
TSK_DEBUG_ERROR("Failed to add stream peer local fd = %d, remote IP/Port=%s/%d", fd, dst_ip, dst_port);
tsip_transport_stream_peers_unlock(TSIP_TRANSPORT(self));
return 0;
}
tsip_transport_stream_peers_unlock(TSIP_TRANSPORT(self));
// retrieve the peer
if(!(peer = tsip_transport_find_stream_peer_by_local_fd(TSIP_TRANSPORT(self), fd))){
TSK_DEBUG_INFO("Cannot find peer with remote IP/Port=%s/%d. Cancel data sending", dst_ip, dst_port);
@ -632,7 +644,7 @@ int tsip_transport_add_stream_peer_2(tsip_transport_t *self, tnet_fd_t local_fd,
return -1;
}
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
if(tsip_transport_have_stream_peer_with_local_fd(self, local_fd)){
// could happen if the closed socket haven't raise "close event" yet and new own added : Windows only
@ -665,13 +677,13 @@ int tsip_transport_add_stream_peer_2(tsip_transport_t *self, tnet_fd_t local_fd,
peer->remote_port = remote_port;
memcpy(peer->remote_ip, remote_ip, sizeof(remote_ip));
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
peer->time_latest_activity = tsk_time_now();
peer->time_added = peer->time_latest_activity;
tsk_list_push_back_data(self->stream_peers, (void**)&peer);
++self->stream_peers_count;
TSK_DEBUG_INFO("#%d peers in the '%s' transport", self->stream_peers_count, tsip_transport_get_description(self));
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
// Cleanup streams
if (self->stream_peers_count > TSIP_TRANSPORT_STREAM_PEERS_COUNT_BEFORE_CHECKING_TIMEOUT && self->stack->network.mode == tsip_stack_mode_webrtc2sip) {
@ -680,7 +692,7 @@ int tsip_transport_add_stream_peer_2(tsip_transport_t *self, tnet_fd_t local_fd,
bail:
TSK_OBJECT_SAFE_FREE(peer);
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
return ret;
}
@ -695,14 +707,14 @@ tsip_transport_stream_peer_t* tsip_transport_find_stream_peer_by_local_fd(tsip_t
return tsk_null;
}
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
tsk_list_foreach(item, self->stream_peers){
if(((tsip_transport_stream_peer_t*)item->data)->local_fd == local_fd){
peer = tsk_object_ref(item->data);
break;
}
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
return peer;
}
@ -713,14 +725,14 @@ tsip_transport_stream_peer_t* tsip_transport_pop_stream_peer_by_local_fd(tsip_tr
if(self){
tsip_transport_stream_peer_t* peer = tsk_null;
tsk_list_item_t *item;
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
if((item = tsk_list_pop_item_by_pred(self->stream_peers, _pred_find_stream_peer_by_local_fd, &local_fd))){
peer = tsk_object_ref(item->data);
TSK_OBJECT_SAFE_FREE(item);
--self->stream_peers_count;
TSK_DEBUG_INFO("#%d peers in the '%s' transport", self->stream_peers_count, tsip_transport_get_description(self));
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
return peer;
}
return tsk_null;
@ -737,14 +749,14 @@ tsip_transport_stream_peer_t* tsip_transport_find_stream_peer_by_remote_ip(tsip_
return tsk_null;
}
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
tsk_list_foreach(item, self->stream_peers){
if(((tsip_transport_stream_peer_t*)item->data)->type == type && ((tsip_transport_stream_peer_t*)item->data)->remote_port == remote_port && tsk_striequals(((tsip_transport_stream_peer_t*)item->data)->remote_ip, remote_ip)){
peer = tsk_object_ref(item->data);
break;
}
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
return peer;
}
@ -774,12 +786,12 @@ int tsip_transport_remove_stream_peer_by_local_fd(tsip_transport_t *self, tnet_f
TSK_DEBUG_ERROR("Invalid parameter");
return -1;
}
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
if (tsk_list_remove_item_by_pred(self->stream_peers, _pred_find_stream_peer_by_local_fd, &local_fd)) {
--self->stream_peers_count;
TSK_DEBUG_INFO("#%d peers in the '%s' transport", self->stream_peers_count, tsip_transport_get_description(self));
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
return 0;
}
@ -793,14 +805,14 @@ int tsip_transport_remove_callid_from_stream_peers(tsip_transport_t *self, const
*removed = tsk_false;
if(TNET_SOCKET_TYPE_IS_STREAM(self->type)){
tsk_list_item_t *item;
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
tsk_list_foreach(item, self->stream_peers){
if(tsip_transport_stream_peer_remove_callid((tsip_transport_stream_peer_t*)item->data, callid, removed) == 0 && *removed){
TSK_DEBUG_INFO("[Transport] Removed call-id = '%s' from transport with type = %d", callid, self->type);
break;
}
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
}
return 0;
@ -832,7 +844,7 @@ int tsip_transport_stream_peer_add_callid(tsip_transport_stream_peer_t* self, co
tsk_string_t* cid = tsk_string_create(callid);
if(cid){
TSK_DEBUG_INFO("Add call-id = '%s' to peer with local fd = %d", callid, self->local_fd);
tsk_list_push_back_data(self->dialogs_cids, &cid);
tsk_list_push_back_data(self->dialogs_cids, (void**)&cid);
TSK_OBJECT_SAFE_FREE(cid);
}
}
@ -870,7 +882,7 @@ int tsip_transport_stream_peers_cleanup(tsip_transport_t *self)
tnet_fd_t fd;
tsk_bool_t close;
uint64_t now = tsk_time_now();
tsk_list_lock(self->stream_peers);
tsip_transport_stream_peers_lock(self);
tsk_list_foreach(item, self->stream_peers) {
if ((peer = (item->data))) {
close = ((now - TSIP_TRANSPORT_STREAM_PEER_TIMEOUT) > peer->time_latest_activity);
@ -890,7 +902,7 @@ int tsip_transport_stream_peers_cleanup(tsip_transport_t *self)
}
}
}
tsk_list_unlock(self->stream_peers);
tsip_transport_stream_peers_unlock(self);
}
return 0;
@ -934,6 +946,9 @@ int tsip_transport_init(tsip_transport_t* self, tnet_socket_type_t type, const s
/* Stream buffer */
self->stream_peers = tsk_list_create();
if (!self->stream_peers) {
return -1;
}
}
else{
if(TNET_SOCKET_TYPE_IS_DTLS(type)){
@ -962,7 +977,7 @@ int tsip_transport_deinit(tsip_transport_t* self)
TSK_OBJECT_SAFE_FREE(self->net_transport);
TSK_OBJECT_SAFE_FREE(self->stream_peers);
self->initialized = 0;
return 0;
}
@ -1035,13 +1050,18 @@ static tsk_object_t* tsip_transport_stream_peer_ctor(tsk_object_t * self, va_lis
{
tsip_transport_stream_peer_t *peer = self;
if(peer){
peer->rcv_buff_stream = tsk_buffer_create_null();
peer->snd_buff_stream = tsk_buffer_create_null();
peer->dialogs_cids = tsk_list_create();
if (!(peer->rcv_buff_stream = tsk_buffer_create_null())) {
return tsk_null;
}
if (!(peer->snd_buff_stream = tsk_buffer_create_null())) {
return tsk_null;
}
if (!(peer->dialogs_cids = tsk_list_create())){
return tsk_null;
}
}
return self;
}
static tsk_object_t* tsip_transport_stream_peer_dtor(tsk_object_t * self)
{
tsip_transport_stream_peer_t *peer = self;
@ -1059,7 +1079,6 @@ static tsk_object_t* tsip_transport_stream_peer_dtor(tsk_object_t * self)
}
return self;
}
static int tsip_transport_stream_peer_cmp(const tsk_object_t *obj1, const tsk_object_t *obj2)
{
const tsip_transport_stream_peer_t *peer1 = obj1;
@ -1069,7 +1088,6 @@ static int tsip_transport_stream_peer_cmp(const tsk_object_t *obj1, const tsk_ob
}
return -1;
}
static const tsk_object_def_t tsip_transport_stream_peer_def_s =
{
sizeof(tsip_transport_stream_peer_t),
@ -1077,4 +1095,4 @@ static const tsk_object_def_t tsip_transport_stream_peer_def_s =
tsip_transport_stream_peer_dtor,
tsip_transport_stream_peer_cmp,
};
const tsk_object_def_t *tsip_transport_stream_peer_def_t = &tsip_transport_stream_peer_def_s;
const tsk_object_def_t *tsip_transport_stream_peer_def_t = &tsip_transport_stream_peer_def_s;

View File

@ -148,8 +148,17 @@ static int tsip_transport_layer_stream_cb(const tnet_transport_event_t* e)
{
tsip_transport_stream_peer_t* peer;
TSK_DEBUG_INFO("Stream Peer closed - %d", e->local_fd);
if(transport->connectedFD == e->local_fd){
TSK_DEBUG_INFO("SIP socket closed");
// signal "peer disconnected" before "stack disconnected"
if((peer = tsip_transport_pop_stream_peer_by_local_fd(transport, e->local_fd))){
tsip_dialog_layer_signal_peer_disconnected(TSIP_STACK(transport->stack)->layer_dialog, peer);
TSK_OBJECT_SAFE_FREE(peer);
}
else {
TSK_DEBUG_INFO("Closed peer with fd=%d not registered yet", e->local_fd);
}
// connectedFD== master's fd for servers
if(transport->connectedFD == e->local_fd || transport->connectedFD == TNET_INVALID_FD){
TSK_DEBUG_INFO("SIP 'connectedFD' (%d) closed", transport->connectedFD);
if(transport->stack){
tsip_event_t* e;
// signal to all dialogs that transport error raised
@ -159,10 +168,7 @@ static int tsip_transport_layer_stream_cb(const tnet_transport_event_t* e)
TSK_RUNNABLE_ENQUEUE_OBJECT(TSK_RUNNABLE(transport->stack), e);
}
}
}
if((peer = tsip_transport_pop_stream_peer_by_local_fd(transport, e->local_fd))){
tsip_dialog_layer_signal_peer_disconnected(TSIP_STACK(transport->stack)->layer_dialog, peer);
TSK_OBJECT_SAFE_FREE(peer);
transport->connectedFD = TNET_INVALID_FD;
}
return 0;
}
@ -1261,7 +1267,6 @@ int tsip_transport_layer_start(tsip_transport_layer_t* self)
/* connect() */
tsk_list_foreach(item, self->transports){
tnet_fd_t fd = TNET_INVALID_FD;
transport = item->data;
// set callback
@ -1284,25 +1289,25 @@ int tsip_transport_layer_start(tsip_transport_layer_t* self)
if(!TSIP_STACK_MODE_IS_SERVER(transport->stack)){
// Between "tsip_transport_connectto_2()" and "tsip_transport_add_stream_peer_2()" the net callback could be called and
// off cource peer will not be found in the list. This is why the list is locked.
tsk_list_lock(transport->stream_peers);
if((fd = tsip_transport_connectto_2(transport, self->stack->network.proxy_cscf[transport_idx], self->stack->network.proxy_cscf_port[transport_idx])) == TNET_INVALID_FD){
tsip_transport_stream_peers_lock(transport);
if((transport->connectedFD = tsip_transport_connectto_2(transport, self->stack->network.proxy_cscf[transport_idx], self->stack->network.proxy_cscf_port[transport_idx])) == TNET_INVALID_FD){
TSK_DEBUG_ERROR("Failed to connect the SIP transport");
tsk_list_unlock(transport->stream_peers);
tsip_transport_stream_peers_unlock(transport);
return -3;
}
TSK_DEBUG_INFO("SIP transport fd=%d", fd);
TSK_DEBUG_INFO("SIP transport fd=%d", transport->connectedFD);
// store peer
tsip_transport_add_stream_peer_2(transport, fd, transport->type, tsk_false, self->stack->network.proxy_cscf[transport_idx], self->stack->network.proxy_cscf_port[transport_idx]);
tsk_list_unlock(transport->stream_peers);
tsip_transport_add_stream_peer_2(transport, transport->connectedFD, transport->type, tsk_false, self->stack->network.proxy_cscf[transport_idx], self->stack->network.proxy_cscf_port[transport_idx]);
tsip_transport_stream_peers_unlock(transport);
// give the socket chance to connect
if((ret = tnet_sockfd_waitUntilWritable(fd, TSIP_CONNECT_TIMEOUT)) || (ret = tnet_sockfd_waitUntilReadable(fd, TSIP_CONNECT_TIMEOUT))){
if((ret = tnet_sockfd_waitUntilWritable(transport->connectedFD, TSIP_CONNECT_TIMEOUT)) || (ret = tnet_sockfd_waitUntilReadable(transport->connectedFD, TSIP_CONNECT_TIMEOUT))){
TSK_DEBUG_INFO("%d milliseconds elapsed and the socket is still not connected.", TSIP_CONNECT_TIMEOUT);
// dot not exit, store the outgoing data until connection succeed
}
}
transport->connectedFD = fd;
}
// set connectedFD=master for servers
if(transport->connectedFD == TNET_INVALID_FD){
transport->connectedFD = tnet_transport_get_master_fd(transport->net_transport);
}

View File

@ -117,7 +117,7 @@ int __tsip_ssession_set(tsip_ssession_t *self, va_list *app)
{
tsip_ssession_param_type_t sscurr;
tsip_msession_param_type_t mscurr;
tmedia_session_mgr_t* mgr;
tmedia_session_mgr_t* mgr = tsk_null;
int ret = 0;
@ -126,8 +126,6 @@ int __tsip_ssession_set(tsip_ssession_t *self, va_list *app)
return -1;
}
mgr = tsip_session_get_mediamgr(self);
while((sscurr = va_arg(*app, tsip_ssession_param_type_t)) != sstype_null){
switch(sscurr){
//=======
@ -271,6 +269,9 @@ int __tsip_ssession_set(tsip_ssession_t *self, va_list *app)
//=========
// Media
//=========
if (!mgr) {
mgr = tsip_session_get_mediamgr(self);
}
while((mscurr = va_arg(*app, tsip_msession_param_type_t)) != mstype_null){
switch(mscurr){
case mstype_set_profile: