From fc208bb0b988058423d5f5432091b6686d7e7bef Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 27 Aug 2009 21:41:47 +0000 Subject: [PATCH] Use our own memory pool instead of the sessions git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14652 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/mod_erlang_event.c | 128 ++++++++++++------ .../mod_erlang_event/mod_erlang_event.h | 2 + 2 files changed, 89 insertions(+), 41 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 8b3344de6e..3c9bfa7b7a 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -42,6 +42,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime); SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime); static void remove_listener(listener_t *listener); +static switch_status_t state_handler(switch_core_session_t *session); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_cookie, prefs.cookie); @@ -147,7 +148,7 @@ static void send_event_to_attached_sessions(listener_t* listener, switch_event_t for (s = listener->session_list; s; s = s->next) { /* check the event uuid against the uuid of each session */ if (!strcmp(uuid, s->uuid_str)) { - switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event to attached session for %s\n", s->uuid_str); + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session for %s\n", switch_event_name(event->event_id), s->uuid_str); if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { /* add the event to the queue for this session */ if (switch_queue_trypush(s->event_queue, clone) != SWITCH_STATUS_SUCCESS) { @@ -312,7 +313,6 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element) { session_elem_t *s, *last = NULL; - switch_core_session_t *session; if (!session_element) return; @@ -325,19 +325,25 @@ static void remove_session_elem_from_listener(listener_t *listener, session_elem } else { listener->session_list = s->next; } - if ((session = switch_core_session_locate(session_element->uuid_str))) { - switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); - switch_core_session_rwunlock(session); - } - /* this allows the application threads to exit */ - switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); - switch_safe_free(s); break; } last = s; } } +static void destroy_session_elem(session_elem_t *session_element) +{ + switch_core_session_t *session; + + if ((session = switch_core_session_locate(session_element->uuid_str))) { + switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); + switch_core_session_rwunlock(session); + } + /* this allows the application threads to exit */ + switch_clear_flag_locked(session_element, LFLAG_SESSION_ALIVE); + switch_core_destroy_memory_pool(&session_element->pool); + /*switch_safe_free(s);*/ +} static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element) { switch_mutex_lock(listener->session_mutex); @@ -537,9 +543,52 @@ static switch_status_t check_attached_sessions(listener_t *listener) } switch_set_flag(sp, LFLAG_OUTBOUND_INIT); } - /* check event queue for this session */ - if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + + if (switch_test_flag(sp, LFLAG_SESSION_COMPLETE)) { + ei_x_buff ebuf; + + /* flush the event queue */ + while (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + switch_event_t *pevent = (switch_event_t *) pop; + + /*switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "flushed event %s for %s\n", switch_event_name(pevent->event_id), sp->uuid_str);*/ + + /* events from attached sessions are wrapped in a {call_event,} tuple + to distinguish them from normal events (if they are sent to the same process) + */ + + ei_x_new_with_version(&ebuf); + ei_x_encode_tuple_header(&ebuf, 2); + ei_x_encode_atom(&ebuf, "call_event"); + ei_encode_switch_event(&ebuf, pevent); + + switch_mutex_lock(listener->sock_mutex); + ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + switch_mutex_unlock(listener->sock_mutex); + ei_x_free(&ebuf); + switch_event_destroy(&pevent); + } + /* this session can be removed */ + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "Destroy event for attached session for %s in state %s\n", sp->uuid_str, switch_channel_state_name(sp->channel_state)); + + ei_x_new_with_version(&ebuf); + ei_x_encode_atom(&ebuf, "call_hangup"); + switch_mutex_lock(listener->sock_mutex); + ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); + switch_mutex_unlock(listener->sock_mutex); + ei_x_free(&ebuf); + removed = sp; + sp = removed->next; + + remove_session_elem_from_listener(listener, removed); + destroy_session_elem(removed); + continue; + } else if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + + /* check event queue for this session */ switch_event_t *pevent = (switch_event_t *) pop; + + /*switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "popped event %s for %s\n", switch_event_name(pevent->event_id), sp->uuid_str);*/ /* events from attached sessions are wrapped in a {call_event,} tuple to distinguish them from normal events (if they are sent to the same process) @@ -556,33 +605,9 @@ static switch_status_t check_attached_sessions(listener_t *listener) switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); - /* event is a channel destroy, so this session can be removed */ - if (pevent->event_id == SWITCH_EVENT_CHANNEL_DESTROY) { - switch_log_printf(SWITCH_CHANNEL_UUID_LOG(sp->uuid_str), SWITCH_LOG_DEBUG, "Destroy event for attached session for %s in state %s\n", sp->uuid_str, switch_channel_state_name(sp->channel_state)); - - /* this allows the application threads to exit */ - removed = sp; - - ei_x_new_with_version(&ebuf); - ei_x_encode_atom(&ebuf, "call_hangup"); - switch_mutex_lock(listener->sock_mutex); - ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); - switch_mutex_unlock(listener->sock_mutex); - ei_x_free(&ebuf); - - /* TODO - if this listener was created outbound, and the last session has been detached - should the listener also exit? Does it matter? - */ - } switch_event_destroy(&pevent); } sp = sp->next; - if (removed) { - remove_session_elem_from_listener(listener, removed); - } else { - last = sp; - } } switch_mutex_unlock(listener->session_mutex); return status; @@ -685,12 +710,13 @@ static void handle_exit(listener_t *listener, erlang_pid *pid) switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_set_private(channel, "_erlang_session_", NULL); switch_channel_set_private(channel, "_erlang_listener_", NULL); - /* TODO can we clear out the state_change hook too? */ + switch_core_event_hook_remove_state_change(session, state_handler); switch_core_session_rwunlock(session); } /* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */ } remove_session_elem_from_listener_locked(listener, s); + destroy_session_elem(s); } if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) { @@ -713,7 +739,10 @@ static void handle_exit(listener_t *listener, erlang_pid *pid) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Event handler process for node %s exited\n", pid->node); /*purge the event queue */ - while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS); + while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + switch_event_t *pevent = (switch_event_t *) pop; + switch_event_destroy(&pevent); + } if (switch_test_flag(listener, LFLAG_EVENTS)) { uint8_t x = 0; @@ -906,6 +935,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) /* clean up all the attached sessions */ switch_mutex_lock(listener->session_mutex); + /* TODO destroy memory pools since they're not children of the listener's pool*/ for (s = listener->session_list; s; s = s->next) { if ((session = switch_core_session_locate(s->uuid_str))) { switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); @@ -1076,6 +1106,11 @@ static switch_status_t state_handler(switch_core_session_t *session) if (session_element) { session_element->channel_state = state; + if (state == CS_DESTROY) { + /* indicate that once all the events in the event queue are done + * we can throw this away */ + switch_set_flag(session_element, LFLAG_SESSION_COMPLETE); + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "unable to update channel state for %s to %s\n", switch_core_session_get_uuid(session), switch_channel_state_name(state)); } @@ -1086,15 +1121,25 @@ static switch_status_t state_handler(switch_core_session_t *session) session_elem_t *session_elem_create(listener_t* listener, switch_core_session_t *session) { /* create a session list element */ - session_elem_t* session_element = malloc(sizeof(*session_element)); + switch_memory_pool_t *session_elem_pool; + session_elem_t* session_element;/* = malloc(sizeof(*session_element));*/ switch_channel_t *channel = switch_core_session_get_channel(session); + if (switch_core_new_memory_pool(&session_elem_pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); + return NULL; + } + + session_element = switch_core_alloc(session_elem_pool, sizeof(*session_element)); + memset(session_element, 0, sizeof(*session_element)); memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); + session_element->pool = session_elem_pool; + session_elem_pool = NULL; - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool); switch_channel_set_private(channel, "_erlang_session_", session_element); switch_channel_set_private(channel, "_erlang_listener_", listener); @@ -1184,6 +1229,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); remove_session_elem_from_listener_locked(listener,session_element); switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ + destroy_session_elem(session_element); return NULL; } i++; @@ -1399,7 +1445,7 @@ SWITCH_STANDARD_API(erlang_cmd) stream->write_function(stream, "Could not find a listener for %s\n", argv[1]); } else { - stream->write_function(stream, "I don't care for those arguments at all, sorry"); + stream->write_function(stream, "USAGE: erlang sessions \n"); goto done; } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 9f6a5ec9ee..0debebe943 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -36,6 +36,7 @@ typedef enum { LFLAG_WAITING_FOR_PID = (1 << 0), /* waiting for a node to return a pid */ LFLAG_OUTBOUND_INIT = (1 << 1), /* Erlang peer has been notified of this session */ LFLAG_SESSION_ALIVE = (1 << 2), + LFLAG_SESSION_COMPLETE = (1 << 3), } session_flag_t; typedef enum { @@ -61,6 +62,7 @@ struct session_elem { struct erlang_process process; switch_queue_t *event_queue; switch_channel_state_t channel_state; + switch_memory_pool_t *pool; struct session_elem *next; };