FS-10167: First pass on adding support for blade.locate

This commit is contained in:
Shane Bryldt 2017-05-30 22:44:46 -06:00
parent c15a7d1f89
commit f915bf515f
3 changed files with 320 additions and 3 deletions

View File

@ -88,6 +88,9 @@ struct blade_handle_s {
ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data);
ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres);
ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data);
ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres);
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
struct blade_handle_session_state_callback_registration_s {
@ -328,6 +331,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
blade_jsonrpc_create(&bjsonrpc, bh, "blade.publish", blade_protocol_publish_request_handler, NULL);
blade_handle_jsonrpc_register(bjsonrpc);
blade_jsonrpc_create(&bjsonrpc, bh, "blade.locate", blade_protocol_locate_request_handler, NULL);
blade_handle_jsonrpc_register(bjsonrpc);
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
bh->default_transport = bt;
@ -1042,7 +1048,7 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b
// blade.publish request generator
// @todo add additional async callback to be called upon a publish response to inform caller of the result?
// @todo add additional async callback to be called upon a publish response to inform caller of the result from the publish response handler
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm)
{
ks_status_t ret = KS_STATUS_SUCCESS;
@ -1073,8 +1079,14 @@ KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *n
cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
ks_rwl_read_unlock(bh->master_nodeid_rwl);
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
ret = blade_session_send(bs, req, blade_protocol_publish_response_handler);
ret = blade_session_send(bs, req, blade_protocol_publish_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
// @todo upon return, a provider should register the methods for this protocol to be locally available
// prior to receiving a response, if the response is an error then unregister, but if it is successful
// then the node is already primed to receive any immediate requests
done:
if (req) cJSON_Delete(req);
@ -1229,6 +1241,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq,
blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
@ -1309,6 +1322,300 @@ done:
return KS_FALSE;
}
// blade.locate request generator
// @todo add additional async callback to be called upon a locate response to inform caller of the result from the locate response handler
KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm)
{
ks_status_t ret = KS_STATUS_SUCCESS;
cJSON *req = NULL;
cJSON *req_params = NULL;
blade_session_t *bs = NULL;
ks_assert(bh);
ks_assert(name);
ks_assert(realm);
if (!(bs = blade_handle_sessions_upstream(bh))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
blade_jsonrpc_request_raw_create(blade_handle_pool_get(bh), &req, &req_params, NULL, "blade.locate");
// fill in the req_params
cJSON_AddStringToObject(req_params, "protocol", name);
cJSON_AddStringToObject(req_params, "realm", realm);
ks_rwl_read_lock(bh->local_nodeid_rwl);
cJSON_AddStringToObject(req_params, "requester-nodeid", bh->local_nodeid);
ks_rwl_read_unlock(bh->local_nodeid_rwl);
ks_rwl_read_lock(bh->master_nodeid_rwl);
cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid);
ks_rwl_read_unlock(bh->master_nodeid_rwl);
ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs));
ret = blade_session_send(bs, req, blade_protocol_locate_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response
// @todo upon return, a provider should register the methods for this protocol to be locally available
// prior to receiving a response, if the response is an error then unregister, but if it is successful
// then the node is already primed to receive any immediate requests
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
return ret;
}
// blade.locate request handler
ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_requester_nodeid = NULL;
const char *req_params_responder_nodeid = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
cJSON *res_result_providers;
blade_protocol_t *bp = NULL;
const char *bp_key = NULL;
ks_assert(breq);
bh = blade_jsonrpc_request_handle_get(breq);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_request_sessionid_get(breq));
ks_assert(bs);
req = blade_jsonrpc_request_message_get(breq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (!req_params) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs));
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params object");
blade_session_send(bs, res, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs));
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params protocol");
blade_session_send(bs, res, NULL);
goto done;
}
req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
if (!req_params_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'realm'\n", blade_session_id_get(bs));
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params realm");
blade_session_send(bs, res, NULL);
goto done;
}
// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
if (!req_params_requester_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs));
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params requester-nodeid");
blade_session_send(bs, res, NULL);
goto done;
}
req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
if (!req_params_responder_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs));
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params responder-nodeid");
blade_session_send(bs, res, NULL);
goto done;
}
if (!blade_handle_master_nodeid_compare(bh, req_params_responder_nodeid)) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Invalid params responder-nodeid");
blade_session_send(bs, res, NULL);
goto done;
}
// errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
// errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing
if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
// not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node
blade_session_t *bsu = blade_handle_sessions_upstream(bh);
if (!bsu) {
cJSON *res_error = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");
// needed in case this error must propagate further than the session which sent it
cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded
blade_session_send(bs, res, NULL);
goto done;
}
// @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
// by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
// and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
// routed would not enter into these handlers and would not leave a footprint passing through routers
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
blade_session_send(bsu, req, blade_protocol_locate_response_handler);
blade_session_read_unlock(bsu);
goto done;
}
// this local node must be responder-nodeid for the request, so process the request
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
res_result_providers = cJSON_CreateObject();
bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);
ks_hash_read_lock(bh->protocols);
bp = (blade_protocol_t *)ks_hash_search(bh->protocols, (void *)bp_key, KS_UNLOCKED);
if (bp) {
ks_hash_t *providers = blade_protocol_providers_get(bp);
for (ks_hash_iterator_t *it = ks_hash_first(providers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
cJSON_AddItemToArray(res_result_providers, cJSON_CreateString(key));
}
}
ks_hash_read_unlock(bh->protocols);
// build the actual response finally
blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
cJSON_AddItemToObject(res_result, "providers", res_result_providers);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL);
done:
if (res) cJSON_Delete(res);
if (bs) blade_session_read_unlock(bs);
return KS_FALSE;
}
// blade.locate response handler
ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *res = NULL;
cJSON *res_error = NULL;
cJSON *res_result = NULL;
cJSON *res_object = NULL;
cJSON *res_result_providers = NULL;
const char *requester_nodeid = NULL;
const char *responder_nodeid = NULL;
const char *res_result_protocol = NULL;
const char *res_result_realm = NULL;
ks_assert(bres);
bh = blade_jsonrpc_response_handle_get(bres);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_response_sessionid_get(bres));
ks_assert(bs);
res = blade_jsonrpc_response_message_get(bres);
ks_assert(res);
res_error = cJSON_GetObjectItem(res, "error");
res_result = cJSON_GetObjectItem(res, "result");
if (!res_error && !res_result) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs));
goto done;
}
res_object = res_error ? res_error : res_result;
requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) {
blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid);
if (!bsd) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid);
goto done;
}
ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd));
blade_session_send(bsd, res, NULL);
blade_session_read_unlock(bsd);
goto done;
}
// this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
if (res_error) {
// @todo process error response
ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs));
goto done;
}
// process result response
res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
if (!res_result_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs));
goto done;
}
res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
if (!res_result_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs));
goto done;
}
res_result_providers = cJSON_GetObjectItem(res_result, "providers");
if (!res_result_providers) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs));
goto done;
}
for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) {
cJSON *elem = cJSON_GetArrayItem(res_result_providers, index);
if (elem->type == cJSON_String) {
ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring);
}
}
done:
if (bs) blade_session_read_unlock(bs);
return KS_FALSE;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -92,6 +92,7 @@ KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_han
KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition);
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm);
KS_END_EXTERN_C
#endif

View File

@ -16,9 +16,11 @@ struct command_def_s {
};
void command_quit(blade_handle_t *bh, char *args);
void command_locate(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
{ "locate", command_locate },
{ NULL, NULL }
};
@ -110,7 +112,7 @@ int main(int argc, char **argv)
blade_identity_destroy(&target);
ks_sleep_ms(5000);
ks_sleep_ms(5000); // @todo use session state change callback to know when the session is ready, this ensures it's ready before trying to publish upstream
blade_protocol_publish(bh, "test", "mydomain.com");
}
@ -191,6 +193,13 @@ void command_quit(blade_handle_t *bh, char *args)
g_shutdown = KS_TRUE;
}
void command_locate(blade_handle_t *bh, char *args)
{
ks_assert(bh);
ks_assert(args);
blade_protocol_locate(bh, "test", "mydomain.com");
}