/* * mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * Copyright (C) 2011-2012, Barracuda Networks Inc. * * Version: MPL 1.1 * * The contents of this file are subject to the Mozilla Public License Version * 1.1 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Original Code is mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * * The Initial Developer of the Original Code is Barracuda Networks Inc. * Portions created by the Initial Developer are Copyright (C) * the Initial Developer. All Rights Reserved. * * Contributor(s): * * Mathieu Rene * Joao Mesquita * William King * Seven Du * Da Xiong * rtmp.c -- RTMP Protocol Handler * */ #include "mod_rtmp.h" #include "handshake.h" typedef struct { unsigned char *buf; size_t pos; size_t len; } buffer_helper_t; size_t my_buffer_read(void * out_buffer, size_t size, void * user_data) { buffer_helper_t *helper = (buffer_helper_t*)user_data; size_t len = (helper->len - helper->pos) < size ? (helper->len - helper->pos) : size; if (len <= 0) { return 0; } memcpy(out_buffer, helper->buf + helper->pos, len); helper->pos += len; return len; } size_t my_buffer_write(const void *buffer, size_t size, void * user_data) { buffer_helper_t *helper = (buffer_helper_t*)user_data; size_t len = (helper->len - helper->pos) < size ? (helper->len - helper->pos) : size; if (len <= 0) { return 0; } memcpy(helper->buf + helper->pos, buffer, len); helper->pos += len; return len; } void rtmp_handle_control(rtmp_session_t *rsession, int amfnumber) { rtmp_state_t *state = &rsession->amfstate[amfnumber]; char buf[200] = { 0 }; char *p = buf; int type = state->buf[0] << 8 | state->buf[1]; int i; for (i = 2; i < state->origlen; i++) { p += sprintf(p, "%02x ", state->buf[i] & 0xFF); } switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Control (%d): %s\n", type, buf); switch(type) { case RTMP_CTRL_STREAM_BEGIN: break; case RTMP_CTRL_PING_REQUEST: { unsigned char buf[] = { INT16(RTMP_CTRL_PING_RESPONSE), state->buf[2], state->buf[3], state->buf[4], state->buf[5] }; rtmp_send_message(rsession, amfnumber, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Ping request\n"); } break; case RTMP_CTRL_PING_RESPONSE: { uint32_t now = ((switch_micro_time_now()/1000) & 0xFFFFFFFF); uint32_t sent = state->buf[2] << 24 | state->buf[3] << 16 | state->buf[4] << 8 | state->buf[5]; switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Ping reply: %d ms\n", (int)(now - sent)); } break; case RTMP_CTRL_SET_BUFFER_LENGTH: { uint32_t stream_id = state->buf[2] << 24 | state->buf[3] << 16 | state->buf[4] << 8 | state->buf[5]; uint32_t length = state->buf[6] << 24 | state->buf[7] << 16 | state->buf[8] << 8 | state->buf[9]; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stream=%u Client buffer set to %ums\n", stream_id, length); } break; default: switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "[amfnumber=%d] Unhandled control packet (type=0x%x)\n", amfnumber, type); } } void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber) { rtmp_state_t *state = &rsession->amfstate[amfnumber]; #ifdef RTMP_DEBUG_IO amf0_data *dump; #endif int i = 0; buffer_helper_t helper = { state->buf, 0, state->origlen }; int64_t transaction_id; const char *command; int argc = 0; amf0_data *argv[100] = { 0 }; rtmp_invoke_function_t function; #ifdef RTMP_DEBUG_IO printf(">>>>> BEGIN INVOKE MSG (num=0x%02x, type=0x%02x, stream_id=0x%x)\n", amfnumber, state->type, state->stream_id); while((dump = amf0_data_read(my_buffer_read, &helper))) { amf0_data *dump2; printf("ELM> "); amf0_data_dump(stdout, dump, 0); printf("\n"); while ((dump2 = amf0_data_read(my_buffer_read, &helper))) { printf("ELM> "); amf0_data_dump(stdout, dump2, 0); printf("\n"); amf0_data_free(dump2); } amf0_data_free(dump); } printf("<<<<< END AMF MSG\n"); #endif #ifdef RTMP_DEBUG_IO { helper.pos = 0; fprintf(rsession->io_debug_in, ">>>>> BEGIN INVOKE MSG (chunk_stream=0x%02x, type=0x%02x, stream_id=0x%x)\n", amfnumber, state->type, state->stream_id); while((dump = amf0_data_read(my_buffer_read, &helper))) { amf0_data *dump2; fprintf(rsession->io_debug_in, "ELM> "); amf0_data_dump(rsession->io_debug_in, dump, 0); fprintf(rsession->io_debug_in, "\n"); while ((dump2 = amf0_data_read(my_buffer_read, &helper))) { fprintf(rsession->io_debug_in, "ELM> "); amf0_data_dump(rsession->io_debug_in, dump2, 0); fprintf(rsession->io_debug_in, "\n"); amf0_data_free(dump2); } amf0_data_free(dump); } fprintf(rsession->io_debug_in, "<<<<< END AMF MSG\n"); fflush(rsession->io_debug_in); } #endif helper.pos = 0; while (argc < switch_arraylen(argv) && (argv[argc++] = amf0_data_read(my_buffer_read, &helper))); if (!(command = amf0_get_string(argv[i++]))) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Bogus INVOKE request\n"); return; } transaction_id = amf0_get_number(argv[i++]); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d] Got INVOKE for %s\n", amfnumber, command); if ((function = (rtmp_invoke_function_t)(intptr_t)switch_core_hash_find(rtmp_globals.invoke_hash, command))) { function(rsession, state, amfnumber, transaction_id, argc - 2, argv + 2); } else { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Unhandled invoke for \"%s\"\n", command); } /* Free all the AMF data we've read */ for (i = 0; i < argc; i++) { amf0_data_free(argv[i]); } } switch_status_t rtmp_check_auth(rtmp_session_t *rsession, const char *user, const char *domain, const char *authmd5) { switch_status_t status = SWITCH_STATUS_FALSE; char *auth; char md5[SWITCH_MD5_DIGEST_STRING_SIZE]; switch_xml_t xml = NULL, x_param, x_params; switch_bool_t allow_empty_password = SWITCH_FALSE; const char *passwd = NULL; switch_bool_t disallow_multiple_registration = SWITCH_FALSE; switch_event_t *locate_params; switch_event_create(&locate_params, SWITCH_EVENT_GENERAL); switch_assert(locate_params); switch_event_add_header_string(locate_params, SWITCH_STACK_BOTTOM, "source", "mod_rtmp"); /* Locate user */ if (switch_xml_locate_user_merged("id", user, domain, NULL, &xml, locate_params) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed. No such user %s@%s\n", user, domain); goto done; } if ((x_params = switch_xml_child(xml, "params"))) { for (x_param = switch_xml_child(x_params, "param"); x_param; x_param = x_param->next) { const char *var = switch_xml_attr_soft(x_param, "name"); const char *val = switch_xml_attr_soft(x_param, "value"); if (!strcasecmp(var, "password")) { passwd = val; } if (!strcasecmp(var, "allow-empty-password")) { allow_empty_password = switch_true(val); } if (!strcasecmp(var, "disallow-multiple-registration")) { disallow_multiple_registration = switch_true(val); } } } if (zstr(passwd)) { if (allow_empty_password) { status = SWITCH_STATUS_SUCCESS; } else { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s: empty password not allowed\n", user, switch_str_nil(domain)); } goto done; } auth = switch_core_sprintf(rsession->pool, "%s:%s@%s:%s", rsession->uuid, user, domain, passwd); switch_md5_string(md5, auth, strlen(auth)); if (!strncmp(md5, authmd5, SWITCH_MD5_DIGEST_STRING_SIZE)) { status = SWITCH_STATUS_SUCCESS; } else { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s\n", user, domain); } if (disallow_multiple_registration) { switch_hash_index_t *hi; switch_thread_rwlock_rdlock(rsession->profile->session_rwlock); for (hi = switch_core_hash_first(rsession->profile->session_hash); hi; hi = switch_core_hash_next(&hi)) { void *val; const void *key; switch_ssize_t keylen; rtmp_session_t *item; switch_core_hash_this(hi, &key, &keylen, &val); item = (rtmp_session_t *)val; if (rtmp_session_check_user(item, user, domain) == SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Logging out %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid); if (rtmp_session_logout(item, user, domain) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Unable to logout %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid); } } } switch_thread_rwlock_unlock(rsession->profile->session_rwlock); } done: if (xml) { switch_xml_free(xml); } switch_event_destroy(&locate_params); return status; } switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event) { switch_status_t status = SWITCH_STATUS_SUCCESS; if (obj && obj->type == AMF0_TYPE_OBJECT) { amf0_node *node; if (!*event) { if ((status = switch_event_create(event, SWITCH_EVENT_CUSTOM)) != SWITCH_STATUS_SUCCESS) { return status; } } for (node = amf0_object_first(obj); node; node = amf0_object_next(node)) { const char *name = amf0_get_string(amf0_object_get_name(node)); const char *value = amf0_get_string(amf0_object_get_data(node)); if (!zstr(name) && !zstr(value)) { if (!strcmp(name, "_body")) { switch_event_add_body(*event, "%s", value); } else { switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, name, value); } } } } else { status = SWITCH_STATUS_FALSE; } return status; } switch_status_t amf_event_to_object(amf0_data **obj, switch_event_t *event) { switch_event_header_t *hp; const char *body; switch_assert(event); switch_assert(obj); if (!*obj) { *obj = amf0_object_new(); } for (hp = event->headers; hp; hp = hp->next) { amf0_object_add(*obj, hp->name, amf0_str(hp->value)); } body = switch_event_get_body(event); if (!zstr(body)) { amf0_object_add(*obj, "_body", amf0_str(body)); } return SWITCH_STATUS_SUCCESS; } void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize) { if (rsession->out_chunksize != chunksize) { unsigned char buf[] = { INT32(chunksize) }; rtmp_send_message(rsession, 2 /*amfnumber*/, 0, RTMP_TYPE_CHUNKSIZE, 0, buf, sizeof(buf), MSG_FULLHEADER); rsession->out_chunksize = chunksize; } } void rtmp_get_user_variables(switch_event_t **event, switch_core_session_t *session) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_event_header_t *he; if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) { return; } if ((he = switch_channel_variable_first(channel))) { for (; he; he = he->next) { if (!strncmp(he->name, RTMP_USER_VARIABLE_PREFIX, strlen(RTMP_USER_VARIABLE_PREFIX))) { switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, he->name, he->value); } } switch_channel_variable_last(channel); } } void rtmp_get_user_variables_event(switch_event_t **event, switch_event_t *var_event) { switch_event_header_t *he; if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) { return; } if ((he = var_event->headers)) { for (; he; he = he->next) { if (!strncmp(he->name, RTMP_USER_VARIABLE_PREFIX, strlen(RTMP_USER_VARIABLE_PREFIX))) { switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, he->name, he->value); } } } } void rtmp_session_send_onattach(rtmp_session_t *rsession) { const char *uuid = ""; if (rsession->tech_pvt) { uuid = switch_core_session_get_uuid(rsession->tech_pvt->session); } rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("onAttach"), amf0_number_new(0), amf0_null_new(), amf0_str(uuid), NULL); } void rtmp_send_display_update(switch_core_session_t *session) { rtmp_private_t *tech_pvt = switch_core_session_get_private(session); rtmp_session_t *rsession = tech_pvt->rtmp_session; rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("displayUpdate"), amf0_number_new(0), amf0_null_new(), amf0_str(switch_core_session_get_uuid(session)), amf0_str(switch_str_nil(tech_pvt->display_callee_id_name)), amf0_str(switch_str_nil(tech_pvt->display_callee_id_number)), NULL); } void rtmp_send_incoming_call(switch_core_session_t *session, switch_event_t *var_event) { rtmp_private_t *tech_pvt = switch_core_session_get_private(session); rtmp_session_t *rsession = tech_pvt->rtmp_session; switch_channel_t *channel = switch_core_session_get_channel(session); switch_caller_profile_t *caller_profile = switch_channel_get_caller_profile(channel); switch_event_t *event = NULL; amf0_data *obj = NULL; if (var_event) { rtmp_get_user_variables_event(&event, var_event); } else { rtmp_get_user_variables(&event, session); } if (event) { if (tech_pvt->has_video) { switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "want_video", "true"); } amf_event_to_object(&obj, event); switch_event_destroy(&event); } rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("incomingCall"), amf0_number_new(0), amf0_null_new(), amf0_str(switch_core_session_get_uuid(session)), amf0_str(switch_str_nil(caller_profile->caller_id_name)), amf0_str(switch_str_nil(caller_profile->caller_id_number)), !zstr(tech_pvt->auth) ? amf0_str(tech_pvt->auth) : amf0_null_new(), obj ? obj : amf0_null_new(), NULL); } void rtmp_send_onhangup(switch_core_session_t *session) { rtmp_private_t *tech_pvt = switch_core_session_get_private(session); rtmp_session_t *rsession = tech_pvt->rtmp_session; switch_channel_t *channel = switch_core_session_get_channel(session); rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("onHangup"), amf0_number_new(0), amf0_null_new(), amf0_str(switch_core_session_get_uuid(session)), amf0_str(switch_channel_cause2str(switch_channel_get_cause(channel))), NULL); } void rtmp_send_event(rtmp_session_t *rsession, switch_event_t *event) { amf0_data *obj = NULL; switch_assert(event != NULL); switch_assert(rsession != NULL); if (amf_event_to_object(&obj, event) == SWITCH_STATUS_SUCCESS) { rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("event"), amf0_number_new(0), amf0_null_new(), obj, NULL); } } void rtmp_ping(rtmp_session_t *rsession) { uint32_t now = (uint32_t)((switch_micro_time_now() / 1000) & 0xFFFFFFFF); unsigned char buf[] = { INT16(RTMP_CTRL_PING_REQUEST), INT32(now) }; rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); } void rtmp_notify_call_state(switch_core_session_t *session) { switch_channel_t *channel = switch_core_session_get_channel(session); const char *state = switch_channel_callstate2str(switch_channel_get_callstate(channel)); rtmp_private_t *tech_pvt = switch_core_session_get_private(session); rtmp_session_t *rsession = tech_pvt->rtmp_session; rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("callState"), amf0_number_new(0), amf0_null_new(), amf0_str(switch_core_session_get_uuid(session)), amf0_str(state), NULL); } switch_status_t rtmp_send_invoke(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint32_t stream_id, ...) { switch_status_t s; va_list list; va_start(list, stream_id); s = rtmp_send_invoke_v(rsession, amfnumber, RTMP_TYPE_INVOKE, timestamp, stream_id, list, SWITCH_FALSE); va_end(list); return s; } switch_status_t rtmp_send_invoke_free(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint32_t stream_id, ...) { switch_status_t s; va_list list; va_start(list, stream_id); s = rtmp_send_invoke_v(rsession, amfnumber, RTMP_TYPE_INVOKE, timestamp, stream_id, list, SWITCH_TRUE); va_end(list); return s; } switch_status_t rtmp_send_notify(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint32_t stream_id, ...) { switch_status_t s; va_list list; va_start(list, stream_id); s = rtmp_send_invoke_v(rsession, amfnumber, RTMP_TYPE_NOTIFY, timestamp, stream_id, list, SWITCH_FALSE); va_end(list); return s; } switch_status_t rtmp_send_notify_free(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint32_t stream_id, ...) { switch_status_t s; va_list list; va_start(list, stream_id); s = rtmp_send_invoke_v(rsession, amfnumber, RTMP_TYPE_NOTIFY, timestamp, stream_id, list, SWITCH_TRUE); va_end(list); return s; } switch_status_t rtmp_send_invoke_v(rtmp_session_t *rsession, uint8_t amfnumber, uint8_t type, uint32_t timestamp, uint32_t stream_id, va_list list, switch_bool_t freethem) { amf0_data *data; unsigned char buf[AMF_MAX_SIZE]; buffer_helper_t helper = { buf, 0, AMF_MAX_SIZE }; while ((data = va_arg(list, amf0_data*))) { //amf0_data_dump(stdout, data, 0); //printf("\n"); amf0_data_write(data, my_buffer_write, &helper); if (freethem) { amf0_data_free(data); } } return rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, buf, helper.pos, 0); } /* Break message down into 128 bytes chunks, add the appropriate headers and send it out */ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags) { switch_size_t pos = 0; uint8_t header[12] = { amfnumber & 0x3F, INT24(0), INT24(len), type, INT32_LE(stream_id) }; switch_size_t chunksize; uint8_t microhdr = (3 << 6) | amfnumber; switch_size_t hdrsize = 1; switch_status_t status = SWITCH_STATUS_SUCCESS; rtmp_state_t *state = &rsession->amfstate_out[amfnumber]; // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d send_ack=%d send=%d window=%d wait_ack=%d\n", // type, rsession->send_ack, rsession->send, rsession->send_ack_window, rsession->send + 3073 - rsession->send_ack); if (type == RTMP_TYPE_VIDEO) { uint32_t window = rsession->send_ack_window; if (rsession->media_debug & RTMP_MD_VIDEO_WRITE) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W V ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len); } /* start to drop video frame on window/2 if the frame is a non-IDR video frame start to drop video frame on window * 3/4 if the frame is a IDR frame start to drop audio frame on widnow full */ if (*message == 0x17) { window = window / 4 * 3; } else { window /= 2; } if ((rsession->send_ack + window) < (rsession->send + 3073)) { /* We're sending too fast, drop the frame */ rsession->dropped_video_frame++; switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT " dropped=%"SWITCH_SIZE_T_FMT"\n", amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame); return SWITCH_STATUS_SUCCESS; } if (rsession->dropped_video_frame) { if (*message != 0x17) { rsession->dropped_video_frame++; switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT " dropped=%"SWITCH_SIZE_T_FMT" waiting for the next IDR\n", amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame); return SWITCH_STATUS_SUCCESS; } else { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Got IDR frame after %"SWITCH_SIZE_T_FMT" frame(s) dropped\n", rsession->dropped_video_frame); rsession->dropped_video_frame = 0; } } } if (type == RTMP_TYPE_AUDIO && (rsession->media_debug & RTMP_MD_AUDIO_WRITE)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W A ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len); } if (type == RTMP_TYPE_AUDIO && (rsession->send_ack + rsession->send_ack_window) < (rsession->send + 3073)) { /* We're sending too fast, drop the frame */ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "DROP %s FRAME [amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", "AUDIO", amfnumber, type, stream_id, len); return SWITCH_STATUS_SUCCESS; } if (type != RTMP_TYPE_AUDIO && type != RTMP_TYPE_VIDEO && type != RTMP_TYPE_ACK) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len); } #ifdef RTMP_DEBUG_IO { fprintf(rsession->io_debug_out, "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len); if (type == RTMP_TYPE_INVOKE || type == RTMP_TYPE_NOTIFY) { buffer_helper_t helper = { (unsigned char*)message, 0, len }; amf0_data *dump; while((dump = amf0_data_read(my_buffer_read, &helper))) { amf0_data *dump2; fprintf(rsession->io_debug_out, "ELM> "); amf0_data_dump(rsession->io_debug_out, dump, 0); fprintf(rsession->io_debug_out, "\n"); while ((dump2 = amf0_data_read(my_buffer_read, &helper))) { fprintf(rsession->io_debug_out, "ELM> "); amf0_data_dump(rsession->io_debug_out, dump2, 0); fprintf(rsession->io_debug_out, "\n"); amf0_data_free(dump2); } amf0_data_free(dump); } fprintf(rsession->io_debug_out, "<<<<< END AMF MSG\n"); } fflush(rsession->io_debug_out); } #endif /* Find out what is the smallest header we can use */ if (!(flags & MSG_FULLHEADER) && stream_id > 0 && state->stream_id == stream_id && timestamp >= state->ts) { if (state->type == type && state->origlen == (int)len) { if (state->ts == timestamp) { /* Type 3: no header! */ hdrsize = 1; header[0] |= 3 << 6; } else { uint32_t delta = timestamp - state->ts; /* Type 2: timestamp delta */ hdrsize = 4; header[0] |= 2 << 6; header[1] = (delta >> 16) & 0xFF; header[2] = (delta >> 8) & 0xFF; header[3] = delta & 0xFF; } } else { /* Type 1: ts delta + msg len + type */ uint32_t delta = timestamp - state->ts; hdrsize = 8; header[0] |= 1 << 6; header[1] = (delta >> 16) & 0xFF; header[2] = (delta >> 8) & 0xFF; header[3] = delta & 0xFF; } } else { hdrsize = 12; /* Type 0, full header */ header[1] = (timestamp >> 16) & 0xFF; header[2] = (timestamp >> 8) & 0xFF; header[3] = timestamp & 0xFF; } state->ts = timestamp; state->type = type; state->origlen = len; state->stream_id = stream_id; switch_mutex_lock(rsession->socket_mutex); chunksize = (len - pos) < rsession->out_chunksize ? (len - pos) : rsession->out_chunksize; if (rsession->profile->io->write(rsession, (unsigned char*)header, &hdrsize) != SWITCH_STATUS_SUCCESS) { switch_goto_status(SWITCH_STATUS_FALSE, end); } rsession->send += hdrsize; /* Write one chunk of data */ if (rsession->profile->io->write(rsession, (unsigned char*)message, &chunksize) != SWITCH_STATUS_SUCCESS) { switch_goto_status(SWITCH_STATUS_FALSE, end); } rsession->send += chunksize; pos += chunksize; /* Send more chunks if we need to */ while (((signed)len - (signed)pos) > 0) { switch_mutex_unlock(rsession->socket_mutex); /* Let other threads send data on the socket */ switch_cond_next(); switch_mutex_lock(rsession->socket_mutex); hdrsize = 1; if (rsession->profile->io->write(rsession, (unsigned char*)µhdr, &hdrsize) != SWITCH_STATUS_SUCCESS) { switch_goto_status(SWITCH_STATUS_FALSE, end); } rsession->send += hdrsize; chunksize = (len - pos) < rsession->out_chunksize ? (len - pos) : rsession->out_chunksize; if (rsession->profile->io->write(rsession, message + pos, &chunksize) != SWITCH_STATUS_SUCCESS) { switch_goto_status(SWITCH_STATUS_FALSE, end); } rsession->send += chunksize; pos += chunksize; } end: switch_mutex_unlock(rsession->socket_mutex); return status; } /* Returns SWITCH_STATUS_SUCCESS of the connection is still active or SWITCH_STATUS_FALSE to tear it down */ switch_status_t rtmp_handle_data(rtmp_session_t *rsession) { uint8_t buf[RTMP_TCP_READ_BUF]; switch_size_t s = RTMP_TCP_READ_BUF; if (rsession->state == RS_HANDSHAKE) { s = 1537 - rsession->hspos; if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); return SWITCH_STATUS_FALSE; } rsession->hspos += s; /* Receive C0 and C1 */ if (rsession->hspos < 1537) { /* Not quite there yet */ return SWITCH_STATUS_SUCCESS; } /* Send reply (S0 + S1) */ memset(buf, 0, sizeof(buf)); //*buf = '\x03'; /* fix handshake for h264 */ { handshake_helper_t shake_helper; shake_helper.r_buf = rsession->hsbuf; shake_helper.r_len = 2048; shake_helper.r_pos = 0; shake_helper.w_buf = buf; shake_helper.w_len = sizeof(buf); shake_helper.w_pos = 0; SHandShake0(&shake_helper); } s = 1537; rsession->profile->io->write(rsession, (unsigned char*)buf, &s); /* Send S2 */ s = 1536; //rsession->profile->io->write(rsession, rsession->hsbuf, &s); rsession->profile->io->write(rsession, (unsigned char*)buf + 1537, &s); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Sent handshake response\n"); rsession->state++; rsession->hspos = 0; } else if (rsession->state == RS_HANDSHAKE2) { s = 1536 - rsession->hspos; /* Receive C2 */ if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); return SWITCH_STATUS_FALSE; } rsession->hspos += s; if (rsession->hspos < 1536) { /* Not quite there yet */ return SWITCH_STATUS_SUCCESS; } rsession->state++; //s = 1536; //rsession->profile->io->write(rsession, (char*)buf, &s); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Done with handshake\n"); return SWITCH_STATUS_SUCCESS; } else if (rsession->state == RS_ESTABLISHED) { /* Process RTMP packet */ switch(rsession->parse_state) { case 0: // Read the header's first byte s = 1; if (rsession->profile->io->read(rsession, (unsigned char*)buf, &s) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); return SWITCH_STATUS_FALSE; } rsession->recv += s; switch(buf[0] >> 6) { case 0: rsession->hdrsize = 12; break; case 1: rsession->hdrsize = 8; break; case 2: rsession->hdrsize = 4; break; case 3: rsession->hdrsize = 1; break; default: rsession->hdrsize = 0; switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_CRIT, "WTF hdrsize 0x%02x %d\n", *buf, *buf >> 6); return SWITCH_STATUS_FALSE; } rsession->amfnumber = buf[0] & 0x3F; /* Get rid of the 2 first bits */ if (rsession->amfnumber > 64) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error\n"); return SWITCH_STATUS_FALSE; } //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Header size: %d AMF Number: %d\n", rsession->hdrsize, rsession->amfnumber); rsession->parse_state++; if (rsession->hdrsize == 1) { /* Skip header fetch on one-byte headers since we have it already */ rsession->parse_state++; } rsession->parse_remain = 0; break; case 1: { /* Read full header and decode */ rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber]; uint8_t *hdr = (uint8_t*)state->header.sz; unsigned char *readbuf = (unsigned char*)hdr; if (!rsession->parse_remain) { rsession->parse_remain = s = rsession->hdrsize - 1; } else { s = rsession->parse_remain; readbuf += (rsession->hdrsize - 1) - s; } if ( !(s < 12 && s > 0) ) { /** XXX **/ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Protocol error: Invalid header size\n"); return SWITCH_STATUS_FALSE; } if (rsession->profile->io->read(rsession, readbuf, &s) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); return SWITCH_STATUS_FALSE; } rsession->parse_remain -= s; if (rsession->parse_remain > 0) { /* More data please */ return SWITCH_STATUS_SUCCESS; } rsession->recv += s; if (rsession->hdrsize == 12) { state->ts = (hdr[0] << 16) | (hdr[1] << 8) | (hdr[2]); state->ts_delta = 0; } else if (rsession->hdrsize >= 4) { /* Save the timestamp delta since we have to re-use it with type 3 headers */ state->ts_delta = (hdr[0] << 16) | (hdr[1] << 8) | (hdr[2]); state->ts += state->ts_delta; } else if (rsession->hdrsize == 1) { /* Type 3: Re-use timestamp delta if we have one */ state->ts += state->ts_delta; } if (rsession->hdrsize >= 8) { /* Reset length counter since its included in the header */ state->remainlen = state->origlen = (hdr[3] << 16) | (hdr[4] << 8) | (hdr[5]); state->buf_pos = 0; state->type = hdr[6]; } if (rsession->hdrsize == 12) { state->stream_id = (hdr[10] << 24) | (hdr[9] << 16) | (hdr[8] << 8) | hdr[7]; } if (rsession->hdrsize >= 8 && state->origlen == 0) { /* Happens we sometimes get a 0 length packet */ rsession->parse_state = 0; return SWITCH_STATUS_SUCCESS; } /* FIXME: Handle extended timestamps */ if (state->ts == 0x00ffffff) { return SWITCH_STATUS_FALSE; } rsession->parse_state++; } break; case 2: { rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber]; if (rsession->parse_remain > 0) { s = rsession->parse_remain; } else { s = state->remainlen < rsession->in_chunksize ? state->remainlen : rsession->in_chunksize; rsession->parse_remain = s; } if (!s) { /* Restart from beginning */ state->remainlen = state->origlen; s = state->remainlen < rsession->in_chunksize ? state->remainlen : rsession->in_chunksize; rsession->parse_remain = s; if (!s) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error, forcing big read\n"); s = sizeof(state->buf); rsession->profile->io->read(rsession, state->buf, &s); return SWITCH_STATUS_FALSE; } } /* Sanity check */ if ((state->buf_pos + s) > AMF_MAX_SIZE) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "WTF %"SWITCH_SIZE_T_FMT" %"SWITCH_SIZE_T_FMT"\n", state->buf_pos, s); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: exceeding max AMF packet size\n"); return SWITCH_STATUS_FALSE; } if (s > rsession->in_chunksize) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: invalid chunksize\n"); return SWITCH_STATUS_FALSE; } if (rsession->profile->io->read(rsession, state->buf + state->buf_pos, &s) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); return SWITCH_STATUS_FALSE; } rsession->recv += s; state->remainlen -= s; rsession->parse_remain -= s; state->buf_pos += s; if (rsession->parse_remain > 0) { /* Need more data */ return SWITCH_STATUS_SUCCESS; } if (state->remainlen == 0) { if (state->type != RTMP_TYPE_AUDIO && state->type != RTMP_TYPE_VIDEO && state->type != RTMP_TYPE_ACK) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] len=%d\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, state->origlen); } #ifdef RTMP_DEBUG_IO fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] len=%d\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, state->origlen); #endif switch(state->type) { case RTMP_TYPE_CHUNKSIZE: rsession->in_chunksize = state->buf[0] << 24 | state->buf[1] << 16 | state->buf[2] << 8 | state->buf[3]; switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "SET CHUNKSIZE=%d\n", (int)rsession->in_chunksize); break; case RTMP_TYPE_USERCTRL: rtmp_handle_control(rsession, rsession->amfnumber); break; case RTMP_TYPE_INVOKE: rtmp_handle_invoke(rsession, rsession->amfnumber); break; case RTMP_TYPE_AUDIO: /* Audio data */ if (rsession->media_debug & RTMP_MD_AUDIO_READ) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "R A ts:%u data:0x%02x len:%d\n", state->ts, *(state->buf), state->origlen); } switch_thread_rwlock_wrlock(rsession->rwlock); if (rsession->tech_pvt) { uint16_t len = state->origlen; if (!rsession->tech_pvt->readbuf) { switch_thread_rwlock_unlock(rsession->rwlock); return SWITCH_STATUS_FALSE; } switch_mutex_lock(rsession->tech_pvt->readbuf_mutex); if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > rsession->tech_pvt->maxlen * 40) { rsession->tech_pvt->over_size++; } else { rsession->tech_pvt->over_size = 0; } if (rsession->tech_pvt->over_size > 10) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "%s buffer > %u for 10 consecutive packets... Flushing buffer\n", switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 40); switch_buffer_zero(rsession->tech_pvt->readbuf); #ifdef RTMP_DEBUG_IO fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->maxlen * 5); #endif } switch_buffer_write(rsession->tech_pvt->readbuf, &len, 2); switch_buffer_write(rsession->tech_pvt->readbuf, state->buf, len); if (len > rsession->tech_pvt->maxlen) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "changing maxlen from %d to %d\n", rsession->tech_pvt->maxlen, len); rsession->tech_pvt->maxlen = len; } switch_mutex_unlock(rsession->tech_pvt->readbuf_mutex); } switch_thread_rwlock_unlock(rsession->rwlock); break; case RTMP_TYPE_VIDEO: /* Video data */ if (rsession->media_debug & RTMP_MD_VIDEO_READ) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "R V ts:%u data:0x%02x len:%d \n", state->ts, *(state->buf), state->origlen); } if ((!rsession->tech_pvt) || (!rsession->tech_pvt->has_video)) break; switch_thread_rwlock_wrlock(rsession->rwlock); if (rsession->tech_pvt) { uint16_t len = state->origlen; if (!rsession->tech_pvt->video_readbuf) { switch_thread_rwlock_unlock(rsession->rwlock); return SWITCH_STATUS_FALSE; } switch_mutex_lock(rsession->tech_pvt->video_readbuf_mutex); if (rsession->tech_pvt->video_maxlen && switch_buffer_inuse(rsession->tech_pvt->video_readbuf) > rsession->tech_pvt->video_maxlen * 100) { rsession->tech_pvt->video_over_size++; } else { rsession->tech_pvt->video_over_size = 0; } if (rsession->tech_pvt->video_over_size > 10) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "%s buffer > %u for 10 consecutive packets... Flushing buffer\n", switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->video_maxlen * 100); switch_buffer_zero(rsession->tech_pvt->video_readbuf); #ifdef RTMP_DEBUG_IO fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->video_maxlen * 5); #endif } switch_buffer_write(rsession->tech_pvt->video_readbuf, &len, 2); switch_buffer_write(rsession->tech_pvt->video_readbuf, &state->ts, 4); switch_buffer_write(rsession->tech_pvt->video_readbuf, state->buf, len); if (len > rsession->tech_pvt->video_maxlen) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "changing video max len from %d to %d\n", rsession->tech_pvt->video_maxlen, len); rsession->tech_pvt->video_maxlen = len; } switch_mutex_unlock(rsession->tech_pvt->video_readbuf_mutex); } switch_thread_rwlock_unlock(rsession->rwlock); break; case RTMP_TYPE_METADATA: /* Metadata */ break; case RTMP_TYPE_WINDOW_ACK_SIZE: { uint32_t new_window = (state->buf[0] << 24) | (state->buf[1] << 16) | (state->buf[2] << 8) | (state->buf[3]); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Set window size: from %u to %u bytes\n", rsession->send_ack_window, new_window); rsession->send_ack_window = new_window; } break; case RTMP_TYPE_ACK: { switch_time_t now = switch_micro_time_now(); uint32_t ack = (state->buf[0] << 24) | (state->buf[1] << 16) | (state->buf[2] << 8) | (state->buf[3]); uint32_t delta = rsession->send_ack_ts == 0 ? 0 : now - rsession->send_ack_ts; delta /= 1000000; /* microseconds -> seconds */ if (delta) { rsession->send_bw = (ack - rsession->send_ack) / delta; } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "got ack %d send:%d wait-ack:%d\n", ack, rsession->send + 3073, rsession->send + 3073 - ack); rsession->send_ack = ack; rsession->send_ack_ts = switch_micro_time_now(); break; } default: switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Cannot handle message type 0x%x\n", state->type); break; } state->buf_pos = 0; } rsession->parse_state = 0; /* Send an ACK if we need to */ if (rsession->recv - rsession->recv_ack_sent >= rsession->recv_ack_window) { unsigned char ackbuf[] = { INT32(rsession->recv) }; rtmp_send_message(rsession, 2/*chunkstream*/, 0/*ts*/, RTMP_TYPE_ACK, 0/*msg stream id */, ackbuf, sizeof(ackbuf), 0 /*flags*/); rsession->recv_ack_sent = rsession->recv; } } } } return SWITCH_STATUS_SUCCESS; } /* For Emacs: * Local Variables: * mode:c * indent-tabs-mode:t * tab-width:4 * c-basic-offset:4 * End: * For VIM: * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: */