add nifty new goodies

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10132 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2008-10-23 17:31:22 +00:00
parent 603b0d1872
commit a61cdc9a92
1 changed files with 280 additions and 13 deletions

View File

@ -47,7 +47,8 @@ typedef enum {
LFLAG_FULL = (1 << 4),
LFLAG_MYEVENTS = (1 << 5),
LFLAG_SESSION = (1 << 6),
LFLAG_ASYNC = (1 << 7)
LFLAG_ASYNC = (1 << 7),
LFLAG_STATEFUL = (1 << 8)
} event_flag_t;
typedef enum {
@ -72,6 +73,9 @@ struct listener {
int lost_events;
int lost_logs;
int hup;
time_t last_flush;
uint32_t timeout;
uint32_t id;
switch_sockaddr_t *sa;
char remote_ip[50];
switch_port_t remote_port;
@ -99,8 +103,21 @@ static struct {
int threads;
char *acl[MAX_ACL];
uint32_t acl_count;
uint32_t id;
} prefs;
static void remove_listener(listener_t *listener);
static uint32_t next_id(void)
{
uint32_t id;
switch_mutex_lock(listen_list.mutex);
id = ++prefs.id;
switch_mutex_unlock(listen_list.mutex);
return id;
}
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);
@ -165,6 +182,14 @@ static void event_handler(switch_event_t *event)
continue;
}
if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_timestamp(NULL) - l->last_flush > l->timeout) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id);
remove_listener(l);
switch_thread_rwlock_unlock(l->rwlock);
switch_core_hash_destroy(&l->event_hash);
switch_core_destroy_memory_pool(&l->pool);
}
if (l->event_list[SWITCH_EVENT_ALL]) {
send = 1;
} else if ((l->event_list[event->event_id])) {
@ -347,18 +372,6 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown)
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
{
switch_application_interface_t *app_interface;
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
static void add_listener(listener_t *listener)
{
/* add me to the listeners so I get events */
@ -386,6 +399,24 @@ static void remove_listener(listener_t *listener)
switch_mutex_unlock(listen_list.mutex);
}
static listener_t *find_listener(uint32_t id)
{
listener_t *l, *r = NULL;
switch_mutex_lock(listen_list.mutex);
for (l = listen_list.listeners; l; l = l->next) {
if (l->id && l->id == id) {
if (switch_thread_rwlock_tryrdlock(l->rwlock) == SWITCH_STATUS_SUCCESS) {
r = l;
}
break;
}
}
switch_mutex_unlock(listen_list.mutex);
return r;
}
static void strip_cr(char *s)
{
char *p;
@ -394,6 +425,242 @@ static void strip_cr(char *s)
}
}
static void xmlize_listener(listener_t *listener, switch_stream_handle_t *stream)
{
stream->write_function(stream, " <listener>\n");
stream->write_function(stream, " <listen-id>%u</listen-id>\n", listener->id);
stream->write_function(stream, " <format>%s</format>\n", listener->format == EVENT_FORMAT_XML ? "xml" : "plain");
stream->write_function(stream, " <timeout>%u</timeout>\n", listener->timeout);
stream->write_function(stream, " </listener>\n");
}
SWITCH_STANDARD_API(event_manager_function)
{
char *http = NULL;
char *wcmd = NULL;
char *format = NULL;
listener_t *listener = NULL;
if (stream->param_event) {
http = switch_event_get_header(stream->param_event, "http-host");
wcmd = switch_event_get_header(stream->param_event, "command");
format = switch_event_get_header(stream->param_event, "format");
}
if (!http) {
stream->write_function(stream, "This is a web application.!\n");
return SWITCH_STATUS_SUCCESS;
}
stream->write_function(stream, "Content-Type: text/xml\n\n");
stream->write_function(stream, "<?xml version=\"1.0\"?>\n");
stream->write_function(stream, "<root>\n");
if (!wcmd) {
stream->write_function(stream, "<data><reply type=\"error\">Missing command parameter!</reply></data>\n");
goto end;
}
if (!format) {
format = "xml";
}
if (!strcasecmp(wcmd, "create-listener")) {
char *events = switch_event_get_header(stream->param_event, "events");
switch_memory_pool_t *pool;
char *next, *cur;
uint32_t count = 0, key_count = 0;
uint8_t custom = 0;
char *edup;
if (switch_strlen_zero(events)) {
stream->write_function(stream, "<data><reply type=\"error\">Missing parameter!</reply></data>\n");
goto end;
}
switch_core_new_memory_pool(&pool);
listener = switch_core_alloc(pool, sizeof(*listener));
listener->pool = pool;
listener->format = EVENT_FORMAT_PLAIN;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
switch_set_flag(listener, LFLAG_AUTHED);
switch_set_flag(listener, LFLAG_STATEFUL);
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener->pool);
switch_thread_rwlock_create(&listener->rwlock, listener->pool);
listener->id = next_id();
listener->timeout = 60;
listener->last_flush = switch_timestamp(NULL);
if (switch_stristr("xml", format)) {
listener->format = EVENT_FORMAT_XML;
} else {
listener->format = EVENT_FORMAT_PLAIN;
}
edup = strdup(events);
for (cur = edup; cur; count++) {
switch_event_types_t type;
if ((next = strchr(cur, ' '))) {
*next++ = '\0';
}
if (custom) {
switch_core_hash_insert(listener->event_hash, cur, MARKER);
} else if (switch_name_event(cur, &type) == SWITCH_STATUS_SUCCESS) {
key_count++;
if (type == SWITCH_EVENT_ALL) {
uint32_t x = 0;
for (x = 0; x < SWITCH_EVENT_ALL; x++) {
listener->event_list[x] = 1;
}
}
if (type <= SWITCH_EVENT_ALL) {
listener->event_list[type] = 1;
}
if (type == SWITCH_EVENT_CUSTOM) {
custom++;
}
}
cur = next;
}
switch_safe_free(edup);
if (!key_count) {
switch_core_hash_destroy(&listener->event_hash);
switch_core_destroy_memory_pool(&listener->pool);
stream->write_function(stream, "<data><reply type=\"error\">No keywords supplied</reply></data>\n");
goto end;
}
switch_set_flag_locked(listener, LFLAG_EVENTS);
add_listener(listener);
stream->write_function(stream, "<data>\n");
stream->write_function(stream, " <reply type=\"success\">Listener %u Created</reply>\n", listener->id);
xmlize_listener(listener, stream);
stream->write_function(stream, "</data>\n");
goto end;
} else if (!strcasecmp(wcmd, "destroy-listener")) {
char *id = switch_event_get_header(stream->param_event, "listen-id");
uint32_t idl = (uint32_t) atol(id);
if ((listener = find_listener(idl))) {
remove_listener(listener);
stream->write_function(stream, "<data>\n <reply type=\"success\">listener %u destroyed</reply>\n", listener->id);
xmlize_listener(listener, stream);
stream->write_function(stream, "</data>\n");
switch_thread_rwlock_unlock(listener->rwlock);
switch_core_hash_destroy(&listener->event_hash);
switch_core_destroy_memory_pool(&listener->pool);
goto end;
} else {
stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
goto end;
}
} else if (!strcasecmp(wcmd, "check-listener")) {
char *id = switch_event_get_header(stream->param_event, "listen-id");
uint32_t idl = (uint32_t) atol(id);
void *pop;
switch_event_t *pevent;
if (!(listener = find_listener(idl))) {
stream->write_function(stream, "<data><reply type=\"error\">Can't find listener</reply></data>\n");
goto end;
}
listener->last_flush = switch_timestamp(NULL);
stream->write_function(stream, "<data>\n <reply type=\"success\">Current Events Follow</reply>\n");
xmlize_listener(listener, stream);
stream->write_function(stream, "<events>\n");
while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
pevent = (switch_event_t *) pop;
char *etype;
if (listener->format == EVENT_FORMAT_PLAIN) {
etype = "plain";
switch_event_serialize(pevent, &listener->ebuf, SWITCH_TRUE);
stream->write_function(stream, "<event type=\"plain\">\n%s</event>", listener->ebuf);
} else {
switch_xml_t xml;
etype = "xml";
if ((xml = switch_event_xmlize(pevent, "%s", ""))) {
listener->ebuf = switch_xml_toxml(xml, SWITCH_FALSE);
switch_xml_free(xml);
} else {
stream->write_function(stream, "-ERR XML Error\n");
break;
}
stream->write_function(stream, "%s\n", listener->ebuf);
}
switch_safe_free(listener->ebuf);
switch_event_destroy(&pevent);
}
stream->write_function(stream, " </events>\n</data>\n");
if (pevent) {
switch_event_destroy(&pevent);
}
switch_thread_rwlock_unlock(listener->rwlock);
} else if (!strcasecmp(wcmd, "exec-fsapi")) {
char *api_command = switch_event_get_header(stream->param_event, "fsapi-command");
char *api_args = switch_event_get_header(stream->param_event, "fsapi-args");
switch_event_t *event, *oevent;
if (!(api_command)) {
stream->write_function(stream, "<data><reply type=\"error\">INVALID API COMMAND!</reply></data>\n");
goto end;
}
stream->write_function(stream, "<data>\n <reply type=\"success\">Execute API Command</reply>\n<api-command>\n");
switch_event_create(&event, SWITCH_EVENT_REQUEST_PARAMS);
oevent = stream->param_event;
stream->param_event = event;
switch_api_execute(api_command, api_args, NULL, stream);
stream->param_event = oevent;
stream->write_function(stream, " </api-command>\n</data>");
} else {
stream->write_function(stream, "<data><reply type=\"error\">INVALID COMMAND!</reply></data\n");
}
end:
stream->write_function(stream, "</root>\n\n");
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
{
switch_application_interface_t *app_interface;
switch_api_interface_t *api_interface;
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
SWITCH_ADD_API(api_interface, "event_manager", "event_manager", event_manager_function, "<web data>");
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t timeout)
{
switch_size_t mlen, bytes = 0;