FS-9903 WIP MSRP client mode support

This commit is contained in:
Seven Du 2017-01-02 10:24:53 +08:00
parent 0248d38a28
commit 7e24a79580
4 changed files with 318 additions and 122 deletions

View File

@ -91,14 +91,17 @@ typedef struct msrp_socket_s {
int secure;
} msrp_socket_t;
typedef struct msrp_client_socket_s {
struct msrp_client_socket_s {
switch_socket_t *sock;
int secure;
} msrp_client_socket_t;
int client_mode;
struct switch_msrp_session_s *msrp_session;
};
typedef struct {
struct switch_msrp_session_s{
switch_memory_pool_t *pool;
int secure;
int active;
char *remote_path;
char *remote_accept_types;
char *remote_accept_wrapped_types;
@ -117,20 +120,30 @@ typedef struct {
switch_size_t msrp_msg_buffer_size;
switch_size_t msrp_msg_count;
msrp_socket_t *msock;
msrp_client_socket_t *csock;
struct msrp_client_socket_s *csock;
switch_frame_t frame;
uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN];
} switch_msrp_session_t;
int running;
void *user_data;
};
typedef struct msrp_client_socket_s msrp_client_socket_t;
typedef struct switch_msrp_session_s switch_msrp_session_t;
SWITCH_DECLARE(switch_status_t) switch_msrp_init(void);
SWITCH_DECLARE(switch_status_t) switch_msrp_destroy(void);
SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, switch_bool_t secure);
SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, const char *call_id, switch_bool_t secure);
SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_t **ms);
// switch_status_t switch_msrp_session_push_msg(switch_msrp_session_t *ms, msrp_msg_t *msg);
SWITCH_DECLARE(switch_msrp_msg_t *)switch_msrp_session_pop_msg(switch_msrp_session_t *ms);
SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp_msg_t *msg);
SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, msrp_msg_t *msg, const char *file, const char *func, int line);
SWITCH_DECLARE(switch_status_t) switch_msrp_start_client(switch_msrp_session_t *msrp_session);
SWITCH_DECLARE(const char *) switch_msrp_listen_ip(void);
SWITCH_DECLARE(void) switch_msrp_load_apis_and_applications(switch_loadable_module_interface_t **moudle_interface);
#define switch_msrp_send(ms, msg) switch_msrp_perform_send(ms, msg, __FILE__, __SWITCH_FUNC__, __LINE__)
#endif
/* For Emacs:

View File

@ -972,9 +972,13 @@ static switch_status_t sofia_read_text_frame(switch_core_session_t *session, swi
rframe->data = msrp_session->frame_data;
rframe->buflen = sizeof(msrp_session->frame_data);
if (msrp_msg && msrp_msg->method == MSRP_METHOD_SEND && !switch_stristr("?OTRv3?", msrp_msg->payload) &&
(switch_stristr("text/plain", msrp_msg->payload) ||
switch_stristr("text/html", msrp_msg->payload))) {
if (msrp_msg && msrp_msg->method == MSRP_METHOD_SEND &&
!switch_stristr("?OTRv3?", msrp_msg->payload) &&
!switch_stristr("application/im-iscomposing", msrp_msg->payload) &&
msrp_msg->headers[MSRP_H_CONTENT_TYPE] &&
(switch_stristr("text/plain", msrp_msg->headers[MSRP_H_CONTENT_TYPE]) ||
switch_stristr("text/html", msrp_msg->headers[MSRP_H_CONTENT_TYPE]) ||
switch_stristr("message/cpim", msrp_msg->headers[MSRP_H_CONTENT_TYPE]))) {
rframe->datalen = msrp_msg->payload_bytes;
rframe->packetlen = msrp_msg->payload_bytes;
memcpy(rframe->data, msrp_msg->payload, msrp_msg->payload_bytes);
@ -983,14 +987,13 @@ static switch_status_t sofia_read_text_frame(switch_core_session_t *session, swi
*frame = rframe;
if (msrp_msg->headers[MSRP_H_CONTENT_TYPE] && !strcasecmp(msrp_msg->headers[MSRP_H_CONTENT_TYPE], "message/cpim")) {
if (msrp_msg->headers[MSRP_H_CONTENT_TYPE] && !strcasecmp(msrp_msg->headers[MSRP_H_CONTENT_TYPE], "message/cpim")) {
char *stripped_text = switch_html_strip((char *)rframe->data);
memcpy(rframe->data, stripped_text, strlen(stripped_text)+1);
rframe->datalen = strlen(stripped_text)+1;
free(stripped_text);
}
switch_safe_free(msrp_msg);
msrp_msg = NULL;
status = SWITCH_STATUS_SUCCESS;

View File

@ -4282,13 +4282,12 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s
if (got_msrp && m->m_type == sdp_media_message) {
if (!smh->msrp_session) {
smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), m->m_proto == sdp_proto_msrps);
smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), switch_core_session_get_uuid(session), m->m_proto == sdp_proto_msrps);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created %s\n", smh->msrp_session->call_id);
}
switch_assert(smh->msrp_session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created\n");
for (attr = m->m_attributes; attr; attr = attr->a_next) {
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[%s]=[%s]\n", attr->a_name, attr->a_value);
if (!strcasecmp(attr->a_name, "path") && attr->a_value) {
@ -4303,6 +4302,9 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s
} else if (!strcasecmp(attr->a_name, "setup") && attr->a_value) {
smh->msrp_session->remote_setup = switch_core_session_strdup(session, attr->a_value);
switch_channel_set_variable(session->channel, "sip_msrp_remote_setup", attr->a_value);
if (!strcmp(attr->a_value, "passive")) {
smh->msrp_session->active = 1;
}
} else if (!strcasecmp(attr->a_name, "file-selector") && attr->a_value) {
char *tmp = switch_mprintf("%s", attr->a_value);
char *argv[4] = { 0 };
@ -4353,7 +4355,6 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s
}
}
smh->msrp_session->call_id = switch_core_session_get_uuid(session);
smh->msrp_session->local_accept_types = smh->msrp_session->remote_accept_types;
smh->msrp_session->local_accept_wrapped_types = smh->msrp_session->remote_accept_types;
smh->msrp_session->local_setup = smh->msrp_session->remote_setup;
@ -4362,9 +4363,22 @@ SWITCH_DECLARE(uint8_t) switch_core_media_negotiate_sdp(switch_core_session_t *s
switch_channel_set_flag(session->channel, CF_TEXT_POSSIBLE);
switch_channel_set_flag(session->channel, CF_TEXT_LINE_BASED);
switch_channel_set_flag(session->channel, CF_MSRP);
if (m->m_proto == sdp_proto_msrps) {
switch_channel_set_flag(session->channel, CF_MSRPS);
}
if (smh->msrp_session->active) {
const char *ip = switch_msrp_listen_ip();
smh->msrp_session->local_path = switch_core_session_sprintf(session,
"msrp%s://%s:%d/%s;tcp",
smh->msrp_session->secure ? "s" : "",
ip, smh->msrp_session->local_port, smh->msrp_session->call_id);
switch_msrp_start_client(smh->msrp_session);
}
switch_core_session_start_text_thread(session);
tmatch = 1;
}
@ -10276,14 +10290,10 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess
}
if (want_msrp || want_msrps) {
smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), want_msrps);
smh->msrp_session = switch_msrp_session_new(switch_core_session_get_pool(session), switch_core_session_get_uuid(session), want_msrps);
switch_assert(smh->msrp_session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created\n");
smh->msrp_session->call_id = switch_core_session_get_uuid(session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP session created %s\n", smh->msrp_session->call_id);
switch_channel_set_flag(session->channel, CF_HAS_TEXT);
switch_channel_set_flag(session->channel, CF_TEXT_POSSIBLE);
@ -10313,15 +10323,22 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess
"a=path:%s\n"
"a=accept-types:%s\n"
"a=accept-wrapped-types:%s\n"
"a=setup:passive\n",
"a=setup:%s\n",
msrp_session->local_port,
msrp_session->secure ? "TLS/" : "",
msrp_session->local_path,
msrp_session->local_accept_types,
msrp_session->local_accept_wrapped_types);
msrp_session->local_accept_wrapped_types,
msrp_session->active ? "active" : "passive");
} else {
char *uuid = switch_core_session_get_uuid(session);
const char *file_selector = switch_channel_get_variable(session->channel, "sip_msrp_local_file_selector");
const char *msrp_offer_active = switch_channel_get_variable(session->channel, "sip_msrp_offer_active");
if (switch_true(msrp_offer_active)) {
msrp_session->active = 1;
// switch_msrp_start_client(msrp_session);
}
if (zstr(msrp_session->local_path)) {
msrp_session->local_path = switch_core_session_sprintf(session,
@ -10335,10 +10352,11 @@ SWITCH_DECLARE(void) switch_core_media_gen_local_sdp(switch_core_session_t *sess
"a=path:%s\n"
"a=accept-types:message/cpim text/* application/im-iscomposing+xml\n"
"a=accept-wrapped-types:*\n"
"a=setup:passive\n",
"a=setup:%s\n",
msrp_session->local_port,
msrp_session->secure ? "TLS/" : "",
msrp_session->local_path);
msrp_session->local_path,
msrp_session->active ? "active" : "passive");
if (!zstr(file_selector)) {
switch_snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf),

View File

@ -60,6 +60,7 @@ typedef struct worker_helper{
int debug;
switch_memory_pool_t *pool;
msrp_client_socket_t csock;
switch_msrp_session_t *msrp_session;
} worker_helper_t;
static void msrp_deinit_ssl()
@ -234,6 +235,11 @@ sock_fail:
return rv;
}
SWITCH_DECLARE(const char *) switch_msrp_listen_ip()
{
return globals.ip;
}
SWITCH_DECLARE(switch_status_t) switch_msrp_init()
{
switch_memory_pool_t *pool;
@ -308,7 +314,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_destroy()
return st;
}
SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, switch_bool_t secure)
SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_pool_t *pool, const char *call_id, switch_bool_t secure)
{
switch_msrp_session_t *ms;
ms = switch_core_alloc(pool, sizeof(switch_msrp_session_t));
@ -317,12 +323,32 @@ SWITCH_DECLARE(switch_msrp_session_t *)switch_msrp_session_new(switch_memory_poo
ms->secure = secure;
ms->local_port = secure ? globals.msock_ssl.port : globals.msock.port;
ms->msrp_msg_buffer_size = globals.message_buffer_size;
ms->call_id = switch_core_strdup(pool, call_id);
switch_mutex_init(&ms->mutex, SWITCH_MUTEX_NESTED, pool);
return ms;
}
SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_t **ms)
{
int sanity = 500;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroying MSRP session %s\n", (*ms)->call_id);
switch_mutex_lock((*ms)->mutex);
if ((*ms)->csock && (*ms)->csock->sock) {
close_socket(&(*ms)->csock->sock);
}
switch_mutex_unlock((*ms)->mutex);
switch_yield(20000);
while(sanity-- > 0 && (*ms)->running) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "waiting MSRP worker %s\n", (*ms)->call_id);
switch_yield(20000);
}
switch_mutex_destroy((*ms)->mutex);
ms = NULL;
return SWITCH_STATUS_SUCCESS;
@ -455,7 +481,7 @@ void dump_buffer(char *buf, switch_size_t len, int line)
}
buff[j] = '\0';
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d:%" SWITCH_SIZE_T_FMT "DUMP:%s:DUMP\n", line, len, buff);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d: [%" SWITCH_SIZE_T_FMT "] ::DUMP::%s::DUMP::\n", line, len, buff);
}
char *find_delim(char *buf, int len, char *delim)
@ -766,7 +792,7 @@ msrp_msg_t *msrp_parse_buffer(char *buf, int len, msrp_msg_t *msrp_msg, switch_m
int dlen = strlen(msrp_msg->delimiter);
if (msrp_msg->payload_bytes > len) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Waiting payload...%d < %d\n", (int)msrp_msg->payload_bytes, (int)len);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Waiting payload... %d < %d\n", (int)msrp_msg->payload_bytes, (int)len);
return msrp_msg; /*keep waiting ?*/
}
@ -789,8 +815,9 @@ msrp_msg_t *msrp_parse_buffer(char *buf, int len, msrp_msg_t *msrp_msg, switch_m
return msrp_msg;
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error here! code:%d\n", msrp_msg->state);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error here! code:%d\n", msrp_msg->state);
}
return msrp_msg;
}
@ -859,106 +886,197 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj)
switch_status_t status;
msrp_msg_t *msrp_msg = NULL;
char uuid[128] = { 0 };
switch_core_session_t *session = NULL;
switch_msrp_session_t *msrp_session = NULL;
switch_channel_t *channel = NULL;
int sanity = 10;
SSL *ssl = NULL;
int client_mode = helper->csock.client_mode;
switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, TRUE);
// switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE);
if (client_mode) {
switch_sockaddr_t *sa = NULL;
switch_msrp_msg_t setup_msg = { 0 };
const char *remote_ip = NULL;
switch_port_t remote_port = 0;
char *dup = NULL;
char *p = NULL;
if (csock->secure) { // tls?
int secure_established = 0;
int sanity = 10;
switch_os_socket_t sockdes = SWITCH_SOCK_INVALID;
switch_assert(helper->msrp_session);
msrp_session = helper->msrp_session;
msrp_session->running = 1;
switch_os_sock_get(&sockdes, csock->sock);
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "socket: %d\n", sockdes);
switch_assert(sockdes != SWITCH_SOCK_INVALID);
switch_assert(msrp_session->remote_path);
dup = switch_core_strdup(pool, msrp_session->remote_path);
switch_assert(dup);
ssl = SSL_new(globals.ssl_ctx);
assert(ssl);
globals.ssl = ssl;
p = (char *)switch_stristr("msrp://", dup);
SSL_set_fd(ssl, sockdes);
if (p) {
p += 7;
} else {
p = (char *)switch_stristr("msrps://", dup);
do {
int code = SSL_accept(ssl);
if (p) p+= 8;
}
if (code == 1) {
secure_established = 1;
goto done;
if (p) {
remote_ip = p;
p = (char *)switch_stristr(":", p);
if (p) {
*p++ = '\0';
remote_port = atoi(p);
}
}
if (code == 0) {
goto err;
}
if (!remote_ip || remote_port <= 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error get remote MSRP ip:port from path: [%s]\n", msrp_session->remote_path);
}
if (code < 0) {
if (code == -1 && SSL_get_error(ssl, code) != SSL_ERROR_WANT_READ) {
if (switch_sockaddr_info_get(&sa, remote_ip, SWITCH_UNSPEC, remote_port, 0, pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error!\n");
goto end;
}
if (switch_socket_create(&csock->sock, switch_sockaddr_get_family(sa),
SOCK_STREAM, SWITCH_PROTO_TCP, pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error!\n");
goto end;
}
switch_socket_opt_set(csock->sock, SWITCH_SO_KEEPALIVE, 1);
switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, 1);
// switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE);
switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_KEEPIDLE, 30);
switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_KEEPINTVL, 30);
switch_socket_timeout_set(csock->sock, 3000000); // abort connection 3 seconds than forever
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "MSRP %s Connecting to %s\n", msrp_session->call_id, msrp_session->remote_path);
if ((switch_socket_connect(csock->sock, sa)) != SWITCH_STATUS_SUCCESS) {
char errbuf[512] = {0};
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error: %s\n", switch_strerror(errno, errbuf, sizeof(errbuf)));
goto end;
}
switch_socket_timeout_set(csock->sock, -1);
if (msrp_session->secure) {
// todo setup tls ...
}
helper->msrp_session->csock = csock;
setup_msg.headers[MSRP_H_CONTENT_TYPE] = "text/plain";
setup_msg.payload = NULL;
setup_msg.payload_bytes = 0;
if (SWITCH_STATUS_SUCCESS != switch_msrp_send(msrp_session, &setup_msg)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MSRP initial setup send error!\n");
goto end;
}
} else { // server mode
switch_socket_opt_set(csock->sock, SWITCH_SO_TCP_NODELAY, TRUE);
// switch_socket_opt_set(csock->sock, SWITCH_SO_NONBLOCK, TRUE);
if (csock->secure) { // tls?
int secure_established = 0;
int sanity = 10;
switch_os_socket_t sockdes = SWITCH_SOCK_INVALID;
switch_os_sock_get(&sockdes, csock->sock);
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "socket: %d\n", sockdes);
switch_assert(sockdes != SWITCH_SOCK_INVALID);
ssl = SSL_new(globals.ssl_ctx);
assert(ssl);
globals.ssl = ssl;
SSL_set_fd(ssl, sockdes);
do {
int code = SSL_accept(ssl);
if (code == 1) {
secure_established = 1;
goto done;
}
if (code == 0) {
goto err;
}
}
} while(sanity--);
err:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SSL ERR\n");
goto end;
if (code < 0) {
if (code == -1 && SSL_get_error(ssl, code) != SSL_ERROR_WANT_READ) {
goto err;
}
}
} while(sanity--);
done:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SSL established = %d\n", secure_established);
}
err:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SSL ERR\n");
goto end;
len = MSRP_BUFF_SIZE;
status = msrp_socket_recv(csock, buf, &len);
if (helper->debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "status:%d, len:%" SWITCH_SIZE_T_FMT "\n", status, len);
}
if (status == SWITCH_STATUS_SUCCESS) {
msrp_msg = msrp_parse_buffer(buf, len, NULL, pool);
switch_assert(msrp_msg);
} else {
goto end;
}
if (helper->debug) {
msrp_msg_serialize(msrp_msg, buf);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s\n", buf);
}
if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) {
msrp_reply(csock, msrp_msg);
if (msrp_msg->headers[MSRP_H_SUCCESS_REPORT] &&
!strcmp(msrp_msg->headers[MSRP_H_SUCCESS_REPORT], "yes")) {
msrp_report(csock, msrp_msg, "200 OK");
done:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SSL established = %d\n", secure_established);
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse initial message error!\n");
goto end;
}
if (msrp_find_uuid(uuid, msrp_msg->headers[MSRP_H_TO_PATH]) != SWITCH_TRUE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid MSRP to-path!\n");
goto end;
}
len = MSRP_BUFF_SIZE;
status = msrp_socket_recv(csock, buf, &len);
while (sanity-- && !(session = switch_core_session_locate(uuid))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "waiting for session\n");
switch_yield(1000000);
}
if (helper->debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "status:%d, len:%" SWITCH_SIZE_T_FMT "\n", status, len);
}
if(!session) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No such session %s\n", uuid);
goto end;
}
if (status == SWITCH_STATUS_SUCCESS) {
msrp_msg = msrp_parse_buffer(buf, len, NULL, pool);
switch_assert(msrp_msg);
} else {
goto end;
}
channel = switch_core_session_get_channel(session);
msrp_session = switch_core_media_get_msrp_session(session);
switch_assert(msrp_session);
msrp_session->csock = csock;
if (helper->debug) {
msrp_msg_serialize(msrp_msg, buf);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s\n", buf);
}
if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) {
msrp_reply(csock, msrp_msg);
if (msrp_msg->headers[MSRP_H_SUCCESS_REPORT] &&
!strcmp(msrp_msg->headers[MSRP_H_SUCCESS_REPORT], "yes")) {
msrp_report(csock, msrp_msg, "200 OK");
}
} else if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_AUTH) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "MSRP_METHOD_AUTH\n");
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse initial message error!\n");
goto end;
}
if (msrp_find_uuid(uuid, msrp_msg->headers[MSRP_H_TO_PATH]) != SWITCH_TRUE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid MSRP to-path!\n");
}
{
switch_core_session_t *session = NULL;
while (sanity-- && !(session = switch_core_session_locate(uuid))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "waiting for session\n");
switch_yield(1000000);
}
if(!session) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No such session %s\n", uuid);
goto end;
}
msrp_session = switch_core_media_get_msrp_session(session);
switch_assert(msrp_session);
msrp_session->csock = csock;
msrp_session->running = 1;
switch_core_session_rwunlock(session);
}
}
len = MSRP_BUFF_SIZE;
p = buf;
@ -981,12 +1099,11 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj)
}
if (helper->debug) {
// char msg_buf[MSRP_BUFF_SIZE * 2];
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "state:%d, len:%" SWITCH_SIZE_T_FMT " payload_bytes:%" SWITCH_SIZE_T_FMT "\n", msrp_msg->state, len, msrp_msg->payload_bytes);
// {
// char bbb[MSRP_BUFF_SIZE * 2];
// msrp_msg_serialize(msrp_msg_tmp, bbb),
//
// }
}
if (msrp_msg->state == MSRP_ST_DONE && msrp_msg->method == MSRP_METHOD_SEND) {
@ -1007,7 +1124,7 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj)
}
while (msrp_session && msrp_session->msrp_msg_count > msrp_session->msrp_msg_buffer_size) {
if (!switch_channel_ready(channel)) break;
if (!msrp_session->running) break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s reading too fast, relax...\n", uuid);
switch_yield(100000);
}
@ -1054,6 +1171,7 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj)
msrp_msg->transaction_id = switch_core_strdup(pool, msrp_msg_old->transaction_id);
msrp_msg->delimiter = switch_core_strdup(pool, msrp_msg_old->delimiter);
msrp_msg->last_header = msrp_msg_old->last_header;
for (i = 0; i < msrp_msg->last_header; i++) {
msrp_msg->headers[i] = switch_core_strdup(pool, msrp_msg_old->headers[i]);
}
@ -1078,15 +1196,18 @@ static void *SWITCH_THREAD_FUNC msrp_worker(switch_thread_t *thread, void *obj)
len = MSRP_BUFF_SIZE;
last_p = buf;
}
if (!switch_channel_ready(channel)) break;
if (!msrp_session->running) break;
}
end:
if (session) switch_core_session_rwunlock(session);
switch_mutex_lock(msrp_session->mutex);
close_socket(&csock->sock);
switch_mutex_unlock(msrp_session->mutex);
switch_socket_close(csock->sock);
switch_core_destroy_memory_pool(&pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "msrp worker %s down\n", uuid);
if (!client_mode) switch_core_destroy_memory_pool(&pool);
msrp_session->running = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP worker down %s\n", msrp_session->call_id);
return NULL;
}
@ -1114,7 +1235,18 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj
worker_helper_t *helper;
if (globals.debug > 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Connection Open%s\n", msock->secure ? " SSL" : "");
switch_sockaddr_t *addr = NULL;
char remote_ip[128];
/* Get the remote address/port info */
switch_socket_addr_get(&addr, SWITCH_TRUE, sock);
if (addr) {
switch_get_addr(remote_ip, sizeof(remote_ip), addr);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Connection Open%s from %s:%d\n", msock->secure ? " SSL" : "", remote_ip, switch_sockaddr_get_port(addr));
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error get remote addr!\n");
}
}
if (switch_core_new_memory_pool(&worker_pool) != SWITCH_STATUS_SUCCESS) {
@ -1122,7 +1254,7 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj
return NULL;
}
switch_zmalloc(helper, sizeof(worker_helper_t));
helper = switch_core_alloc(worker_pool, sizeof(worker_helper_t));
switch_assert(helper != NULL);
helper->pool = worker_pool;
@ -1134,7 +1266,7 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, msrp_worker, helper, worker_pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "msrp worker new thread spawned!\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP worker new thread spawned!\n");
}
if (pool) switch_core_destroy_memory_pool(&pool);
@ -1144,13 +1276,38 @@ static void *SWITCH_THREAD_FUNC msrp_listener(switch_thread_t *thread, void *obj
return NULL;
}
SWITCH_DECLARE(switch_status_t) switch_msrp_start_client(switch_msrp_session_t *msrp_session)
{
worker_helper_t *helper;
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
helper = switch_core_alloc(msrp_session->pool, sizeof(worker_helper_t));
switch_assert(helper != NULL);
helper->pool = msrp_session->pool;
helper->debug = globals.debug;
helper->csock.sock = NULL; // client mode
helper->csock.secure = msrp_session->secure;
helper->csock.client_mode = 1;
helper->msrp_session = msrp_session;
switch_threadattr_create(&thd_attr, helper->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, msrp_worker, helper, helper->pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "MSRP new worker client started! %s\n", msrp_session->call_id);
return SWITCH_STATUS_SUCCESS;
}
void random_string(char *buf, uint16_t size)
{
switch_stun_random_string(buf, size, NULL);
}
#define MSRP_TRANS_ID_LEN 16
SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp_msg_t *msrp_msg)
SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
{
char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 };
char buf[MSRP_BUFF_SIZE];
@ -1158,8 +1315,13 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp
char *to_path = msrp_msg->headers[MSRP_H_TO_PATH] ? msrp_msg->headers[MSRP_H_TO_PATH] : ms->remote_path;
char *from_path = msrp_msg->headers[MSRP_H_FROM_PATH] ? msrp_msg->headers[MSRP_H_FROM_PATH] : ms->local_path;
if (!ms->running) {
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Discard one message\n");
return SWITCH_STATUS_SUCCESS;
}
if (!from_path) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "NO FROM PATH\n");
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n");
return SWITCH_STATUS_SUCCESS;
}
@ -1180,7 +1342,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp
if (msrp_msg->payload) {
if (len + msrp_msg->payload_bytes >= MSRP_BUFF_SIZE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "payload too large! %" SWITCH_SIZE_T_FMT "\n", len + msrp_msg->payload_bytes);
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_ERROR, "payload too large! %" SWITCH_SIZE_T_FMT "\n", len + msrp_msg->payload_bytes);
return SWITCH_STATUS_FALSE;
}
memcpy(buf + len, msrp_msg->payload, msrp_msg->payload_bytes);
@ -1188,7 +1350,7 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_send(switch_msrp_session_t *ms, msrp
}
sprintf(buf + len, "\r\n-------%s$\r\n", transaction_id);
len += (12 + strlen(transaction_id));
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "---------------------send: %" SWITCH_SIZE_T_FMT " bytes\n%s\n", len, buf);
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "---------------------send: %" SWITCH_SIZE_T_FMT " bytes---------------------\n%s\n", len, buf);
return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE;
}