mod_rayo: rework component inheritance- component now inherits parent call/mixer mutex and parent reference counting moved to base class

This commit is contained in:
Chris Rienzo 2014-06-13 12:16:26 -04:00
parent 730d2f88ba
commit 1511fe3ee8
8 changed files with 49 additions and 45 deletions

View File

@ -950,6 +950,10 @@ void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int line)
if (actor->cleanup_fn) {
actor->cleanup_fn(actor);
}
if (actor->parent) {
/* safe to destroy parent now */
RAYO_RELEASE(actor->parent);
}
switch_core_hash_delete(globals.destroy_actors, RAYO_JID(actor));
switch_core_destroy_memory_pool(&pool);
} else {
@ -1224,10 +1228,12 @@ void rayo_actor_send_ignore(struct rayo_actor *to, struct rayo_message *msg)
switch_log_printf(SWITCH_CHANNEL_ID_LOG, msg->file, "", msg->line, "", SWITCH_LOG_WARNING, "%s, dropping unexpected message to %s.\n", msg->from_jid, RAYO_JID(to));
}
#define RAYO_ACTOR_INIT(actor, pool, type, subtype, id, jid, cleanup, send) rayo_actor_init(actor, pool, type, subtype, id, jid, cleanup, send, __FILE__, __LINE__)
#define RAYO_ACTOR_INIT(actor, pool, type, subtype, id, jid, cleanup, send) rayo_actor_init(actor, pool, type, subtype, id, jid, cleanup, send, NULL, __FILE__, __LINE__)
#define RAYO_ACTOR_INIT_PARENT(actor, pool, type, subtype, id, jid, cleanup, send, parent) rayo_actor_init(actor, pool, type, subtype, id, jid, cleanup, send, parent, __FILE__, __LINE__)
/**
* Initialize a rayo actor
* @param actor to initialize
* @param pool to use
* @param type of actor (MIXER, CALL, SERVER, COMPONENT)
* @param subtype of actor (input/output/prompt)
@ -1235,11 +1241,12 @@ void rayo_actor_send_ignore(struct rayo_actor *to, struct rayo_message *msg)
* @param jid external ID
* @param cleanup function
* @param send sent message handler
* @param parent of actor
* @param file that called this function
* @param line that called this function
* @return the actor or NULL if JID conflict
*/
static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memory_pool_t *pool, const char *type, const char *subtype, const char *id, const char *jid, rayo_actor_cleanup_fn cleanup, rayo_actor_send_fn send, const char *file, int line)
static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memory_pool_t *pool, const char *type, const char *subtype, const char *id, const char *jid, rayo_actor_cleanup_fn cleanup, rayo_actor_send_fn send, struct rayo_actor *parent, const char *file, int line)
{
char *domain;
actor->type = switch_core_strdup(pool, type);
@ -1265,7 +1272,6 @@ static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memor
actor->seq = 1;
actor->ref_count = 1;
actor->destroy = 0;
switch_mutex_init(&actor->mutex, SWITCH_MUTEX_NESTED, pool);
actor->cleanup_fn = cleanup;
if (send == NULL) {
actor->send_fn = rayo_actor_send_ignore;
@ -1273,6 +1279,17 @@ static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memor
actor->send_fn = send;
}
actor->parent = parent;
if (!actor->parent) {
switch_mutex_init(&actor->mutex, SWITCH_MUTEX_NESTED, pool);
} else {
/* inherit mutex from parent */
actor->mutex = actor->parent->mutex;
/* prevent parent destruction */
RAYO_RETAIN(actor->parent);
}
/* add to hash of actors, so commands can route to call */
switch_mutex_lock(globals.actors_mutex);
if (!zstr(jid)) {
@ -1280,6 +1297,11 @@ static struct rayo_actor *rayo_actor_init(struct rayo_actor *actor, switch_memor
/* duplicate JID, give up! */
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_NOTICE, "JID conflict! %s\n", RAYO_JID(actor));
switch_mutex_unlock(globals.actors_mutex);
if (actor->parent) {
/* unlink from parent */
RAYO_RELEASE(actor->parent);
actor->parent = NULL;
}
return NULL;
}
switch_core_hash_insert(globals.actors, RAYO_JID(actor), actor);
@ -1313,7 +1335,7 @@ static struct rayo_call *rayo_call_init(struct rayo_call *call, switch_memory_po
}
call_jid = switch_mprintf("%s@%s", uuid, RAYO_JID(globals.server));
call = RAYO_CALL(rayo_actor_init(RAYO_ACTOR(call), pool, RAT_CALL, "", uuid, call_jid, rayo_call_cleanup, rayo_call_send, file, line));
call = RAYO_CALL(rayo_actor_init(RAYO_ACTOR(call), pool, RAT_CALL, "", uuid, call_jid, rayo_call_cleanup, rayo_call_send, NULL, file, line));
if (call) {
call->dcp_jid = "";
call->idle_start_time = switch_micro_time_now();
@ -1370,7 +1392,7 @@ static void rayo_mixer_cleanup(struct rayo_actor *actor)
static struct rayo_mixer *rayo_mixer_init(struct rayo_mixer *mixer, switch_memory_pool_t *pool, const char *name, const char *file, int line)
{
char *mixer_jid = switch_mprintf("%s@%s", name, RAYO_JID(globals.server));
mixer = RAYO_MIXER(rayo_actor_init(RAYO_ACTOR(mixer), pool, RAT_MIXER, "", name, mixer_jid, rayo_mixer_cleanup, rayo_mixer_send, file, line));
mixer = RAYO_MIXER(rayo_actor_init(RAYO_ACTOR(mixer), pool, RAT_MIXER, "", name, mixer_jid, rayo_mixer_cleanup, rayo_mixer_send, NULL, file, line));
if (mixer) {
switch_core_hash_init(&mixer->members);
switch_core_hash_init(&mixer->subscribers);
@ -1397,19 +1419,6 @@ static struct rayo_mixer *_rayo_mixer_create(const char *name, const char *file,
return mixer;
}
/**
* Clean up component before destruction
*/
static void rayo_component_cleanup(struct rayo_actor *actor)
{
if (RAYO_COMPONENT(actor)->cleanup_fn) {
RAYO_COMPONENT(actor)->cleanup_fn(actor);
}
/* parent can now be destroyed */
RAYO_RELEASE(RAYO_COMPONENT(actor)->parent);
}
/**
* Initialize Rayo component
* @param type of this component
@ -1430,13 +1439,10 @@ struct rayo_component *_rayo_component_init(struct rayo_component *component, sw
id = jid;
}
component = RAYO_COMPONENT(rayo_actor_init(RAYO_ACTOR(component), pool, type, subtype, id, jid, rayo_component_cleanup, rayo_component_send, file, line));
component = RAYO_COMPONENT(rayo_actor_init(RAYO_ACTOR(component), pool, type, subtype, id, jid, cleanup, rayo_component_send, parent, file, line));
if (component) {
RAYO_RETAIN(parent);
component->client_jid = switch_core_strdup(pool, client_jid);
component->ref = switch_core_strdup(pool, ref);
component->parent = parent;
component->cleanup_fn = cleanup;
}
switch_safe_free(ref);

View File

@ -106,6 +106,8 @@ struct rayo_actor {
rayo_actor_send_fn send_fn;
/** optional cleanup */
rayo_actor_cleanup_fn cleanup_fn;
/** optional parent */
struct rayo_actor *parent;
};
/**
@ -114,16 +116,12 @@ struct rayo_actor {
struct rayo_component {
/** base actor class */
struct rayo_actor base;
/** parent to this component */
struct rayo_actor *parent;
/** owning client JID */
const char *client_jid;
/** external ref */
const char *ref;
/** true if component has completed */
int complete;
/** optional cleanup */
rayo_actor_cleanup_fn cleanup_fn;
};
#define RAYO_ACTOR(x) ((struct rayo_actor *)x)

View File

@ -166,14 +166,14 @@ static void stop_cpa_detectors(struct cpa_component *cpa)
void *cpa_signal = NULL;
switch_core_hash_this(hi, &signal_type, NULL, &cpa_signal);
if (cpa_signal) {
rayo_cpa_detector_stop(RAYO_COMPONENT(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name);
unsubscribe(RAYO_COMPONENT(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name, RAYO_JID(cpa));
rayo_cpa_detector_stop(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name);
unsubscribe(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name, RAYO_JID(cpa));
}
}
switch_core_hash_destroy(&cpa->signals);
cpa->signals = NULL;
}
unsubscribe(RAYO_COMPONENT(cpa)->parent->id, "hangup", RAYO_JID(cpa));
unsubscribe(RAYO_ACTOR(cpa)->parent->id, "hangup", RAYO_JID(cpa));
}
/**
@ -197,7 +197,7 @@ static void rayo_cpa_detector_event(const char *jid, void *user_data)
switch_event_t *event = (switch_event_t *)user_data;
const char *signal_type = switch_event_get_header(event, "signal-type");
struct cpa_signal *cpa_signal = switch_core_hash_find(CPA_COMPONENT(component)->signals, signal_type);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(RAYO_COMPONENT(component)->parent->id), SWITCH_LOG_DEBUG, "Handling CPA event\n");
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Handling CPA event\n");
if (cpa_signal) {
const char *value = switch_event_get_header(event, "value");
const char *duration = switch_event_get_header(event, "duration");
@ -234,7 +234,7 @@ static void rayo_cpa_detector_event(const char *jid, void *user_data)
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(RAYO_COMPONENT(component)->parent->id), SWITCH_LOG_DEBUG, "Skipping CPA event\n");
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Skipping CPA event\n");
}
RAYO_RELEASE(component);
}

View File

@ -340,7 +340,7 @@ static iks *start_receivefax_component(struct rayo_actor *call, struct rayo_mess
static iks *stop_fax_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
switch_core_session_t *session = switch_core_session_locate(component->parent->id);
FAX_COMPONENT(component)->stop = 1;
if (session) {
switch_core_session_execute_application_async(session, "stopfax", "");
@ -459,7 +459,7 @@ static void on_execute_complete_event(switch_event_t *event)
insert_fax_metadata(event, "fax_remote_station_id", complete);
/* flag faxing as done */
rayo_call_set_faxing(RAYO_CALL(RAYO_COMPONENT(component)->parent), 0);
rayo_call_set_faxing(RAYO_CALL(component->parent), 0);
rayo_component_send_complete_event(RAYO_COMPONENT(component), result);

View File

@ -762,7 +762,7 @@ static iks *stop_call_input_component(struct rayo_actor *component, struct rayo_
struct input_component *input_component = INPUT_COMPONENT(component);
if (input_component && !input_component->stop) {
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
switch_core_session_t *session = switch_core_session_locate(component->parent->id);
if (session) {
switch_mutex_lock(input_component->handler->mutex);
input_component->stop = 1;
@ -787,7 +787,7 @@ static iks *start_timers_call_input_component(struct rayo_actor *component, stru
iks *iq = msg->payload;
struct input_component *input_component = INPUT_COMPONENT(component);
if (input_component) {
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
switch_core_session_t *session = switch_core_session_locate(component->parent->id);
if (session) {
switch_mutex_lock(input_component->handler->mutex);
if (input_component->speech_mode) {

View File

@ -536,7 +536,7 @@ static switch_status_t rayo_file_close(switch_file_handle_t *handle)
} else {
if (!strcmp(RAYO_ACTOR(context->component)->type, RAT_CALL_COMPONENT)) {
/* call output... check for hangup */
switch_core_session_t *session = switch_core_session_locate(context->component->parent->id);
switch_core_session_t *session = switch_core_session_locate(RAYO_ACTOR(context->component)->parent->id);
if (session) {
if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) {
rayo_component_send_complete(context->component, COMPONENT_COMPLETE_HANGUP);

View File

@ -126,14 +126,14 @@ static void start_input(struct prompt_component *prompt, int start_timers, int b
iks *input = iks_find(PROMPT_COMPONENT(prompt)->iq, "prompt");
input = iks_find(input, "input");
iks_insert_attrib(iq, "from", RAYO_JID(prompt));
iks_insert_attrib(iq, "to", RAYO_JID(RAYO_COMPONENT(prompt)->parent));
iks_insert_attrib(iq, "to", RAYO_JID(RAYO_ACTOR(prompt)->parent));
iks_insert_attrib_printf(iq, "id", "mod_rayo-prompt-%d", RAYO_SEQ_NEXT(prompt));
iks_insert_attrib(iq, "type", "set");
input = iks_copy_within(input, iks_stack(iq));
iks_insert_attrib(input, "start-timers", start_timers ? "true" : "false");
iks_insert_attrib(input, "barge-event", barge_event ? "true" : "false");
iks_insert_node(iq, input);
RAYO_SEND_MESSAGE(prompt, RAYO_JID(RAYO_COMPONENT(prompt)->parent), iq);
RAYO_SEND_MESSAGE(prompt, RAYO_JID(RAYO_ACTOR(prompt)->parent), iq);
}
/**
@ -281,7 +281,7 @@ static iks *prompt_component_handle_input_error(struct rayo_actor *prompt, struc
/* forward IQ error to client */
iq = PROMPT_COMPONENT(prompt)->iq;
iks_insert_attrib(iq, "from", RAYO_JID(RAYO_COMPONENT(prompt)->parent));
iks_insert_attrib(iq, "from", RAYO_JID(prompt->parent));
iks_insert_attrib(iq, "to", RAYO_COMPONENT(prompt)->client_jid);
iks_insert_node(iq, iks_copy_within(error, iks_stack(iq)));
RAYO_SEND_REPLY(prompt, RAYO_COMPONENT(prompt)->client_jid, iq);
@ -309,7 +309,7 @@ static iks *prompt_component_handle_input_error(struct rayo_actor *prompt, struc
/* forward IQ error to client */
iq = PROMPT_COMPONENT(prompt)->iq;
iks_insert_attrib(iq, "from", RAYO_JID(RAYO_COMPONENT(prompt)->parent));
iks_insert_attrib(iq, "from", RAYO_JID(prompt->parent));
iks_insert_attrib(iq, "to", RAYO_COMPONENT(prompt)->client_jid);
iks_insert_node(iq, iks_copy_within(error, iks_stack(iq)));
PROMPT_COMPONENT(prompt)->complete = iks_copy(iq);
@ -350,7 +350,7 @@ static iks *prompt_component_handle_output_error(struct rayo_actor *prompt, stru
/* forward IQ error to client */
iq = PROMPT_COMPONENT(prompt)->iq;
iks_insert_attrib(iq, "from", RAYO_JID(RAYO_COMPONENT(prompt)->parent));
iks_insert_attrib(iq, "from", RAYO_JID(prompt->parent));
iks_insert_attrib(iq, "to", RAYO_COMPONENT(prompt)->client_jid);
iks_insert_node(iq, iks_copy_within(error, iks_stack(iq)));
RAYO_SEND_REPLY(prompt, RAYO_COMPONENT(prompt)->client_jid, iq);

View File

@ -84,7 +84,7 @@ struct record_component {
static void complete_record(struct rayo_component *component, const char *reason, const char *reason_namespace)
{
switch_core_session_t *session = NULL;
const char *uuid = component->parent->id;
const char *uuid = RAYO_ACTOR(component)->parent->id;
const char *uri = RECORD_COMPONENT(component)->local_file_path;
iks *recording;
switch_size_t file_size = 0;
@ -296,7 +296,7 @@ static iks *start_call_record_component(struct rayo_actor *call, struct rayo_mes
static iks *stop_call_record_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
{
iks *iq = msg->payload;
switch_core_session_t *session = switch_core_session_locate(RAYO_COMPONENT(component)->parent->id);
switch_core_session_t *session = switch_core_session_locate(component->parent->id);
if (session) {
RECORD_COMPONENT(component)->stop = 1;
switch_ivr_stop_record_session(session, RAYO_ID(component));
@ -383,7 +383,7 @@ static int start_mixer_record(struct rayo_component *component)
char *args;
SWITCH_STANDARD_STREAM(stream);
args = switch_mprintf("%s recording start %s", component->parent->id, RAYO_ID(component));
args = switch_mprintf("%s recording start %s", RAYO_ACTOR(component)->parent->id, RAYO_ID(component));
switch_api_execute("conference", args, NULL, &stream);
switch_safe_free(args);
switch_safe_free(stream.data);
@ -439,7 +439,7 @@ static iks *stop_mixer_record_component(struct rayo_actor *component, struct ray
SWITCH_STANDARD_STREAM(stream);
RECORD_COMPONENT(component)->stop = 1;
args = switch_mprintf("%s recording stop %s", RAYO_COMPONENT(component)->parent->id, RAYO_ID(component));
args = switch_mprintf("%s recording stop %s", component->parent->id, RAYO_ID(component));
switch_api_execute("conference", args, NULL, &stream);
switch_safe_free(args);
switch_safe_free(stream.data);