From 5db04d7d0f9d001c31eb4a844f69245c36928b69 Mon Sep 17 00:00:00 2001 From: Chris Rienzo Date: Fri, 20 Jan 2017 08:50:42 -0500 Subject: [PATCH] FS-9965 [mod_hiredis] Improvements for performance, memory utilization, and resiliency. Pipelining of requests to improve throughput -- new parameter max-pipelined-requests (default 20) for maximum number of requests to batch at one time Deletion of counter keys when zero -- new parameter delete-when-zero (default false) to enable. This will cause a redis eval to execute to decrement counters instead of DECR. Detection of negative limit counters -- Self healing of negative counters (due to key eviction, etc) --- src/mod/applications/mod_hiredis/Makefile.am | 2 +- .../mod_hiredis/hiredis_pipeline.c | 290 ++++++++++++++++++ .../mod_hiredis/hiredis_profile.c | 178 ++++++++--- .../applications/mod_hiredis/hiredis_utils.c | 12 +- .../applications/mod_hiredis/mod_hiredis.c | 274 ++++++++++------- .../applications/mod_hiredis/mod_hiredis.h | 83 +++-- 6 files changed, 667 insertions(+), 172 deletions(-) create mode 100644 src/mod/applications/mod_hiredis/hiredis_pipeline.c diff --git a/src/mod/applications/mod_hiredis/Makefile.am b/src/mod/applications/mod_hiredis/Makefile.am index 5b6e6b7fa8..2fec98d209 100644 --- a/src/mod/applications/mod_hiredis/Makefile.am +++ b/src/mod/applications/mod_hiredis/Makefile.am @@ -4,7 +4,7 @@ MODNAME=mod_hiredis if HAVE_HIREDIS mod_LTLIBRARIES = mod_hiredis.la -mod_hiredis_la_SOURCES = mod_hiredis.c hiredis_utils.c hiredis_profile.c +mod_hiredis_la_SOURCES = mod_hiredis.c hiredis_utils.c hiredis_profile.c hiredis_pipeline.c mod_hiredis_la_CFLAGS = $(AM_CFLAGS) $(HIREDIS_CFLAGS) mod_hiredis_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_hiredis_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(HIREDIS_LIBS) $(SWITCH_AM_LDFLAGS) diff --git a/src/mod/applications/mod_hiredis/hiredis_pipeline.c b/src/mod/applications/mod_hiredis/hiredis_pipeline.c new file mode 100644 index 0000000000..dc09d34362 --- /dev/null +++ b/src/mod/applications/mod_hiredis/hiredis_pipeline.c @@ -0,0 +1,290 @@ +/* +* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application +* Copyright (C) 2005-2016, Anthony Minessale II +* +* Version: MPL 1.1 +* +* The contents of this file are subject to the Mozilla Public License Version +* 1.1 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* http://www.mozilla.org/MPL/ +* +* Software distributed under the License is distributed on an "AS IS" basis, +* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +* for the specific language governing rights and limitations under the +* License. +* +* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application +* +* The Initial Developer of the Original Code is +* Anthony Minessale II +* Portions created by the Initial Developer are Copyright (C) +* the Initial Developer. All Rights Reserved. +* +* Contributor(s): +* +* Christopher Rienzo +* +* hiredis_pipeline.c -- batched operations to redis +* +*/ + +#include + +/** + * Thread that processes redis requests + * @param thread this thread + * @param obj the profile + * @return NULL + */ +static void *SWITCH_THREAD_FUNC pipeline_thread(switch_thread_t *thread, void *obj) +{ + hiredis_profile_t *profile = (hiredis_profile_t *)obj; + switch_thread_rwlock_rdlock(profile->pipeline_lock); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Redis pipeline thread started for [%s]\n", profile->name); + + while ( profile->pipeline_running || switch_queue_size(profile->active_requests) > 0 ) { + void *val = NULL; + if ( switch_queue_pop_timeout(profile->active_requests, &val, 500 * 1000) == SWITCH_STATUS_SUCCESS && val ) { + int request_count = 1; + hiredis_request_t *requests = (hiredis_request_t *)val; + hiredis_request_t *cur_request = requests; + cur_request->next = NULL; + /* This would be easier to code in reverse order, but I prefer to execute requests in the order that they arrive */ + while ( request_count < profile->max_pipelined_requests ) { + if ( switch_queue_trypop(profile->active_requests, &val) == SWITCH_STATUS_SUCCESS && val ) { + request_count++; + cur_request = cur_request->next = (hiredis_request_t *)val; + cur_request->next = NULL; + } else { + break; + } + } + hiredis_profile_execute_requests(profile, NULL, requests); + cur_request = requests; + while ( cur_request ) { + hiredis_request_t *next_request = cur_request->next; /* done here to prevent race with waiter */ + if ( cur_request->response ) { + /* signal waiter */ + switch_mutex_lock(cur_request->mutex); + cur_request->done = 1; + switch_thread_cond_signal(cur_request->cond); + switch_mutex_unlock(cur_request->mutex); + } else { + /* nobody to signal, clean it up */ + switch_safe_free(cur_request->request); + switch_safe_free(cur_request->keys); + switch_safe_free(cur_request->session_uuid); + switch_queue_trypush(profile->request_pool, cur_request); + } + cur_request = next_request; + } + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Redis pipeline thread ended for [%s]\n", profile->name); + + switch_thread_rwlock_unlock(profile->pipeline_lock); + + return NULL; +} + +/** + * Add a pipeline thread to the profile's thread pool + */ +void hiredis_pipeline_thread_start(hiredis_profile_t *profile) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + profile->pipeline_running = 1; + switch_threadattr_create(&thd_attr, profile->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, pipeline_thread, profile, profile->pool); +} + +/** + * Wait for all pipeline threads to complete + */ +void hiredis_pipeline_threads_stop(hiredis_profile_t *profile) +{ + if ( profile->pipeline_running ) { + profile->pipeline_running = 0; + switch_queue_interrupt_all(profile->active_requests); + switch_thread_rwlock_wrlock(profile->pipeline_lock); + } +} + +/** + * Execute pipelined request and wait for response. + * @param profile to use + * @param session (optional) + * @param request - the request + * @return status SWITCH_STATUS_SUCCESS if successful + */ +static switch_status_t hiredis_profile_execute_pipeline_request(hiredis_profile_t *profile, switch_core_session_t *session, hiredis_request_t *request) +{ + switch_status_t status; + + /* send request to thread pool */ + if ( profile->pipeline_running && switch_queue_trypush(profile->active_requests, request) == SWITCH_STATUS_SUCCESS ) { + if ( request->response ) { + /* wait for response */ + switch_mutex_lock(request->mutex); + while ( !request->done ) { + switch_thread_cond_timedwait(request->cond, request->mutex, 1000 * 1000); + } + + /* get response */ + switch_mutex_unlock(request->mutex); + status = request->status; + + /* save back to pool */ + switch_queue_trypush(profile->request_pool, request); + } else { + status = SWITCH_STATUS_SUCCESS; + } + } else { + /* failed... do sync request instead */ + status = hiredis_profile_execute_sync(profile, session, request->response, request->request); + if ( !request->response ) { + switch_safe_free(request->request); + switch_safe_free(request->keys); + switch_safe_free(request->session_uuid); + } + switch_queue_trypush(profile->request_pool, request); + } + return status; +} + +/** + * Execute pipelined request and wait for response. + * @param profile to use + * @param session (optional) + * @param resp (optional) - if no resp, this function will not wait for the result + * @param request_string - the request + * @return status SWITCH_STATUS_SUCCESS if successful + */ +static switch_status_t hiredis_profile_execute_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *request_string) +{ + void *val = NULL; + hiredis_request_t *request = NULL; + + if (switch_queue_trypop(profile->request_pool, &val) == SWITCH_STATUS_SUCCESS && val) { + request = (hiredis_request_t *)val; + } else { + request = switch_core_alloc(profile->pool, sizeof(*request)); + switch_thread_cond_create(&request->cond, profile->pool); + switch_mutex_init(&request->mutex, SWITCH_MUTEX_UNNESTED, profile->pool); + } + request->response = resp; + request->done = 0; + request->do_eval = 0; + request->num_keys = 0; + request->keys = NULL; + request->status = SWITCH_STATUS_SUCCESS; + request->next = NULL; + request->session_uuid = NULL; + if ( resp ) { + /* will block, no need to dup memory */ + request->request = (char *)request_string; + if ( session ) { + request->session_uuid = switch_core_session_get_uuid(session); + } + } else { + /* fire and forget... need to dup memory */ + request->request = strdup(request_string); + if ( session ) { + request->session_uuid = strdup(switch_core_session_get_uuid(session)); + } + } + + return hiredis_profile_execute_pipeline_request(profile, session, request); +} + +/** + * Execute pipelined eval and wait for response. + * @param profile to use + * @param session (optional) + * @param resp (optional) - if no resp, this function will not wait for the result + * @param script + * @param num_keys + * @param keys + * @return status SWITCH_STATUS_SUCCESS if successful + */ +switch_status_t hiredis_profile_eval_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *script, int num_keys, const char *keys) +{ + void *val = NULL; + hiredis_request_t *request = NULL; + + if (switch_queue_trypop(profile->request_pool, &val) == SWITCH_STATUS_SUCCESS && val) { + request = (hiredis_request_t *)val; + } else { + request = switch_core_alloc(profile->pool, sizeof(*request)); + switch_thread_cond_create(&request->cond, profile->pool); + switch_mutex_init(&request->mutex, SWITCH_MUTEX_UNNESTED, profile->pool); + } + request->response = resp; + request->done = 0; + request->do_eval = 1; + request->num_keys = num_keys; + request->status = SWITCH_STATUS_SUCCESS; + request->next = NULL; + request->session_uuid = NULL; + if ( resp ) { + /* will block, no need to dup memory */ + request->request = (char *)script; + request->keys = (char *)keys; + if ( session ) { + request->session_uuid = switch_core_session_get_uuid(session); + } + } else { + /* fire and forget... need to dup memory */ + request->request = strdup(script); + request->keys = strdup(keys); + if ( session ) { + request->session_uuid = strdup(switch_core_session_get_uuid(session)); + } + } + + return hiredis_profile_execute_pipeline_request(profile, session, request); +} + +/** + * Execute pipelined request and wait for response. + * @param profile to use + * @param session (optional) + * @param resp (optional) - if no resp, this function will not wait for the result + * @param format_string - the request + * @return status SWITCH_STATUS_SUCCESS if successful + */ +switch_status_t hiredis_profile_execute_pipeline_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *format_string, ...) +{ + switch_status_t result = SWITCH_STATUS_GENERR; + char *request = NULL; + va_list ap; + int ret; + + va_start(ap, format_string); + ret = switch_vasprintf(&request, format_string, ap); + va_end(ap); + + if ( ret != -1 ) { + result = hiredis_profile_execute_pipeline(profile, session, resp, request); + } + switch_safe_free(request); + return result; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/src/mod/applications/mod_hiredis/hiredis_profile.c b/src/mod/applications/mod_hiredis/hiredis_profile.c index db7ce10bb6..b6b548e1ce 100644 --- a/src/mod/applications/mod_hiredis/hiredis_profile.c +++ b/src/mod/applications/mod_hiredis/hiredis_profile.c @@ -62,10 +62,12 @@ static switch_status_t hiredis_context_reconnect(hiredis_context_t *context) /* Return a context back to the pool */ static void hiredis_context_release(hiredis_context_t *context, switch_core_session_t *session) { - if (switch_queue_push(context->connection->context_pool, context) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "hiredis: failed to release back to pool [%s, %d]\n", context->connection->host, context->connection->port); - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "hiredis: release back to pool [%s, %d]\n", context->connection->host, context->connection->port); + if (context) { + if (switch_queue_push(context->connection->context_pool, context) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "hiredis: failed to release back to pool [%s, %d]\n", context->connection->host, context->connection->port); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "hiredis: release back to pool [%s, %d]\n", context->connection->host, context->connection->port); + } } } @@ -113,7 +115,7 @@ static hiredis_context_t *hiredis_connection_get_context(hiredis_connection_t *c return NULL; } -switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error) +switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error, int max_pipelined_requests, int delete_when_zero) { hiredis_profile_t *profile = NULL; switch_memory_pool_t *pool = NULL; @@ -127,6 +129,13 @@ switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *na profile->conn_head = NULL; profile->ignore_connect_fail = ignore_connect_fail; profile->ignore_error = ignore_error; + profile->delete_when_zero = delete_when_zero; + + profile->pipeline_running = 0; + profile->max_pipelined_requests = max_pipelined_requests; + switch_thread_rwlock_create(&profile->pipeline_lock, pool); + switch_queue_create(&profile->request_pool, 2000, pool); + switch_queue_create(&profile->active_requests, 2000, pool); switch_core_hash_insert(mod_hiredis_globals.profiles, name, (void *) profile); @@ -146,6 +155,8 @@ switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile) *old_profile = NULL; } + hiredis_pipeline_threads_stop(profile); + switch_core_hash_delete(mod_hiredis_globals.profiles, profile->name); switch_core_destroy_memory_pool(&(profile->pool)); @@ -174,6 +185,8 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char new_context->connection = new_conn; new_context->context = NULL; switch_queue_push(new_conn->context_pool, new_context); + + hiredis_pipeline_thread_start(profile); } } @@ -189,7 +202,7 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "hiredis: adding conn[%s,%d], pool size = %d\n", new_conn->host, new_conn->port, max_contexts); - if ( profile->conn_head != NULL ){ + if ( profile->conn_head != NULL ) { /* Adding 'another' connection */ connection = profile->conn_head; while ( connection->next != NULL ) { @@ -221,61 +234,118 @@ static hiredis_context_t *hiredis_profile_get_context(hiredis_profile_t *profile return NULL; } -static switch_status_t hiredis_context_execute_sync(hiredis_context_t *context, const char *data, char **resp, switch_core_session_t *session) +static void hiredis_parse_response(hiredis_request_t *request, redisReply *response) { - redisReply *response; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "hiredis: %s\n", data); - response = redisCommand(context->context, data); if ( !response ) { - *resp = NULL; - return SWITCH_STATUS_GENERR; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(request->session_uuid), SWITCH_LOG_ERROR, "hiredis: no response\n"); + if ( request->response ) { + *(request->response) = NULL; + } + request->status = SWITCH_STATUS_GENERR; + return; } switch(response->type) { case REDIS_REPLY_STATUS: /* fallthrough */ case REDIS_REPLY_STRING: - *resp = strdup(response->str); + if ( request->response ) { + *(request->response ) = strdup(response->str); + } + request->status = SWITCH_STATUS_SUCCESS; break; case REDIS_REPLY_INTEGER: - *resp = switch_mprintf("%lld", response->integer); + if ( request->response ) { + *(request->response) = switch_mprintf("%lld", response->integer); + } + request->status = SWITCH_STATUS_SUCCESS; break; case REDIS_REPLY_NIL: - *resp = NULL; + if ( request->response ) { + *(request->response) = NULL; + } + request->status = SWITCH_STATUS_SUCCESS; break; case REDIS_REPLY_ERROR: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: error response[%s][%d]\n", response->str, response->type); - freeReplyObject(response); - *resp = NULL; - return SWITCH_STATUS_GENERR; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(request->session_uuid), SWITCH_LOG_ERROR, "hiredis: error response[%s][%d]\n", response->str, response->type); + if ( request->response ) { + if (!zstr(response->str)) { + *(request->response) = strdup(response->str); + } else { + *(request->response) = NULL; + } + } + request->status = SWITCH_STATUS_GENERR; + break; + case REDIS_REPLY_ARRAY: + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(request->session_uuid), SWITCH_LOG_WARNING, "hiredis: unsupported array response[%d]\n", response->type); + if ( request->response ) { + *(request->response) = NULL; + } + request->status = SWITCH_STATUS_IGNORE; + break; default: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "hiredis: unsupported response[%s][%d]\n", response->str, response->type); - freeReplyObject(response); - *resp = NULL; - return SWITCH_STATUS_IGNORE; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(request->session_uuid), SWITCH_LOG_WARNING, "hiredis: unsupported response[%d]\n", response->type); + if ( request->response ) { + *(request->response) = NULL; + } + request->status = SWITCH_STATUS_IGNORE; + break; } - - freeReplyObject(response); - return SWITCH_STATUS_SUCCESS; } -switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **resp, switch_core_session_t *session) +static switch_status_t hiredis_context_execute_requests(hiredis_context_t *context, hiredis_request_t *requests) { - hiredis_context_t *context = NULL; - int reconnected = 0; + hiredis_request_t *cur_request; + int ok = 1; + + for ( cur_request = requests; cur_request; cur_request = cur_request->next ) { + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(cur_request->session_uuid), SWITCH_LOG_DEBUG, "hiredis: %s\n", cur_request->request); + if ( cur_request->do_eval ) { + /* eval needs special formatting to work properly */ + redisAppendCommand(context->context, "eval %s %d %s", cur_request->request, cur_request->num_keys, cur_request->keys ? cur_request->keys : ""); + } else { + redisAppendCommand(context->context, cur_request->request); + } + } + + for ( cur_request = requests; cur_request; cur_request = cur_request->next ) { + redisReply *response = NULL; + int ret = redisGetReply(context->context, (void **)&response); + if ( ret == REDIS_OK ) { + hiredis_parse_response(cur_request, response); + } else { + ok = 0; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(cur_request->session_uuid), SWITCH_LOG_ERROR, "hiredis: failed to get reply\n"); + cur_request->status = SWITCH_STATUS_GENERR; + } + if ( response ) { + freeReplyObject(response); + } + } + + if ( ok ) { + return SWITCH_STATUS_SUCCESS; + } + + return SWITCH_STATUS_GENERR; +} + +switch_status_t hiredis_profile_execute_requests(hiredis_profile_t *profile, switch_core_session_t *session, hiredis_request_t *requests) +{ + hiredis_context_t *context = hiredis_profile_get_context(profile, NULL, session); + int reconnected = 0; + hiredis_request_t *cur_request; - context = hiredis_profile_get_context(profile, NULL, session); while (context) { - if (hiredis_context_execute_sync(context, data, resp, session) == SWITCH_STATUS_SUCCESS) { - /* got result */ + if (hiredis_context_execute_requests(context, requests) == SWITCH_STATUS_SUCCESS) { hiredis_context_release(context, session); return SWITCH_STATUS_SUCCESS; - } else if (context->context->err) { + } else if ( context->context->err ) { /* have a bad connection, try a single reconnect attempt before moving on to alternate connection */ if (reconnected || hiredis_context_reconnect(context) != SWITCH_STATUS_SUCCESS) { /* try alternate connection */ - hiredis_context_t *new_context = hiredis_profile_get_context(profile, context->connection, session); hiredis_context_release(context, session); - context = new_context; + context = hiredis_profile_get_context(profile, context->connection, session); if (context) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "hiredis: got alternate connection to [%s, %d]\n", context->connection->host, context->connection->port); } else { @@ -287,14 +357,50 @@ switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const c reconnected = 1; } } else { - /* no problem with context, so don't retry */ hiredis_context_release(context, session); + /* mark all requests as failed */ + for ( cur_request = requests; cur_request; cur_request = cur_request->next ) { + cur_request->status = SWITCH_STATUS_GENERR; + } return SWITCH_STATUS_GENERR; } } + /* mark all requests as failed */ + for ( cur_request = requests; cur_request; cur_request = cur_request->next ) { + cur_request->status = SWITCH_STATUS_SOCKERR; + } return SWITCH_STATUS_SOCKERR; } +switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *data) +{ + hiredis_request_t request = { 0 }; + request.response = resp; + request.request = (char *)data; + request.next = NULL; + request.session_uuid = session ? (char *)switch_core_session_get_uuid(session) : NULL; + hiredis_profile_execute_requests(profile, session, &request); + return request.status; +} + +switch_status_t hiredis_profile_execute_sync_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **resp, const char *format_string, ...) +{ + switch_status_t result = SWITCH_STATUS_GENERR; + char *data = NULL; + va_list ap; + int ret; + + va_start(ap, format_string); + ret = switch_vasprintf(&data, format_string, ap); + va_end(ap); + + if (ret != -1) { + result = hiredis_profile_execute_sync(profile, session, resp, data); + } + switch_safe_free(data); + return result; +} + /* For Emacs: * Local Variables: diff --git a/src/mod/applications/mod_hiredis/hiredis_utils.c b/src/mod/applications/mod_hiredis/hiredis_utils.c index 4dc9ce3973..95e1503a0c 100644 --- a/src/mod/applications/mod_hiredis/hiredis_utils.c +++ b/src/mod/applications/mod_hiredis/hiredis_utils.c @@ -47,6 +47,8 @@ switch_status_t mod_hiredis_do_config() hiredis_profile_t *new_profile = NULL; uint8_t ignore_connect_fail = 0; uint8_t ignore_error = 0; + int max_pipelined_requests = 0; + int delete_when_zero = 0; char *name = (char *) switch_xml_attr_soft(profile, "name"); // Load params @@ -57,11 +59,19 @@ switch_status_t mod_hiredis_do_config() ignore_connect_fail = switch_true(switch_xml_attr_soft(param, "value")); } else if ( !strncmp(var, "ignore-error", 12) ) { ignore_error = switch_true(switch_xml_attr_soft(param, "value")); + } else if ( !strncmp(var, "max-pipelined-requests", 22) ) { + max_pipelined_requests = atoi(switch_xml_attr_soft(param, "value")); + } else if ( !strncmp(var, "delete-when-zero", 16) ) { + delete_when_zero = switch_true(switch_xml_attr_soft(param, "value")); } } } - if ( hiredis_profile_create(&new_profile, name, ignore_connect_fail, ignore_error) == SWITCH_STATUS_SUCCESS ) { + if (max_pipelined_requests <= 0) { + max_pipelined_requests = 20; + } + + if ( hiredis_profile_create(&new_profile, name, ignore_connect_fail, ignore_error, max_pipelined_requests, delete_when_zero) == SWITCH_STATUS_SUCCESS ) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created profile[%s]\n", name); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create profile[%s]\n", name); diff --git a/src/mod/applications/mod_hiredis/mod_hiredis.c b/src/mod/applications/mod_hiredis/mod_hiredis.c index 89a83b1165..6fe1a2db70 100644 --- a/src/mod/applications/mod_hiredis/mod_hiredis.c +++ b/src/mod/applications/mod_hiredis/mod_hiredis.c @@ -37,6 +37,68 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_hiredis_shutdown); SWITCH_MODULE_LOAD_FUNCTION(mod_hiredis_load); SWITCH_MODULE_DEFINITION(mod_hiredis, mod_hiredis_load, mod_hiredis_shutdown, NULL); +#define DECR_DEL_SCRIPT "local v=redis.call(\"decr\",KEYS[1]);if v <= 0 then redis.call(\"del\",KEYS[1]) end;return v;" + +/** + * Get exclusive access to limit_pvt, if it exists + */ +static hiredis_limit_pvt_t *get_limit_pvt(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + + hiredis_limit_pvt_t *limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt"); + if (limit_pvt) { + /* pvt already exists, return it */ + switch_mutex_lock(limit_pvt->mutex); + return limit_pvt; + } + return NULL; +} + +/** + * Add limit_pvt and get exclusive access to it + */ +static hiredis_limit_pvt_t *add_limit_pvt(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + + hiredis_limit_pvt_t *limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt"); + if (limit_pvt) { + /* pvt already exists, return it */ + switch_mutex_lock(limit_pvt->mutex); + return limit_pvt; + } + + /* not created yet, add it - NOTE a channel mutex would be better here if we had access to it */ + switch_mutex_lock(mod_hiredis_globals.limit_pvt_mutex); + limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt"); + if (limit_pvt) { + /* was just added by another thread */ + switch_mutex_unlock(mod_hiredis_globals.limit_pvt_mutex); + switch_mutex_lock(limit_pvt->mutex); + return limit_pvt; + } + + /* still not created yet, add it */ + limit_pvt = switch_core_session_alloc(session, sizeof(*limit_pvt)); + switch_mutex_init(&limit_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + limit_pvt->first = NULL; + switch_channel_set_private(channel, "hiredis_limit_pvt", limit_pvt); + switch_mutex_unlock(mod_hiredis_globals.limit_pvt_mutex); + switch_mutex_lock(limit_pvt->mutex); + return limit_pvt; +} + +/** + * Release exclusive acess to limit_pvt + */ +static void release_limit_pvt(hiredis_limit_pvt_t *limit_pvt) +{ + if (limit_pvt) { + switch_mutex_unlock(limit_pvt->mutex); + } +} + SWITCH_STANDARD_APP(raw_app) { switch_channel_t *channel = switch_core_session_get_channel(session); @@ -65,7 +127,7 @@ SWITCH_STANDARD_APP(raw_app) return; } - if ( hiredis_profile_execute_sync(profile, cmd, &response, session) != SWITCH_STATUS_SUCCESS) { + if ( hiredis_profile_execute_sync(profile, session, &response, cmd) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", profile_name, cmd, response ? response : ""); } @@ -103,7 +165,7 @@ SWITCH_STANDARD_API(raw_api) switch_goto_status(SWITCH_STATUS_GENERR, done); } - if ( hiredis_profile_execute_sync(profile, data, &response, session) != SWITCH_STATUS_SUCCESS) { + if ( hiredis_profile_execute_sync(profile, session, &response, data) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response ? response : ""); switch_goto_status(SWITCH_STATUS_GENERR, done); } @@ -125,11 +187,12 @@ SWITCH_LIMIT_INCR(hiredis_limit_incr) { switch_channel_t *channel = switch_core_session_get_channel(session); hiredis_profile_t *profile = NULL; - char *hashkey = NULL, *response = NULL, *limit_key = NULL; + char *response = NULL, *limit_key = NULL; int64_t count = 0; /* Redis defines the incr action as to be performed on a 64 bit signed integer */ time_t now = switch_epoch_time_now(NULL); switch_status_t status = SWITCH_STATUS_SUCCESS; hiredis_limit_pvt_t *limit_pvt = NULL; + hiredis_limit_pvt_node_t *limit_pvt_node = NULL; switch_memory_pool_t *session_pool = switch_core_session_get_pool(session); if ( zstr(realm) ) { @@ -150,69 +213,70 @@ SWITCH_LIMIT_INCR(hiredis_limit_incr) } if ( interval ) { - limit_key = switch_mprintf("%s_%d", resource, now / interval); + limit_key = switch_core_session_sprintf(session, "%s_%d", resource, now / interval); } else { - limit_key = switch_mprintf("%s", resource); + limit_key = switch_core_session_sprintf(session, "%s", resource); } - hashkey = switch_mprintf("incr %s", limit_key); - - if ( (status = hiredis_profile_execute_sync(profile, hashkey, &response, session)) != SWITCH_STATUS_SUCCESS ) { + if ( (status = hiredis_profile_execute_pipeline_printf(profile, session, &response, "incr %s", limit_key) ) != SWITCH_STATUS_SUCCESS ) { if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error executing [%s]\n", realm, hashkey); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error incrementing [%s]\n", realm, limit_key); switch_goto_status(SWITCH_STATUS_SUCCESS, done); } else if ( profile->ignore_error ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error executing [%s]\n", realm, hashkey); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error incrementing [%s]\n", realm, limit_key); switch_goto_status(SWITCH_STATUS_SUCCESS, done); } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", realm, hashkey, response ? response : ""); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error incrementing [%s] because [%s]\n", realm, limit_key, response ? response : ""); switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); switch_goto_status(SWITCH_STATUS_GENERR, done); } /* set expiration for interval on first increment */ if ( interval && !strcmp("1", response ? response : "") ) { - char *expire_response = NULL; - char *expire_cmd = switch_mprintf("expire %s %d", limit_key, interval); - hiredis_profile_execute_sync(profile, expire_cmd, &expire_response, session); - switch_safe_free(expire_cmd); - switch_safe_free(expire_response); + hiredis_profile_execute_pipeline_printf(profile, session, NULL, "expire %s %d", limit_key, interval); } switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); - limit_pvt = switch_core_alloc(session_pool, sizeof(hiredis_limit_pvt_t)); - limit_pvt->next = switch_channel_get_private(channel, "hiredis_limit_pvt"); - limit_pvt->realm = switch_core_strdup(session_pool, realm); - limit_pvt->resource = switch_core_strdup(session_pool, resource); - limit_pvt->limit_key = switch_core_strdup(session_pool, limit_key); - limit_pvt->inc = 1; - limit_pvt->interval = interval; - switch_channel_set_private(channel, "hiredis_limit_pvt", limit_pvt); - count = atoll(response ? response : ""); - if (!interval && count > max ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s is already at max value (%d)\n", limit_key , max); - switch_safe_free(hashkey); - switch_safe_free(response); - hashkey = switch_mprintf("decr %s", limit_key); - if ( (status = hiredis_profile_execute_sync(profile, hashkey, &response, session)) != SWITCH_STATUS_SUCCESS ) { - if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "hiredis: profile[%s] connection error executing [%s] with limit already reached\n", realm, hashkey); - switch_goto_status(SWITCH_STATUS_SUCCESS, done); // increment has been succesful but decrement have failed - } - } + if ( switch_is_number(response ? response : "") && count <= 0 ) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "limit not positive after increment, resource = %s, val = %s\n", limit_key, response ? response : ""); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "resource = %s, response = %s\n", limit_key, response ? response : ""); } - if ( !count || count > max ) { + if ( !switch_is_number(response ? response : "") && !profile->ignore_error ) { + /* got response error */ + switch_goto_status(SWITCH_STATUS_GENERR, done); + } else if ( max > 0 && count > 0 && count > max ) { + switch_channel_set_variable(channel, "hiredis_limit_exceeded", "true"); + if ( !interval ) { /* don't need to decrement intervals if limit exceeded since the interval keys are named w/ timestamp */ + if ( profile->delete_when_zero ) { + hiredis_profile_eval_pipeline(profile, session, NULL, DECR_DEL_SCRIPT, 1, limit_key); + } else { + hiredis_profile_execute_pipeline_printf(profile, session, NULL, "decr %s", limit_key); + } + } switch_goto_status(SWITCH_STATUS_GENERR, done); } + if ( !interval && count > 0 ) { + /* only non-interval limits need to be released on session destroy */ + limit_pvt_node = switch_core_alloc(session_pool, sizeof(*limit_pvt_node)); + limit_pvt_node->realm = switch_core_strdup(session_pool, realm); + limit_pvt_node->resource = switch_core_strdup(session_pool, resource); + limit_pvt_node->limit_key = limit_key; + limit_pvt_node->inc = 1; + limit_pvt_node->interval = interval; + limit_pvt = add_limit_pvt(session); + limit_pvt_node->next = limit_pvt->first; + limit_pvt->first = limit_pvt_node; + release_limit_pvt(limit_pvt); + } + done: - switch_safe_free(limit_key); switch_safe_free(response); - switch_safe_free(hashkey); return status; } @@ -223,54 +287,75 @@ SWITCH_LIMIT_RELEASE(hiredis_limit_release) { switch_channel_t *channel = switch_core_session_get_channel(session); hiredis_profile_t *profile = NULL; - char *hashkey = NULL, *response = NULL; + char *response = NULL; switch_status_t status = SWITCH_STATUS_SUCCESS; - hiredis_limit_pvt_t *limit_pvt = switch_channel_get_private(channel, "hiredis_limit_pvt"); + hiredis_limit_pvt_t *limit_pvt = get_limit_pvt(session); + + if (!limit_pvt) { + /* nothing to release */ + return SWITCH_STATUS_SUCCESS; + } /* If realm and resource are NULL, then clear all of the limits */ - if ( !realm && !resource ) { - hiredis_limit_pvt_t *tmp = limit_pvt; + if ( zstr(realm) && zstr(resource) ) { + hiredis_limit_pvt_node_t *cur = NULL; - while (tmp) { + for ( cur = limit_pvt->first; cur; cur = cur->next ) { /* Rate limited resources are not auto-decremented, they will expire. */ - if (!tmp->interval) { - profile = switch_core_hash_find(mod_hiredis_globals.profiles, limit_pvt->realm); - hashkey = switch_mprintf("decr %s", tmp->limit_key); - - if ( hiredis_profile_execute_sync(profile, hashkey, &response, session) != SWITCH_STATUS_SUCCESS ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", - tmp->realm, hashkey, response ? response : ""); + if ( !cur->interval && cur->inc ) { + switch_status_t result; + cur->inc = 0; /* mark as released */ + profile = switch_core_hash_find(mod_hiredis_globals.profiles, cur->realm); + if ( profile->delete_when_zero ) { + result = hiredis_profile_eval_pipeline(profile, session, &response, DECR_DEL_SCRIPT, 1, cur->limit_key); + } else { + result = hiredis_profile_execute_pipeline_printf(profile, session, &response, "decr %s", cur->limit_key); + } + if ( result != SWITCH_STATUS_SUCCESS ) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error decrementing [%s] because [%s]\n", + cur->realm, cur->limit_key, response ? response : ""); } - switch_safe_free(response); - switch_safe_free(hashkey); + response = NULL; } - tmp = tmp->next; } - } else { - profile = switch_core_hash_find(mod_hiredis_globals.profiles, limit_pvt->realm); + } else if (!zstr(resource) ) { + /* clear single non-interval resource */ + hiredis_limit_pvt_node_t *cur = NULL; + for (cur = limit_pvt->first; cur; cur = cur->next ) { + if ( !cur->interval && cur->inc && !strcmp(cur->resource, resource) && (zstr(realm) || !strcmp(cur->realm, realm)) ) { + /* found the resource to clear */ + cur->inc = 0; /* mark as released */ + profile = switch_core_hash_find(mod_hiredis_globals.profiles, cur->realm); + if (profile) { + if ( profile->delete_when_zero ) { + status = hiredis_profile_eval_pipeline(profile, session, &response, DECR_DEL_SCRIPT, 1, cur->limit_key); + } else { + status = hiredis_profile_execute_pipeline_printf(profile, session, &response, "decr %s", cur->limit_key); + } + if ( status != SWITCH_STATUS_SUCCESS ) { + if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail ) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error decrementing [%s]\n", cur->realm, cur->limit_key); + switch_goto_status(SWITCH_STATUS_SUCCESS, done); + } else if ( profile->ignore_error ) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error decrementing [%s]\n", realm, cur->limit_key); + switch_goto_status(SWITCH_STATUS_SUCCESS, done); + } + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error decrementing [%s] because [%s]\n", realm, cur->limit_key, response ? response : ""); + switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); + switch_goto_status(SWITCH_STATUS_GENERR, done); + } - hashkey = switch_mprintf("decr %s", limit_pvt->limit_key); - - if ( ( status = hiredis_profile_execute_sync(profile, hashkey, &response, session) ) != SWITCH_STATUS_SUCCESS ) { - if ( status == SWITCH_STATUS_SOCKERR && profile->ignore_connect_fail ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] connection error executing [%s]\n", realm, hashkey); - switch_goto_status(SWITCH_STATUS_SUCCESS, done); - } else if ( profile->ignore_error ) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "hiredis: ignoring profile[%s] general error executing [%s]\n", realm, hashkey); - switch_goto_status(SWITCH_STATUS_SUCCESS, done); + switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); + } + break; } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", realm, hashkey, response ? response : ""); - switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); - switch_goto_status(SWITCH_STATUS_GENERR, done); } - - switch_channel_set_variable(channel, "hiredis_raw_response", response ? response : ""); } done: + release_limit_pvt(limit_pvt); switch_safe_free(response); - switch_safe_free(hashkey); return status; } @@ -281,7 +366,7 @@ SWITCH_LIMIT_USAGE(hiredis_limit_usage) { hiredis_profile_t *profile = switch_core_hash_find(mod_hiredis_globals.profiles, realm); int64_t count = 0; /* Redis defines the incr action as to be performed on a 64 bit signed integer */ - char *hashkey = NULL, *response = NULL; + char *response = NULL; if ( zstr(realm) ) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: realm must be defined\n"); @@ -293,22 +378,18 @@ SWITCH_LIMIT_USAGE(hiredis_limit_usage) goto err; } - hashkey = switch_mprintf("get %s", resource); - - if ( hiredis_profile_execute_sync(profile, hashkey, &response, NULL) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", realm, hashkey, response ? response : ""); + if ( hiredis_profile_execute_pipeline_printf(profile, NULL, &response, "get %s", resource) != SWITCH_STATUS_SUCCESS ) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error querying [%s] because [%s]\n", realm, resource, response ? response : ""); goto err; } count = atoll(response ? response : ""); switch_safe_free(response); - switch_safe_free(hashkey); return count; err: switch_safe_free(response); - switch_safe_free(hashkey); return -1; } @@ -317,10 +398,8 @@ SWITCH_LIMIT_RESET(name) static switch_status_t name (void) */ SWITCH_LIMIT_RESET(hiredis_limit_reset) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, - "hiredis: unable to globally reset hiredis limit resources. Use 'hiredis_raw set resource_name 0'\n"); - - return SWITCH_STATUS_GENERR; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to globally reset hiredis limit resources. Use 'hiredis_raw set resource_name 0'\n"); + return SWITCH_STATUS_NOTIMPL; } /* @@ -328,32 +407,8 @@ SWITCH_LIMIT_RESET(hiredis_limit_reset) */ SWITCH_LIMIT_INTERVAL_RESET(hiredis_limit_interval_reset) { - /* TODO this doesn't work since the key has the interval in it */ - hiredis_profile_t *profile = switch_core_hash_find(mod_hiredis_globals.profiles, realm); - switch_status_t status = SWITCH_STATUS_SUCCESS; - char *hashkey = NULL, *response = NULL; - - if ( !zstr(realm) ) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: realm must be defined\n"); - switch_goto_status(SWITCH_STATUS_GENERR, done); - } - - if ( !profile ) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: Unable to locate profile[%s]\n", realm); - switch_goto_status(SWITCH_STATUS_GENERR, done); - } - - hashkey = switch_mprintf("set %s 0", resource); - - if ( hiredis_profile_execute_sync(profile, hashkey, &response, NULL) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] because [%s]\n", realm, hashkey, response ? response : ""); - switch_goto_status(SWITCH_STATUS_GENERR, done); - } - - done: - switch_safe_free(response); - switch_safe_free(hashkey); - return status; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to reset hiredis interval limit resources.\n"); + return SWITCH_STATUS_NOTIMPL; } /* @@ -370,9 +425,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_hiredis_load) switch_api_interface_t *api_interface; switch_limit_interface_t *limit_interface; - memset(&mod_hiredis_globals, 0, sizeof(mod_hiredis_global_t)); + memset(&mod_hiredis_globals, 0, sizeof(mod_hiredis_globals)); *module_interface = switch_loadable_module_create_module_interface(pool, modname); mod_hiredis_globals.pool = pool; + switch_mutex_init(&mod_hiredis_globals.limit_pvt_mutex, SWITCH_MUTEX_NESTED, pool); switch_core_hash_init(&(mod_hiredis_globals.profiles)); diff --git a/src/mod/applications/mod_hiredis/mod_hiredis.h b/src/mod/applications/mod_hiredis/mod_hiredis.h index 362c69cdd3..faf8fb7a77 100644 --- a/src/mod/applications/mod_hiredis/mod_hiredis.h +++ b/src/mod/applications/mod_hiredis/mod_hiredis.h @@ -6,52 +6,85 @@ #include typedef struct mod_hiredis_global_s { - switch_memory_pool_t *pool; - switch_hash_t *profiles; + switch_memory_pool_t *pool; + switch_hash_t *profiles; + switch_mutex_t *limit_pvt_mutex; } mod_hiredis_global_t; extern mod_hiredis_global_t mod_hiredis_globals; +typedef struct hiredis_request_s { + char *request; + char **response; + int done; + int do_eval; + int num_keys; + char *keys; + char *session_uuid; + switch_status_t status; + switch_mutex_t *mutex; + switch_thread_cond_t *cond; + struct hiredis_request_s *next; +} hiredis_request_t; + typedef struct mod_hiredis_context_s { - struct hiredis_connection_s *connection; - redisContext *context; + struct hiredis_connection_s *connection; + redisContext *context; } hiredis_context_t; typedef struct hiredis_connection_s { - char *host; - char *password; - uint32_t port; - switch_interval_time_t timeout_us; - struct timeval timeout; - switch_memory_pool_t *pool; - switch_queue_t *context_pool; + char *host; + char *password; + uint32_t port; + switch_interval_time_t timeout_us; + struct timeval timeout; + switch_memory_pool_t *pool; + switch_queue_t *context_pool; - struct hiredis_connection_s *next; + struct hiredis_connection_s *next; } hiredis_connection_t; typedef struct hiredis_profile_s { - switch_memory_pool_t *pool; - char *name; - uint8_t ignore_connect_fail; - uint8_t ignore_error; + switch_memory_pool_t *pool; + char *name; + uint8_t ignore_connect_fail; + uint8_t ignore_error; + hiredis_connection_t *conn_head; - hiredis_connection_t *conn_head; + switch_thread_rwlock_t *pipeline_lock; + switch_queue_t *request_pool; + switch_queue_t *active_requests; + int pipeline_running; + int max_pipelined_requests; + + int delete_when_zero; } hiredis_profile_t; +typedef struct hiredis_limit_pvt_node_s { + char *realm; + char *resource; + char *limit_key; + int inc; + int interval; + struct hiredis_limit_pvt_node_s *next; +} hiredis_limit_pvt_node_t; + typedef struct hiredis_limit_pvt_s { - char *realm; - char *resource; - char *limit_key; - int inc; - int interval; - struct hiredis_limit_pvt_s *next; + switch_mutex_t *mutex; + struct hiredis_limit_pvt_node_s *first; } hiredis_limit_pvt_t; switch_status_t mod_hiredis_do_config(void); -switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error); +switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t ignore_connect_fail, uint8_t ignore_error, int max_pipelined_requests, int delete_when_zero); switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile); switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_connections); +switch_status_t hiredis_profile_execute_requests(hiredis_profile_t *profile, switch_core_session_t *session, hiredis_request_t *requests); +switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data); +switch_status_t hiredis_profile_execute_sync_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data_format_string, ...); -switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **response, switch_core_session_t *session); +void hiredis_pipeline_thread_start(hiredis_profile_t *profile); +void hiredis_pipeline_threads_stop(hiredis_profile_t *profile); +switch_status_t hiredis_profile_execute_pipeline_printf(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *data_format_string, ...); +switch_status_t hiredis_profile_eval_pipeline(hiredis_profile_t *profile, switch_core_session_t *session, char **response, const char *script, int num_keys, const char *keys); #endif /* MOD_HIREDIS_H */