add event socket listener filters

filter <header-name> <header-value>
filter <header-name> /<regexp>/
filter delete <header-name>
filter delete all

once even one filter is enabled you must pass a filter condition to get any events.
which events are still limited by the events <event list> as always.



git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10198 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2008-10-30 00:39:55 +00:00
parent f6e0a294ae
commit 0a5a51723d
2 changed files with 90 additions and 8 deletions

View File

@ -63,6 +63,7 @@ struct listener {
switch_memory_pool_t *pool;
event_format_t format;
switch_mutex_t *flag_mutex;
switch_mutex_t *filter_mutex;
uint32_t flags;
switch_log_level_t level;
char *ebuf;
@ -79,6 +80,7 @@ struct listener {
switch_sockaddr_t *sa;
char remote_ip[50];
switch_port_t remote_port;
switch_event_t *filters;
struct listener *next;
};
@ -189,7 +191,11 @@ static void expire_listener(listener_t **listener)
switch_thread_rwlock_unlock((*listener)->rwlock);
switch_core_hash_destroy(&(*listener)->event_hash);
switch_core_destroy_memory_pool(&(*listener)->pool);
switch_mutex_lock((*listener)->filter_mutex);
if ((*listener)->filters) {
switch_event_destroy(&(*listener)->filters);
}
switch_mutex_unlock((*listener)->filter_mutex);
*listener = NULL;
}
@ -223,7 +229,8 @@ static void event_handler(switch_event_t *event)
expire_listener(&l);
continue;
}
if (l->event_list[SWITCH_EVENT_ALL]) {
send = 1;
} else if ((l->event_list[event->event_id])) {
@ -232,6 +239,29 @@ static void event_handler(switch_event_t *event)
}
}
if (send && l->filters) {
switch_event_header_t *hp;
const char *hval;
send = 0;
switch_mutex_lock(l->filter_mutex);
for (hp = l->filters->headers; hp; hp = hp->next) {
if ((hval = switch_event_get_header(event, hp->name))) {
if (*hp->value == '/') {
switch_regex_t *re = NULL;
int ovector[30];
send = switch_regex_perform(hval, hp->value, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
switch_regex_safe_free(re);
} else {
if (!strcasecmp(hp->value, hval)) {
send = 1;
}
}
}
}
switch_mutex_unlock(l->filter_mutex);
}
if (send && switch_test_flag(l, LFLAG_MYEVENTS)) {
char *uuid = switch_event_get_header(event, "unique-id");
if (!uuid || strcmp(uuid, switch_core_session_get_uuid(l->session))) {
@ -341,6 +371,8 @@ SWITCH_STANDARD_APP(socket_function)
listener->session = session;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_set_flag(listener, LFLAG_AUTHED);
for (x = 1; x < argc; x++) {
@ -520,6 +552,8 @@ SWITCH_STANDARD_API(event_manager_function)
listener->pool = pool;
listener->format = EVENT_FORMAT_PLAIN;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_set_flag(listener, LFLAG_AUTHED);
switch_set_flag(listener, LFLAG_STATEFUL);
@ -1049,6 +1083,44 @@ static switch_status_t parse_command(listener_t *listener, switch_event_t **even
goto done;
}
if (!strncasecmp(cmd, "filter ", 7)) {
char *header_name = cmd + 7;
char *header_val;
strip_cr(header_name);
while(header_name && *header_name && *header_name == ' ') header_name++;
if (!(header_val = strchr(header_name, ' '))) {
switch_snprintf(reply, reply_len, "-ERR invalid syntax");
goto done;
}
*header_val++ = '\0';
switch_mutex_lock(listener->filter_mutex);
if (!listener->filters) {
switch_event_create(&listener->filters, SWITCH_EVENT_CHANNEL_DATA);
}
if (!strcasecmp(header_name, "delete")) {
if (!strcasecmp(header_val, "all")) {
switch_event_destroy(&listener->filters);
switch_event_create(&listener->filters, SWITCH_EVENT_CHANNEL_DATA);
} else {
switch_event_del_header(listener->filters, header_val);
}
switch_snprintf(reply, reply_len, "+OK filter deleted. [%s]", header_val);
} else {
switch_event_add_header_string(listener->filters, SWITCH_STACK_BOTTOM, header_name, header_val);
switch_snprintf(reply, reply_len, "+OK filter added. [%s]=[%s]", header_name, header_val);
}
switch_mutex_unlock(listener->filter_mutex);
goto done;
}
if (listener->session || !strncasecmp(cmd, "myevents ", 9)) {
switch_channel_t *channel = NULL;
@ -1651,7 +1723,11 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
switch_thread_rwlock_wrlock(listener->rwlock);
flush_listener(listener, SWITCH_TRUE, SWITCH_TRUE);
switch_mutex_lock(listener->filter_mutex);
if (listener->filters) {
switch_event_destroy(&listener->filters);
}
switch_mutex_unlock(listener->filter_mutex);
if (listener->sock) {
char disco_buf[512] = "";
const char message[] = "Disconnected, goodbye!\nSee you at ClueCon http://www.cluecon.com!\n";
@ -1831,6 +1907,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
listener_pool = NULL;
listener->format = EVENT_FORMAT_PLAIN;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock);
switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);

View File

@ -605,13 +605,17 @@ SWITCH_DECLARE(char *) switch_event_get_body(switch_event_t *event)
SWITCH_DECLARE(switch_status_t) switch_event_del_header(switch_event_t *event, const char *header_name)
{
switch_event_header_t *hp, *lp = NULL;
switch_event_header_t *hp, *lp = NULL, *tp;
switch_status_t status = SWITCH_STATUS_FALSE;
int x = 0;
for (hp = event->headers; hp; hp = hp->next) {
tp = event->headers;
while (tp) {
hp = tp;
tp = tp->next;
x++;
switch_assert(x < 1000);
if (!strcmp(header_name, hp->name)) {
if (!strcasecmp(header_name, hp->name)) {
if (lp) {
lp->next = hp->next;
} else {
@ -627,9 +631,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_del_header(switch_event_t *event, c
FREE(hp);
}
status = SWITCH_STATUS_SUCCESS;
break;
} else {
lp = hp;
}
lp = hp;
}
return status;