diff --git a/src/include/switch_msrp.h b/src/include/switch_msrp.h index 7e23325862..b16efb18c9 100644 --- a/src/include/switch_msrp.h +++ b/src/include/switch_msrp.h @@ -91,14 +91,17 @@ typedef struct msrp_socket_s { int secure; } msrp_socket_t; -typedef struct msrp_client_socket_s { +struct msrp_client_socket_s { switch_socket_t *sock; int secure; -} msrp_client_socket_t; + int client_mode; + struct switch_msrp_session_s *msrp_session; +}; -typedef struct { +struct switch_msrp_session_s{ switch_memory_pool_t *pool; int secure; + int active; char *remote_path; char *remote_accept_types; char *remote_accept_wrapped_types; @@ -117,20 +120,30 @@ typedef struct { switch_size_t msrp_msg_buffer_size; switch_size_t msrp_msg_count; msrp_socket_t *msock; - msrp_client_socket_t *csock; + struct msrp_client_socket_s *csock; switch_frame_t frame; uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN]; -} switch_msrp_session_t; + int running; + void *user_data; +}; + +typedef struct msrp_client_socket_s msrp_client_socket_t; +typedef struct switch_msrp_session_s switch_msrp_session_t; SWITCH_DECLARE(switch_status_t) switch_msrp_init(void); SWITCH_DECLARE(switch_status_t) switch_msrp_destroy(void); -SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, switch_bool_t secure); +SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, const char *call_id, switch_bool_t secure); SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_t **ms); // switch_status_t switch_msrp_session_push_msg(switch_msrp_session_t *ms, msrp_msg_t *msg); SWITCH_DECLARE(switch_msrp_msg_t *)switch_msrp_session_pop_msg(switch_msrp_session_t *ms); -SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp_msg_t *msg); +SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, msrp_msg_t *msg, const char *file, const char *func, int line); +SWITCH_DECLARE(switch_status_t) switch_msrp_start_client(switch_msrp_session_t *msrp_session); +SWITCH_DECLARE(const char *) switch_msrp_listen_ip(void); SWITCH_DECLARE(void) switch_msrp_load_apis_and_applications(switch_loadable_module_interface_t **moudle_interface); + +#define switch_msrp_send(ms, msg) switch_msrp_perform_send(ms, msg, __FILE__, __SWITCH_FUNC__, __LINE__) + #endif /* For Emacs: diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index 12f086e709..c2e8a2517d 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -972,9 +972,13 @@ static switch_status_t sofia_read_text_frame(switch_core_session_t *session, swi rframe->data = msrp_session->frame_data; rframe->buflen = sizeof(msrp_session->frame_data); - if (msrp_msg && msrp_msg->method == MSRP_METHOD_SEND && !switch_stristr("?OTRv3?", msrp_msg->payload) && - (switch_stristr("text/plain", msrp_msg->payload) || - switch_stristr("text/html", msrp_msg->payload))) { + if (msrp_msg && msrp_msg->method == MSRP_METHOD_SEND && + !switch_stristr("?OTRv3?", msrp_msg->payload) && + !switch_stristr("application/im-iscomposing", msrp_msg->payload) && + msrp_msg->headers[MSRP_H_CONTENT_TYPE] && + (switch_stristr("text/plain", msrp_msg->headers[MSRP_H_CONTENT_TYPE]) || + switch_stristr("text/html", msrp_msg->headers[MSRP_H_CONTENT_TYPE]) || + switch_stristr("message/cpim", msrp_msg->headers[MSRP_H_CONTENT_TYPE]))) { rframe->datalen = msrp_msg->payload_bytes; rframe->packetlen = msrp_msg->payload_bytes; memcpy(rframe->data, msrp_msg->payload, msrp_msg->payload_bytes); @@ -983,14 +987,13 @@ static switch_status_t sofia_read_text_frame(switch_core_session_t *session, swi *frame = rframe; - if (msrp_msg->headers[MSRP_H_CONTENT_TYPE] && !strcasecmp(msrp_msg->headers[MSRP_H_CONTENT_TYPE], "message/cpim")) { + if (msrp_msg->headers[MSRP_H_CONTENT_TYPE] && !strcasecmp(msrp_msg->headers[MSRP_H_CONTENT_TYPE], "message/cpim")) { char *stripped_text = switch_html_strip((char *)rframe->data); memcpy(rframe->data, stripped_text, strlen(stripped_text)+1); rframe->datalen = strlen(stripped_text)+1; free(stripped_text); } - switch_safe_free(msrp_msg); msrp_msg = NULL; status = SWITCH_STATUS_SUCCESS; diff --git a/src/switch_core_media.c b/src/switch_core_media.c index df179c8dd2..da406d3ab7 100644 --- a/src/switch_core_media.c +++ b/src/switch_core_media.c @@ -4282,13 +4282,12 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s if (got_msrp && m->m_type == sdp_media_message) { if (!smh->msrp_session) { - smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), m->m_proto == sdp_proto_msrps); + smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), switch_core_session_get_uuid(session), m->m_proto == sdp_proto_msrps); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created %s\n", smh->msrp_session->call_id); } switch_assert(smh->msrp_session); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created\n"); - for (attr = m->m_attributes; attr; attr = attr->a_next) { // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[%s]=[%s]\n", attr->a_name, attr->a_value); if (!strcasecmp(attr->a_name, "path") && attr->a_value) { @@ -4303,6 +4302,9 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s } else if (!strcasecmp(attr->a_name, "setup") && attr->a_value) { smh->msrp_session->remote_setup = switch_core_session_strdup(session, attr->a_value); switch_channel_set_variable(session->channel, "sip_msrp_remote_setup", attr->a_value); + if (!strcmp(attr->a_value, "passive")) { + smh->msrp_session->active = 1; + } } else if (!strcasecmp(attr->a_name, "file-selector") && attr->a_value) { char *tmp = switch_mprintf("%s", attr->a_value); char *argv[4] = { 0 }; @@ -4353,7 +4355,6 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s } } - smh->msrp_session->call_id = switch_core_session_get_uuid(session); smh->msrp_session->local_accept_types = smh->msrp_session->remote_accept_types; smh->msrp_session->local_accept_wrapped_types = smh->msrp_session->remote_accept_types; smh->msrp_session->local_setup = smh->msrp_session->remote_setup; @@ -4362,9 +4363,22 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s switch_channel_set_flag(session->channel, CF_TEXT_POSSIBLE); switch_channel_set_flag(session->channel, CF_TEXT_LINE_BASED); switch_channel_set_flag(session->channel, CF_MSRP); + if (m->m_proto == sdp_proto_msrps) { switch_channel_set_flag(session->channel, CF_MSRPS); } + + if (smh->msrp_session->active) { + const char *ip = switch_msrp_listen_ip(); + + smh->msrp_session->local_path = switch_core_session_sprintf(session, + "msrp%s://%s:%d/%s;tcp", + smh->msrp_session->secure ? "s" : "", + ip, smh->msrp_session->local_port, smh->msrp_session->call_id); + + switch_msrp_start_client(smh->msrp_session); + } + switch_core_session_start_text_thread(session); tmatch = 1; } @@ -10276,14 +10290,10 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess } if (want_msrp || want_msrps) { - smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), want_msrps); + smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), switch_core_session_get_uuid(session), want_msrps); switch_assert(smh->msrp_session); - - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created\n"); - - smh->msrp_session->call_id = switch_core_session_get_uuid(session); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created %s\n", smh->msrp_session->call_id); switch_channel_set_flag(session->channel, CF_HAS_TEXT); switch_channel_set_flag(session->channel, CF_TEXT_POSSIBLE); @@ -10313,15 +10323,22 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess "a=path:%s\n" "a=accept-types:%s\n" "a=accept-wrapped-types:%s\n" - "a=setup:passive\n", + "a=setup:%s\n", msrp_session->local_port, msrp_session->secure ? "TLS/" : "", msrp_session->local_path, msrp_session->local_accept_types, - msrp_session->local_accept_wrapped_types); + msrp_session->local_accept_wrapped_types, + msrp_session->active ? "active" : "passive"); } else { char *uuid = switch_core_session_get_uuid(session); const char *file_selector = switch_channel_get_variable(session->channel, "sip_msrp_local_file_selector"); + const char *msrp_offer_active = switch_channel_get_variable(session->channel, "sip_msrp_offer_active"); + + if (switch_true(msrp_offer_active)) { + msrp_session->active = 1; + // switch_msrp_start_client(msrp_session); + } if (zstr(msrp_session->local_path)) { msrp_session->local_path = switch_core_session_sprintf(session, @@ -10335,10 +10352,11 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess "a=path:%s\n" "a=accept-types:message/cpim text/* application/im-iscomposing+xml\n" "a=accept-wrapped-types:*\n" - "a=setup:passive\n", + "a=setup:%s\n", msrp_session->local_port, msrp_session->secure ? "TLS/" : "", - msrp_session->local_path); + msrp_session->local_path, + msrp_session->active ? "active" : "passive"); if (!zstr(file_selector)) { switch_snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), diff --git a/src/switch_msrp.c b/src/switch_msrp.c index 870ec58555..643b2b918d 100644 --- a/src/switch_msrp.c +++ b/src/switch_msrp.c @@ -60,6 +60,7 @@ typedef struct worker_helper{ int debug; switch_memory_pool_t *pool; msrp_client_socket_t csock; + switch_msrp_session_t *msrp_session; } worker_helper_t; static void msrp_deinit_ssl() @@ -234,6 +235,11 @@ sock_fail: return rv; } +SWITCH_DECLARE(const char *) switch_msrp_listen_ip() +{ + return globals.ip; +} + SWITCH_DECLARE(switch_status_t) switch_msrp_init() { switch_memory_pool_t *pool; @@ -308,7 +314,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_destroy() return st; } -SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, switch_bool_t secure) +SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, const char *call_id, switch_bool_t secure) { switch_msrp_session_t *ms; ms = switch_core_alloc(pool, sizeof(switch_msrp_session_t)); @@ -317,12 +323,32 @@ SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_poo ms->secure = secure; ms->local_port = secure ? globals.msock_ssl.port : globals.msock.port; ms->msrp_msg_buffer_size = globals.message_buffer_size; + ms->call_id = switch_core_strdup(pool, call_id); switch_mutex_init(&ms->mutex, SWITCH_MUTEX_NESTED, pool); return ms; } SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_t **ms) { + int sanity = 500; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying MSRP session %s\n", (*ms)->call_id); + + switch_mutex_lock((*ms)->mutex); + + if ((*ms)->csock && (*ms)->csock->sock) { + close_socket(&(*ms)->csock->sock); + } + + switch_mutex_unlock((*ms)->mutex); + + switch_yield(20000); + + while(sanity-- > 0 && (*ms)->running) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "waiting MSRP worker %s\n", (*ms)->call_id); + switch_yield(20000); + } + switch_mutex_destroy((*ms)->mutex); ms = NULL; return SWITCH_STATUS_SUCCESS; @@ -455,7 +481,7 @@ void dump_buffer(char *buf, switch_size_t len, int line) } buff[j] = '\0'; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d:%" SWITCH_SIZE_T_FMT "DUMP:%s:DUMP\n", line, len, buff); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d: [%" SWITCH_SIZE_T_FMT "] ::DUMP::%s::DUMP::\n", line, len, buff); } char *find_delim(char *buf, int len, char *delim) @@ -766,7 +792,7 @@ msrp_msg_t *msrp_parse_buffer(char *buf, int len, msrp_msg_t *msrp_msg, switch_m int dlen = strlen(msrp_msg->delimiter); if (msrp_msg->payload_bytes > len) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Waiting payload...%d < %d\n", (int)msrp_msg->payload_bytes, (int)len); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Waiting payload... %d < %d\n", (int)msrp_msg->payload_bytes, (int)len); return msrp_msg; /*keep waiting ?*/ } @@ -789,8 +815,9 @@ msrp_msg_t *msrp_parse_buffer(char *buf, int len, msrp_msg_t *msrp_msg, switch_m return msrp_msg; } } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error here! code:%d\n", msrp_msg->state); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error here! code:%d\n", msrp_msg->state); } + return msrp_msg; } @@ -859,106 +886,197 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj) switch_status_t status; msrp_msg_t *msrp_msg = NULL; char uuid[128] = { 0 }; - switch_core_session_t *session = NULL; switch_msrp_session_t *msrp_session = NULL; - switch_channel_t *channel = NULL; int sanity = 10; SSL *ssl = NULL; + int client_mode = helper->csock.client_mode; - switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, TRUE); - // switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE); + if (client_mode) { + switch_sockaddr_t *sa = NULL; + switch_msrp_msg_t setup_msg = { 0 }; + const char *remote_ip = NULL; + switch_port_t remote_port = 0; + char *dup = NULL; + char *p = NULL; - if (csock->secure) { // tls? - int secure_established = 0; - int sanity = 10; - switch_os_socket_t sockdes = SWITCH_SOCK_INVALID; + switch_assert(helper->msrp_session); + msrp_session = helper->msrp_session; + msrp_session->running = 1; - switch_os_sock_get(&sockdes, csock->sock); - // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "socket: %d\n", sockdes); - switch_assert(sockdes != SWITCH_SOCK_INVALID); + switch_assert(msrp_session->remote_path); + dup = switch_core_strdup(pool, msrp_session->remote_path); + switch_assert(dup); - ssl = SSL_new(globals.ssl_ctx); - assert(ssl); - globals.ssl = ssl; + p = (char *)switch_stristr("msrp://", dup); - SSL_set_fd(ssl, sockdes); + if (p) { + p += 7; + } else { + p = (char *)switch_stristr("msrps://", dup); - do { - int code = SSL_accept(ssl); + if (p) p+= 8; + } - if (code == 1) { - secure_established = 1; - goto done; + if (p) { + remote_ip = p; + + p = (char *)switch_stristr(":", p); + + if (p) { + *p++ = '\0'; + remote_port = atoi(p); } + } - if (code == 0) { - goto err; - } + if (!remote_ip || remote_port <= 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error get remote MSRP ip:port from path: [%s]\n", msrp_session->remote_path); + } - if (code < 0) { - if (code == -1 && SSL_get_error(ssl, code) != SSL_ERROR_WANT_READ) { + if (switch_sockaddr_info_get(&sa, remote_ip, SWITCH_UNSPEC, remote_port, 0, pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error!\n"); + goto end; + } + + if (switch_socket_create(&csock->sock, switch_sockaddr_get_family(sa), + SOCK_STREAM, SWITCH_PROTO_TCP, pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error!\n"); + goto end; + } + + switch_socket_opt_set(csock->sock, SWITCH_SO_KEEPALIVE, 1); + switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, 1); + // switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE); + switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_KEEPIDLE, 30); + switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_KEEPINTVL, 30); + switch_socket_timeout_set(csock->sock, 3000000); // abort connection 3 seconds than forever + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "MSRP %s Connecting to %s\n", msrp_session->call_id, msrp_session->remote_path); + + if ((switch_socket_connect(csock->sock, sa)) != SWITCH_STATUS_SUCCESS) { + char errbuf[512] = {0}; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error: %s\n", switch_strerror(errno, errbuf, sizeof(errbuf))); + goto end; + } + + switch_socket_timeout_set(csock->sock, -1); + + if (msrp_session->secure) { + // todo setup tls ... + } + + helper->msrp_session->csock = csock; + + setup_msg.headers[MSRP_H_CONTENT_TYPE] = "text/plain"; + setup_msg.payload = NULL; + setup_msg.payload_bytes = 0; + + if (SWITCH_STATUS_SUCCESS != switch_msrp_send(msrp_session, &setup_msg)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MSRP initial setup send error!\n"); + goto end; + } + } else { // server mode + switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, TRUE); + // switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE); + + if (csock->secure) { // tls? + int secure_established = 0; + int sanity = 10; + switch_os_socket_t sockdes = SWITCH_SOCK_INVALID; + + switch_os_sock_get(&sockdes, csock->sock); + // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "socket: %d\n", sockdes); + switch_assert(sockdes != SWITCH_SOCK_INVALID); + + ssl = SSL_new(globals.ssl_ctx); + assert(ssl); + globals.ssl = ssl; + + SSL_set_fd(ssl, sockdes); + + do { + int code = SSL_accept(ssl); + + if (code == 1) { + secure_established = 1; + goto done; + } + + if (code == 0) { goto err; } - } - } while(sanity--); - err: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SSL ERR\n"); - goto end; + if (code < 0) { + if (code == -1 && SSL_get_error(ssl, code) != SSL_ERROR_WANT_READ) { + goto err; + } + } + } while(sanity--); - done: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SSL established = %d\n", secure_established); - } + err: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SSL ERR\n"); + goto end; - len = MSRP_BUFF_SIZE; - status = msrp_socket_recv(csock, buf, &len); - - if (helper->debug) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "status:%d, len:%" SWITCH_SIZE_T_FMT "\n", status, len); - } - - if (status == SWITCH_STATUS_SUCCESS) { - msrp_msg = msrp_parse_buffer(buf, len, NULL, pool); - switch_assert(msrp_msg); - } else { - goto end; - } - - if (helper->debug) { - msrp_msg_serialize(msrp_msg, buf); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s\n", buf); - } - - if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) { - msrp_reply(csock, msrp_msg); - if (msrp_msg->headers[MSRP_H_SUCCESS_REPORT] && - !strcmp(msrp_msg->headers[MSRP_H_SUCCESS_REPORT], "yes")) { - msrp_report(csock, msrp_msg, "200 OK"); + done: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SSL established = %d\n", secure_established); } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse initial message error!\n"); - goto end; - } - if (msrp_find_uuid(uuid, msrp_msg->headers[MSRP_H_TO_PATH]) != SWITCH_TRUE) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid MSRP to-path!\n"); - goto end; - } + len = MSRP_BUFF_SIZE; + status = msrp_socket_recv(csock, buf, &len); - while (sanity-- && !(session = switch_core_session_locate(uuid))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "waiting for session\n"); - switch_yield(1000000); - } + if (helper->debug) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "status:%d, len:%" SWITCH_SIZE_T_FMT "\n", status, len); + } - if(!session) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No such session %s\n", uuid); - goto end; - } + if (status == SWITCH_STATUS_SUCCESS) { + msrp_msg = msrp_parse_buffer(buf, len, NULL, pool); + switch_assert(msrp_msg); + } else { + goto end; + } - channel = switch_core_session_get_channel(session); - msrp_session = switch_core_media_get_msrp_session(session); - switch_assert(msrp_session); - msrp_session->csock = csock; + if (helper->debug) { + msrp_msg_serialize(msrp_msg, buf); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s\n", buf); + } + + if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) { + msrp_reply(csock, msrp_msg); + if (msrp_msg->headers[MSRP_H_SUCCESS_REPORT] && + !strcmp(msrp_msg->headers[MSRP_H_SUCCESS_REPORT], "yes")) { + msrp_report(csock, msrp_msg, "200 OK"); + } + } else if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_AUTH) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MSRP_METHOD_AUTH\n"); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse initial message error!\n"); + goto end; + } + + if (msrp_find_uuid(uuid, msrp_msg->headers[MSRP_H_TO_PATH]) != SWITCH_TRUE) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid MSRP to-path!\n"); + } + + { + switch_core_session_t *session = NULL; + + while (sanity-- && !(session = switch_core_session_locate(uuid))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "waiting for session\n"); + switch_yield(1000000); + } + + if(!session) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No such session %s\n", uuid); + goto end; + } + + msrp_session = switch_core_media_get_msrp_session(session); + switch_assert(msrp_session); + msrp_session->csock = csock; + msrp_session->running = 1; + + switch_core_session_rwunlock(session); + } + } len = MSRP_BUFF_SIZE; p = buf; @@ -981,12 +1099,11 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj) } if (helper->debug) { - // char msg_buf[MSRP_BUFF_SIZE * 2]; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "state:%d, len:%" SWITCH_SIZE_T_FMT " payload_bytes:%" SWITCH_SIZE_T_FMT "\n", msrp_msg->state, len, msrp_msg->payload_bytes); // { // char bbb[MSRP_BUFF_SIZE * 2]; // msrp_msg_serialize(msrp_msg_tmp, bbb), - // + // } } if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) { @@ -1007,7 +1124,7 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj) } while (msrp_session && msrp_session->msrp_msg_count > msrp_session->msrp_msg_buffer_size) { - if (!switch_channel_ready(channel)) break; + if (!msrp_session->running) break; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s reading too fast, relax...\n", uuid); switch_yield(100000); } @@ -1054,6 +1171,7 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj) msrp_msg->transaction_id = switch_core_strdup(pool, msrp_msg_old->transaction_id); msrp_msg->delimiter = switch_core_strdup(pool, msrp_msg_old->delimiter); msrp_msg->last_header = msrp_msg_old->last_header; + for (i = 0; i < msrp_msg->last_header; i++) { msrp_msg->headers[i] = switch_core_strdup(pool, msrp_msg_old->headers[i]); } @@ -1078,15 +1196,18 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj) len = MSRP_BUFF_SIZE; last_p = buf; } - if (!switch_channel_ready(channel)) break; + if (!msrp_session->running) break; } end: - if (session) switch_core_session_rwunlock(session); + switch_mutex_lock(msrp_session->mutex); + close_socket(&csock->sock); + switch_mutex_unlock(msrp_session->mutex); - switch_socket_close(csock->sock); - switch_core_destroy_memory_pool(&pool); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "msrp worker %s down\n", uuid); + if (!client_mode) switch_core_destroy_memory_pool(&pool); + + msrp_session->running = 0; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP worker down %s\n", msrp_session->call_id); return NULL; } @@ -1114,7 +1235,18 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj worker_helper_t *helper; if (globals.debug > 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Connection Open%s\n", msock->secure ? " SSL" : ""); + switch_sockaddr_t *addr = NULL; + char remote_ip[128]; + + /* Get the remote address/port info */ + switch_socket_addr_get(&addr, SWITCH_TRUE, sock); + + if (addr) { + switch_get_addr(remote_ip, sizeof(remote_ip), addr); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Connection Open%s from %s:%d\n", msock->secure ? " SSL" : "", remote_ip, switch_sockaddr_get_port(addr)); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error get remote addr!\n"); + } } if (switch_core_new_memory_pool(&worker_pool) != SWITCH_STATUS_SUCCESS) { @@ -1122,7 +1254,7 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj return NULL; } - switch_zmalloc(helper, sizeof(worker_helper_t)); + helper = switch_core_alloc(worker_pool, sizeof(worker_helper_t)); switch_assert(helper != NULL); helper->pool = worker_pool; @@ -1134,7 +1266,7 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, msrp_worker, helper, worker_pool); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "msrp worker new thread spawned!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP worker new thread spawned!\n"); } if (pool) switch_core_destroy_memory_pool(&pool); @@ -1144,13 +1276,38 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj return NULL; } +SWITCH_DECLARE(switch_status_t) switch_msrp_start_client(switch_msrp_session_t *msrp_session) +{ + worker_helper_t *helper; + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + + helper = switch_core_alloc(msrp_session->pool, sizeof(worker_helper_t)); + + switch_assert(helper != NULL); + helper->pool = msrp_session->pool; + helper->debug = globals.debug; + helper->csock.sock = NULL; // client mode + helper->csock.secure = msrp_session->secure; + helper->csock.client_mode = 1; + helper->msrp_session = msrp_session; + + switch_threadattr_create(&thd_attr, helper->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, msrp_worker, helper, helper->pool); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP new worker client started! %s\n", msrp_session->call_id); + + return SWITCH_STATUS_SUCCESS; +} + void random_string(char *buf, uint16_t size) { switch_stun_random_string(buf, size, NULL); } #define MSRP_TRANS_ID_LEN 16 -SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp_msg_t *msrp_msg) +SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, msrp_msg_t *msrp_msg, const char *file, const char *func, int line) { char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 }; char buf[MSRP_BUFF_SIZE]; @@ -1158,8 +1315,13 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp char *to_path = msrp_msg->headers[MSRP_H_TO_PATH] ? msrp_msg->headers[MSRP_H_TO_PATH] : ms->remote_path; char *from_path = msrp_msg->headers[MSRP_H_FROM_PATH] ? msrp_msg->headers[MSRP_H_FROM_PATH] : ms->local_path; + if (!ms->running) { + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Discard one message\n"); + return SWITCH_STATUS_SUCCESS; + } + if (!from_path) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "NO FROM PATH\n"); + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n"); return SWITCH_STATUS_SUCCESS; } @@ -1180,7 +1342,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp if (msrp_msg->payload) { if (len + msrp_msg->payload_bytes >= MSRP_BUFF_SIZE) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "payload too large! %" SWITCH_SIZE_T_FMT "\n", len + msrp_msg->payload_bytes); + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_ERROR, "payload too large! %" SWITCH_SIZE_T_FMT "\n", len + msrp_msg->payload_bytes); return SWITCH_STATUS_FALSE; } memcpy(buf + len, msrp_msg->payload, msrp_msg->payload_bytes); @@ -1188,7 +1350,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp } sprintf(buf + len, "\r\n-------%s$\r\n", transaction_id); len += (12 + strlen(transaction_id)); - if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "---------------------send: %" SWITCH_SIZE_T_FMT " bytes\n%s\n", len, buf); + if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "---------------------send: %" SWITCH_SIZE_T_FMT " bytes---------------------\n%s\n", len, buf); return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE; }