FS-10167: Added preliminary blade.broadcast, tested event being broadcasted with bladec and blades through switchblade

This commit is contained in:
Shane Bryldt 2017-06-10 23:08:39 -06:00
parent ca19bee71c
commit e931137444
4 changed files with 265 additions and 0 deletions

View File

@ -88,6 +88,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, v
ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
@ -359,6 +360,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL);
blade_handle_corerpc_register(brpc);
blade_rpc_create(&brpc, bh, "blade.broadcast", NULL, NULL, blade_protocol_broadcast_request_handler, NULL);
blade_handle_corerpc_register(brpc);
// register internal transport for secure websockets
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
@ -2141,6 +2145,232 @@ done:
return KS_FALSE;
}
// blade.broadcast request generator
KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
// this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
ret = blade_protocol_broadcast_raw(bh, NULL, event, protocol, realm, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
// is where this normally occurs
return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
const char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;
blade_session_t *bs = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
ks_hash_read_lock(bh->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
if (bsub) {
ks_hash_t *subscribers = blade_subscription_subscribers_get(bsub);
ks_assert(subscribers);
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_hash_this(it, &key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
if (blade_handle_local_nodeid_compare(bh, (const char *)key)) continue;
bs = blade_handle_sessions_lookup(bh, (const char *)key);
if (bs) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
blade_session_send(bs, req, callback, data);
cJSON_Delete(req);
blade_session_read_unlock(bs);
}
}
}
ks_hash_read_unlock(bh->subscriptions);
ks_pool_free(bh->pool, &bsub_key);
bs = blade_handle_sessions_upstream(bh);
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
blade_rpc_request_raw_create(bh->pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
blade_session_send(bs, req, callback, data);
cJSON_Delete(req);
}
blade_session_read_unlock(bs);
}
return KS_STATUS_SUCCESS;
}
// blade.broadcast request handler
ks_bool_t blade_protocol_broadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
cJSON *req_params_params = NULL;
const char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;
blade_rpc_request_callback_t callback = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (!req_params) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'params' object\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
if (!req_params_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'realm'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_params = cJSON_GetObjectCstr(req_params, "params");
blade_protocol_broadcast_raw(bh, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
bsub_key = ks_psprintf(bh->pool, "%s@%s/%s", req_params_protocol, req_params_realm, req_params_event);
ks_hash_read_lock(bh->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)bsub_key, KS_UNLOCKED);
if (bsub) {
ks_rwl_read_lock(bh->local_nodeid_rwl);
if (ks_hash_search(blade_subscription_subscribers_get(bsub), bh->local_nodeid, KS_UNLOCKED)) {
callback = blade_subscription_callback_get(bsub);
if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
}
ks_rwl_read_unlock(bh->local_nodeid_rwl);
}
ks_hash_read_unlock(bh->subscriptions);
ks_pool_free(bh->pool, &bsub_key);
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "event", req_params_event);
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
done:
if (res) cJSON_Delete(res);
if (bs) blade_session_read_unlock(bs);
return ret;
}
KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq)
{
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_params = NULL;
ks_assert(brpcreq);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (req_params) req_params_params = cJSON_GetObjectItem(req_params, "params");
return req_params_params;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -112,6 +112,10 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_broadcast(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_broadcast_raw(blade_handle_t *bh, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(cJSON *) blade_protocol_broadcast_request_params_get(blade_rpc_request_t *brpcreq);
KS_END_EXTERN_C
#endif

View File

@ -202,6 +202,8 @@ int main(int argc, char **argv)
blade_identity_destroy(&target);
ks_sleep_ms(5000);
}
loop(bh);

View File

@ -16,9 +16,11 @@ struct command_def_s {
};
void command_quit(blade_handle_t *bh, char *args);
void command_broadcast(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
{ "broadcast", command_broadcast },
{ NULL, NULL }
};
@ -81,6 +83,25 @@ ks_bool_t test_echo_request_handler(blade_rpc_request_t *brpcreq, void *data)
return KS_FALSE;
}
ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_assert(brpcres);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
return KS_FALSE;
}
int main(int argc, char **argv)
{
@ -216,6 +237,14 @@ void command_quit(blade_handle_t *bh, char *args)
g_shutdown = KS_TRUE;
}
void command_broadcast(blade_handle_t *bh, char *args)
{
ks_assert(bh);
ks_assert(args);
blade_protocol_broadcast(bh, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL);
}
/* For Emacs:
* Local Variables: