FS-10167: Implemented first draft of blade.publish, still need to circle back to finish blade.route to support multi-tiered routes

This commit is contained in:
Shane Bryldt 2017-05-30 10:51:15 -06:00
parent 84ec92048a
commit e9d1ea645f
18 changed files with 824 additions and 275 deletions

View File

@ -191,6 +191,7 @@
<ClCompile Include="src\blade_connection.c" />
<ClCompile Include="src\blade_identity.c" />
<ClCompile Include="src\blade_jsonrpc.c" />
<ClCompile Include="src\blade_protocol.c" />
<ClCompile Include="src\blade_transport_wss.c" />
<ClCompile Include="src\blade_session.c" />
<ClCompile Include="src\blade_stack.c" />
@ -202,6 +203,7 @@
<ClInclude Include="src\include\blade_connection.h" />
<ClInclude Include="src\include\blade_identity.h" />
<ClInclude Include="src\include\blade_jsonrpc.h" />
<ClInclude Include="src\include\blade_protocol.h" />
<ClInclude Include="src\include\blade_transport_wss.h" />
<ClInclude Include="src\include\blade_session.h" />
<ClInclude Include="src\include\blade_stack.h" />

View File

@ -42,6 +42,9 @@
<ClCompile Include="src\blade_transport_wss.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="src\blade_protocol.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="src\include\unqlite.h">
@ -74,5 +77,8 @@
<ClInclude Include="src\include\blade_transport_wss.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="src\include\blade_protocol.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
</Project>

View File

@ -90,11 +90,6 @@ KS_DECLARE(ks_status_t) blade_identity_destroy(blade_identity_t **biP)
ks_assert(*biP);
bi = *biP;
if (bi->uri) {
ks_pool_free(bi->pool, &bi->uri);
ks_pool_free(bi->pool, &bi->components);
}
if (bi->parameters) ks_hash_destroy(&bi->parameters);
ks_pool_free(bi->pool, biP);

View File

@ -226,6 +226,18 @@ KS_DECLARE(blade_handle_t *) blade_jsonrpc_request_handle_get(blade_jsonrpc_requ
return bjsonrpcreq->handle;
}
// Returns the id of the session this request arrived on (borrowed pointer
// owned by the request; caller must not free it).
KS_DECLARE(const char *) blade_jsonrpc_request_sessionid_get(blade_jsonrpc_request_t *bjsonrpcreq)
{
ks_assert(bjsonrpcreq);
return bjsonrpcreq->session_id;
}
// Returns the raw jsonrpc request message (borrowed cJSON owned by the
// request; caller must not cJSON_Delete it).
KS_DECLARE(cJSON *) blade_jsonrpc_request_message_get(blade_jsonrpc_request_t *bjsonrpcreq)
{
ks_assert(bjsonrpcreq);
return bjsonrpcreq->message;
}
KS_DECLARE(const char *) blade_jsonrpc_request_messageid_get(blade_jsonrpc_request_t *bjsonrpcreq)
{
ks_assert(bjsonrpcreq);
@ -358,6 +370,30 @@ KS_DECLARE(ks_status_t) blade_jsonrpc_response_raw_create(cJSON **json, cJSON **
return KS_STATUS_SUCCESS;
}
// Returns the blade handle associated with this response (borrowed pointer).
KS_DECLARE(blade_handle_t *) blade_jsonrpc_response_handle_get(blade_jsonrpc_response_t *bjsonrpcres)
{
ks_assert(bjsonrpcres);
return bjsonrpcres->handle;
}
// Returns the id of the session this response arrived on (borrowed pointer
// owned by the response; caller must not free it).
KS_DECLARE(const char *) blade_jsonrpc_response_sessionid_get(blade_jsonrpc_response_t *bjsonrpcres)
{
ks_assert(bjsonrpcres);
return bjsonrpcres->session_id;
}
// Returns the originating request this response correlates to (borrowed
// pointer owned by the response).
KS_DECLARE(blade_jsonrpc_request_t *) blade_jsonrpc_response_request_get(blade_jsonrpc_response_t *bjsonrpcres)
{
ks_assert(bjsonrpcres);
return bjsonrpcres->request;
}
// Returns the raw jsonrpc response message (borrowed cJSON owned by the
// response; caller must not cJSON_Delete it).
KS_DECLARE(cJSON *) blade_jsonrpc_response_message_get(blade_jsonrpc_response_t *bjsonrpcres)
{
ks_assert(bjsonrpcres);
return bjsonrpcres->message;
}
KS_DECLARE(ks_status_t) blade_jsonrpc_error_raw_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message)
{
cJSON *root = NULL;

View File

@ -0,0 +1,145 @@
/*
* Copyright (c) 2017, Shane Bryldt
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
// A protocol published under a realm (via blade.publish), tracking the set of
// provider nodeids that can service it.
struct blade_protocol_s {
ks_pool_t *pool; // pool that owns this object and its duplicated members
const char *name; // protocol name, duplicated into the pool on create
const char *realm; // realm the protocol is published under, duplicated on create
ks_hash_t *providers; // keys: provider nodeid strings (case-insensitive); values: KS_TRUE sentinels
// @todo descriptors (schema, etc) for each method within a protocol
};
// Pool cleanup callback registered in blade_protocol_create(): releases owned
// members when the pool runs its teardown phase for this allocation.
static void blade_protocol_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_protocol_t *bp = (blade_protocol_t *)ptr;
ks_assert(bp);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
// NULL guards allow a partially constructed object to be cleaned up safely;
// providers hash owns its duplicated keys (KS_HASH_FLAG_FREE_KEY)
if (bp->name) ks_pool_free(bp->pool, &bp->name);
if (bp->realm) ks_pool_free(bp->pool, &bp->realm);
if (bp->providers) ks_hash_destroy(&bp->providers);
break;
case KS_MPCL_DESTROY:
break;
}
}
// Allocates a blade_protocol_t from the given pool, duplicating name and realm
// and creating the case-insensitive providers hash. A pool cleanup callback is
// registered so owned members are released on pool teardown.
// @param bpP   out: receives the new protocol object
// @param pool  pool that owns the object and all duplicated members
// @param name  protocol name to duplicate
// @param realm realm to duplicate
// @return KS_STATUS_SUCCESS (allocation failures trip asserts, matching the
//         file's convention)
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm)
{
blade_protocol_t *bp = NULL;
ks_assert(bpP);
ks_assert(pool);
ks_assert(name);
ks_assert(realm);
bp = ks_pool_alloc(pool, sizeof(blade_protocol_t));
ks_assert(bp); // fix: was dereferenced unchecked; assert like other allocations in this file
bp->pool = pool;
bp->name = ks_pstrdup(pool, name);
bp->realm = ks_pstrdup(pool, realm);
ks_hash_create(&bp->providers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
ks_assert(bp->providers);
ks_pool_set_cleanup(pool, bp, NULL, blade_protocol_cleanup);
*bpP = bp;
return KS_STATUS_SUCCESS;
}
// Destroys a protocol object by freeing it back to its pool; *bpP is NULLed
// by ks_pool_free.
// NOTE(review): member teardown (name/realm/providers) appears to rely on the
// cleanup callback registered in blade_protocol_create — confirm ks_pool_free
// triggers registered cleanups before depending on this.
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP)
{
blade_protocol_t *bp = NULL;
ks_assert(bpP);
ks_assert(*bpP);
bp = *bpP;
ks_pool_free(bp->pool, bpP);
return KS_STATUS_SUCCESS;
}
// Returns the providers hash (nodeid keys) owned by the protocol; caller must
// not destroy it. The hash is created with KS_HASH_FLAG_NOLOCK, so any needed
// locking is the caller's responsibility.
KS_DECLARE(ks_hash_t *) blade_protocol_providers_get(blade_protocol_t *bp)
{
ks_assert(bp);
return bp->providers;
}
// Registers a provider nodeid for this protocol. The key is duplicated into
// the protocol's pool and owned by the hash (KS_HASH_FLAG_FREE_KEY); the value
// is the KS_TRUE sentinel — only key membership matters.
KS_DECLARE(ks_status_t) blade_protocol_providers_add(blade_protocol_t *bp, const char *nodeid)
{
char *key = NULL;
ks_assert(bp);
ks_assert(nodeid);
key = ks_pstrdup(bp->pool, nodeid);
ks_hash_insert(bp->providers, (void *)key, (void *)KS_TRUE);
return KS_STATUS_SUCCESS;
}
// Removes a provider nodeid from this protocol, if present; the duplicated key
// is released by the hash (KS_HASH_FLAG_FREE_KEY). Succeeds even when the
// nodeid was not registered.
KS_DECLARE(ks_status_t) blade_protocol_providers_remove(blade_protocol_t *bp, const char *nodeid)
{
ks_assert(bp);
ks_assert(nodeid);
ks_hash_remove(bp->providers, (void *)nodeid);
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -50,7 +50,6 @@ struct blade_session_s {
ks_q_t *sending;
ks_q_t *receiving;
ks_hash_t *identities;
ks_hash_t *realms;
ks_hash_t *routes;
@ -132,9 +131,6 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
ks_q_create(&bs->receiving, pool, 0);
ks_assert(bs->receiving);
ks_hash_create(&bs->identities, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bs->pool);
ks_assert(bs->identities);
ks_hash_create(&bs->realms, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bs->pool);
ks_assert(bs->realms);
@ -246,35 +242,6 @@ KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs)
return bs->state;
}
KS_DECLARE(ks_status_t) blade_session_identity_add(blade_session_t *bs, const char *identity)
{
char *key = NULL;
ks_assert(bs);
ks_assert(identity);
key = ks_pstrdup(bs->pool, identity);
ks_hash_insert(bs->identities, (void *)key, (void *)KS_TRUE);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_session_identity_remove(blade_session_t *bs, const char *identity)
{
ks_assert(bs);
ks_assert(identity);
ks_hash_remove(bs->identities, (void *)identity);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_hash_t *) blade_session_identities_get(blade_session_t *bs)
{
ks_assert(bs);
return bs->identities;
}
KS_DECLARE(ks_status_t) blade_session_realm_add(blade_session_t *bs, const char *realm)
{
char *key = NULL;

View File

@ -42,27 +42,19 @@ struct blade_handle_s {
ks_pool_t *pool;
ks_thread_pool_t *tpool;
// These are for the master identity, since it has no upstream, but the realm list will also propagate through other router nodes in "blade.connect" calls
const char *master_user;
const char **master_realms;
int32_t master_realms_length;
// local nodeid, can also be used to get the upstream session, and is provided by upstream session "blade.connect" response
const char *local_nodeid;
ks_rwl_t *local_nodeid_rwl;
// local identities such as upstream-session-id@mydomain.com, messages with a destination matching a key in this hash will be received and processed locally
// @todo currently the value is unused, but may find use for it later (could store a blade_identity_t, but these are becoming less useful in the core)
ks_hash_t *identities;
// master router nodeid, provided by upstream session "blade.connect" response
const char *master_nodeid;
ks_rwl_t *master_nodeid_rwl;
// realms for new identities, identities get created in all of these realms, these originate from the master and may be reduced down to a single realm by
// the master router, or by each router as it sees fit
// realms for new nodes, these originate from the master, and are provided by upstream session "blade.connect" response
ks_hash_t *realms;
// The guts of routing messages, this maps a remote identity key to a local sessionid value, sessions must also track the identities coming through them to
// allow for removing downstream identities from these routes when they are no longer available upon session termination
// When any node registers an identity through this node, whether it is a locally connected session or downstream through another router node, the registered
// identity will be added to this hash, with the sessionid of the session it came through as the value
// Any future message received and destined for identities that are not our own (see identities hash above), will use this hash for downstream relays or will
// otherwise attempt to send upstream if it did not come from upstream
// Messages must never back-travel through a session they were received from, thus when received from a downstream session, that downstream session is excluded
// for further downstream routing scenarios to avoid any possible circular routing, message routing must be checked through downstreams before passing upstream
// routes to reach any downstream node, keyed by nodeid of the target node with a value of the nodeid for the locally connected session
// which is the next step from this node to the target node
ks_hash_t *routes;
ks_hash_t *transports; // registered blade_transport_t
@ -73,15 +65,29 @@ struct blade_handle_s {
ks_hash_t *connections; // active connections keyed by connection id
ks_hash_t *sessions; // active sessions keyed by session id
ks_mutex_t *upstream_mutex; // locked when messing with upstream_id
const char *upstream_id; // session id of the currently active upstream session
ks_hash_t *sessions; // active sessions keyed by session id (which comes from the nodeid of the downstream side of the session, thus an upstream session is keyed under the local_nodeid)
ks_hash_t *session_state_callbacks;
// @note everything below this point is exclusively for the master node
// @todo need to track the details from blade.publish, a protocol may be published under multiple realms, and each protocol published to a realm may have multiple target providers
// @todo how does "exclusive" play into the providers, does "exclusive" mean only one provider can exist for a given protocol and realm?
// for now, ignore exclusive and multiple providers, key by "protocol" in a hash, and use a blade_protocol_t to represent a protocol in the context of being published so it can be located by other nodes
// each blade_protocol_t will contain the "protocol", common method/namespace/schema data, and a hash keyed by the "realm", with a value of an object of type blade_protocol_realm_t
// each blade_protocol_realm_t will contain the "realm" and a list of publisher nodeid's, any of which can be chosen at random to use the protocol within the given realm (does "exclusive" only limit this to 1 provider per realm?)
// @todo protocols must be cleaned up when routes are removed due to session terminations, should incorporate a faster way to lookup which protocols are tied to a given nodeid for efficient removal
// create blade_protocol_method_t to represent a method that is executed with blade.execute, and is part of a protocol made available through blade.publish, registered locally by the protocol and method name (protocol.methodname?),
// with a callback handler which should also have the realm available when executed so a single provider can easily provide a protocol for multiple realms with the same method callbacks
ks_hash_t *protocols; // master only: protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
ks_hash_t *protocols_cleanup; // master only: keyed by the nodeid, each value should be a list_t* of which contains string values matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
};
ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data);
ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres);
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
struct blade_handle_session_state_callback_registration_s {
@ -193,8 +199,11 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP)
bh->pool = pool;
bh->tpool = tpool;
ks_hash_create(&bh->identities, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
ks_assert(bh->identities);
ks_rwl_create(&bh->local_nodeid_rwl, bh->pool);
ks_assert(bh->local_nodeid_rwl);
ks_rwl_create(&bh->master_nodeid_rwl, bh->pool);
ks_assert(bh->master_nodeid_rwl);
ks_hash_create(&bh->realms, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
ks_assert(bh->realms);
@ -218,12 +227,15 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP)
ks_hash_create(&bh->sessions, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool);
ks_assert(bh->sessions);
ks_mutex_create(&bh->upstream_mutex, KS_MUTEX_FLAG_DEFAULT, bh->pool);
ks_assert(bh->upstream_mutex);
ks_hash_create(&bh->session_state_callbacks, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_VALUE, bh->pool);
ks_assert(bh->session_state_callbacks);
ks_hash_create(&bh->protocols, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
ks_assert(bh->protocols);
ks_hash_create(&bh->protocols_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
ks_assert(bh->protocols_cleanup);
ks_pool_set_cleanup(pool, bh, NULL, blade_handle_cleanup);
*bhP = bh;
@ -259,10 +271,9 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
{
config_setting_t *master = NULL;
config_setting_t *master_user = NULL;
config_setting_t *master_nodeid = NULL;
config_setting_t *master_realms = NULL;
const char *user = NULL;
const char **realms = NULL;
const char *nodeid = NULL;
int32_t realms_length = 0;
ks_assert(bh);
@ -275,39 +286,34 @@ ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
master = config_setting_get_member(config, "master");
if (master) {
master_user = config_lookup_from(master, "user");
if (master_user) {
if (config_setting_type(master_user) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
user = config_setting_get_string(master_user);
master_nodeid = config_lookup_from(master, "nodeid");
if (master_nodeid) {
if (config_setting_type(master_nodeid) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
nodeid = config_setting_get_string(master_nodeid);
blade_handle_local_nodeid_set(bh, nodeid);
blade_handle_master_nodeid_set(bh, nodeid);
}
master_realms = config_lookup_from(master, "realms");
if (master_realms) {
if (config_setting_type(master_realms) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
realms_length = config_setting_length(master_realms);
if (realms_length > 0) {
realms = ks_pool_alloc(bh->pool, sizeof(const char *) * realms_length);
for (int32_t index = 0; index < realms_length; ++index) {
const char *realm = config_setting_get_string_elem(master_realms, index);
if (!realm) return KS_STATUS_FAIL;
realms[index] = ks_pstrdup(bh->pool, realm);
blade_handle_realm_register(bh, realm);
}
}
}
}
// @todo in spirit of simple config, keep the list of routers you can attempt as a client at a root level config setting "routers" using identities with transport parameters if required
if (user && realms_length > 0) {
bh->master_user = ks_pstrdup(bh->pool, user);
bh->master_realms = realms;
bh->master_realms_length = realms_length;
}
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config)
{
blade_jsonrpc_t *bjsonrpc = NULL;
blade_transport_t *bt = NULL;
ks_hash_iterator_t *it = NULL;
@ -319,22 +325,14 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
}
// register internals
blade_jsonrpc_create(&bjsonrpc, bh, "blade.publish", blade_protocol_publish_request_handler, NULL);
blade_handle_jsonrpc_register(bjsonrpc);
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
bh->default_transport = bt;
blade_handle_transport_register(bt);
for (int32_t index = 0; index < bh->master_realms_length; ++index) {
const char *realm = bh->master_realms[index];
//char *identity = ks_pstrcat(bh->pool, bh->master_user, "@", realm); // @todo this does not work... why?
char *identity = ks_psprintf(bh->pool, "%s@%s", bh->master_user, realm);
blade_handle_identity_register(bh, identity);
blade_handle_realm_register(bh, realm);
ks_pool_free(bh->pool, &identity);
}
for (it = ks_hash_first(bh->transports, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
blade_transport_t *value = NULL;
@ -412,44 +410,86 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh)
return bh->tpool;
}
KS_DECLARE(ks_status_t) blade_handle_identity_register(blade_handle_t *bh, const char *identity)
KS_DECLARE(ks_status_t) blade_handle_local_nodeid_set(blade_handle_t *bh, const char *nodeid)
{
char *key = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bh);
ks_assert(identity);
key = ks_pstrdup(bh->pool, identity);
ks_hash_insert(bh->identities, (void *)key, (void *)KS_TRUE);
ks_rwl_write_lock(bh->local_nodeid_rwl);
if (bh->local_nodeid && nodeid) {
ret = KS_STATUS_NOT_ALLOWED;
goto done;
}
if (!bh->local_nodeid && !nodeid) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
ks_log(KS_LOG_DEBUG, "Identity Registered: %s\n", key);
if (bh->master_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid);
if (nodeid) bh->local_nodeid = ks_pstrdup(bh->pool, nodeid);
return KS_STATUS_SUCCESS;
ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid);
done:
ks_rwl_write_unlock(bh->local_nodeid_rwl);
return ret;
}
KS_DECLARE(ks_status_t) blade_handle_identity_unregister(blade_handle_t *bh, const char *identity)
KS_DECLARE(ks_bool_t) blade_handle_local_nodeid_compare(blade_handle_t *bh, const char *nodeid)
{
ks_bool_t ret = KS_FALSE;
ks_assert(bh);
ks_assert(identity);
ks_assert(nodeid);
ks_log(KS_LOG_DEBUG, "Identity Unregistered: %s\n", identity);
ks_rwl_read_lock(bh->local_nodeid_rwl);
ret = ks_safe_strcasecmp(bh->local_nodeid, nodeid) == 0;
ks_rwl_read_unlock(bh->local_nodeid_rwl);
ks_hash_remove(bh->identities, (void *)identity);
return KS_STATUS_SUCCESS;
return ret;
}
KS_DECLARE(ks_bool_t) blade_handle_identity_local(blade_handle_t *bh, const char *identity)
KS_DECLARE(const char *) blade_handle_master_nodeid_copy(blade_handle_t *bh, ks_pool_t *pool)
{
void *exists = NULL;
const char *nodeid = NULL;
ks_assert(bh);
ks_assert(identity);
ks_assert(pool);
exists = ks_hash_search(bh->routes, (void *)identity, KS_READLOCKED);
ks_hash_read_unlock(bh->routes);
ks_rwl_write_lock(bh->master_nodeid_rwl);
if (bh->master_nodeid) nodeid = ks_pstrdup(pool, bh->master_nodeid);
ks_rwl_write_unlock(bh->master_nodeid_rwl);
return (ks_bool_t)(uintptr_t)exists == KS_TRUE;
return nodeid;
}
// Sets (or clears, when nodeid is NULL) the master router nodeid under the
// write lock. The stored value is duplicated into the handle's pool; any
// previous value is freed first.
// @param bh     the blade handle
// @param nodeid new master nodeid, or NULL to clear (used on upstream loss)
// @return KS_STATUS_SUCCESS
KS_DECLARE(ks_status_t) blade_handle_master_nodeid_set(blade_handle_t *bh, const char *nodeid)
{
ks_assert(bh);
ks_rwl_write_lock(bh->master_nodeid_rwl);
if (bh->master_nodeid) ks_pool_free(bh->pool, &bh->master_nodeid);
if (nodeid) bh->master_nodeid = ks_pstrdup(bh->pool, nodeid);
ks_rwl_write_unlock(bh->master_nodeid_rwl);
// fix: nodeid may legitimately be NULL here (clear path), and passing NULL
// for a %s conversion is undefined behavior in the printf family
ks_log(KS_LOG_DEBUG, "Master NodeID: %s\n", nodeid ? nodeid : "(cleared)");
return KS_STATUS_SUCCESS;
}
// Case-insensitively compares nodeid against the handle's master nodeid under
// the read lock.
// NOTE(review): assumes ks_safe_strcasecmp tolerates bh->master_nodeid being
// NULL (returning non-zero so this yields KS_FALSE) — confirm against libks.
KS_DECLARE(ks_bool_t) blade_handle_master_nodeid_compare(blade_handle_t *bh, const char *nodeid)
{
ks_bool_t ret = KS_FALSE;
ks_assert(bh);
ks_assert(nodeid);
ks_rwl_read_lock(bh->master_nodeid_rwl);
ret = ks_safe_strcasecmp(bh->master_nodeid, nodeid) == 0;
ks_rwl_read_unlock(bh->master_nodeid_rwl);
return ret;
}
KS_DECLARE(ks_status_t) blade_handle_realm_register(blade_handle_t *bh, const char *realm)
@ -485,19 +525,19 @@ KS_DECLARE(ks_hash_t *) blade_handle_realms_get(blade_handle_t *bh)
return bh->realms;
}
KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *identity, const char *id)
KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *nodeid, const char *sessionid)
{
char *key = NULL;
char *value = NULL;
ks_assert(bh);
ks_assert(identity);
ks_assert(id);
ks_assert(nodeid);
ks_assert(sessionid);
key = ks_pstrdup(bh->pool, identity);
value = ks_pstrdup(bh->pool, id);
key = ks_pstrdup(bh->pool, nodeid);
value = ks_pstrdup(bh->pool, sessionid);
ks_hash_insert(bh->identities, (void *)key, (void *)value);
ks_hash_insert(bh->routes, (void *)key, (void *)value);
ks_log(KS_LOG_DEBUG, "Route Added: %s through %s\n", key, value);
@ -506,33 +546,71 @@ KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *i
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char *identity)
KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char *nodeid)
{
ks_hash_t *protocols = NULL;
ks_assert(bh);
ks_assert(identity);
ks_assert(nodeid);
ks_log(KS_LOG_DEBUG, "Route Removed: %s\n", identity);
ks_log(KS_LOG_DEBUG, "Route Removed: %s\n", nodeid);
ks_hash_remove(bh->identities, (void *)identity);
ks_hash_remove(bh->routes, (void *)nodeid);
// @todo when a route is removed, upstream needs to be notified, for whatever reason the identity is no longer
// available through this node so the routes leading here need to be cleared, the disconnected node cannot be informed
// and does not need to change its routes because upstream is not included in routes (and thus should never call to remove
// a route if an upstream session is closed)
// @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through
// this node so the routes leading here need to be cleared by passing a "blade.route" upstream to remove the routes, this
// should actually happen only for local sessions, and blade.route should be always passed upstream AND processed locally, so
// we don't want to duplicate blade.route calls already being passed up if this route is not a local session
// @note everything below here is for master-only cleanup when a node is no longer routable
// @note protocols are cleaned up here because routes can be removed that are not locally connected with a session but still
// have protocols published to the master node from further downstream, in which case if a route is announced upstream to be
// removed, a master node is still able to catch that here even when there is no direct session, but is also hit when there
// is a direct session being terminated
ks_hash_write_lock(bh->protocols);
protocols = (ks_hash_t *)ks_hash_search(bh->protocols_cleanup, (void *)nodeid, KS_UNLOCKED);
if (protocols) {
for (ks_hash_iterator_t *it = ks_hash_first(protocols, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
blade_protocol_t *bp = NULL;
ks_hash_t *providers = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
bp = (blade_protocol_t *)ks_hash_search(bh->protocols, key, KS_UNLOCKED);
ks_assert(bp); // should not happen when a cleanup still has a provider tracked for a protocol
ks_log(KS_LOG_DEBUG, "Protocol (%s) provider (%s) removed\n", key, nodeid);
blade_protocol_providers_remove(bp, nodeid);
providers = blade_protocol_providers_get(bp);
if (ks_hash_count(providers) == 0) {
// @note this depends on locking something outside of the protocol that won't be destroyed, like the top level
// protocols hash, but assumes then that any reader keeps the top level hash read locked while using the protocol
// so it cannot be deleted
ks_log(KS_LOG_DEBUG, "Protocol (%s) removed\n", key);
ks_hash_remove(bh->protocols, key);
}
}
ks_hash_remove(bh->protocols_cleanup, (void *)nodeid);
}
ks_hash_write_unlock(bh->protocols);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_session_t *) blade_handle_route_lookup(blade_handle_t *bh, const char *identity)
KS_DECLARE(blade_session_t *) blade_handle_route_lookup(blade_handle_t *bh, const char *nodeid)
{
blade_session_t *bs = NULL;
const char *id = NULL;
const char *sessionid = NULL;
ks_assert(bh);
ks_assert(identity);
ks_assert(nodeid);
id = ks_hash_search(bh->routes, (void *)identity, KS_READLOCKED);
if (id) bs = blade_handle_sessions_lookup(bh, id);
sessionid = ks_hash_search(bh->routes, (void *)nodeid, KS_READLOCKED);
if (sessionid) bs = blade_handle_sessions_lookup(bh, sessionid);
ks_hash_read_unlock(bh->routes);
return bs;
@ -693,7 +771,8 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
ks_assert(bh);
ks_assert(target);
if (bh->upstream_id) return KS_STATUS_DUPLICATE_OPERATION;
// @todo better locking here
if (bh->local_nodeid) return KS_STATUS_DUPLICATE_OPERATION;
ks_hash_read_lock(bh->transports);
@ -790,6 +869,7 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
blade_handle_t *bh = NULL;
const char *id = NULL;
ks_hash_iterator_t *it = NULL;
ks_bool_t upstream = KS_FALSE;
ks_assert(bs);
@ -804,28 +884,20 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
ks_hash_write_lock(bh->sessions);
if (ks_hash_remove(bh->sessions, (void *)id) == NULL) ret = KS_STATUS_FAIL;
ks_mutex_lock(bh->upstream_mutex);
if (bh->upstream_id && !ks_safe_strcasecmp(bh->upstream_id, id)) {
// the session is the upstream being terminated, so clear out all of the local identities and realms from the handle,
// @todo this complicates any remaining connected downstream sessions, because they are based on realms that may not
// be available after a new upstream is registered, therefore all downstream sessions should be fully terminated when
// this happens, and ignore inbound downstream sessions until the upstream is available again, and require new
// downstream inbound sessions to be completely reestablished fresh
while ((it = ks_hash_first(bh->identities, KS_UNLOCKED))) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
ks_hash_remove(bh->identities, key);
}
ks_rwl_read_lock(bh->local_nodeid_rwl);
upstream = bh->local_nodeid && !ks_safe_strcasecmp(bh->local_nodeid, id);
ks_rwl_read_unlock(bh->local_nodeid_rwl);
if (upstream) {
blade_handle_local_nodeid_set(bh, NULL);
blade_handle_master_nodeid_set(bh, NULL);
while ((it = ks_hash_first(bh->realms, KS_UNLOCKED))) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
ks_hash_remove(bh->realms, key);
}
ks_pool_free(bh->pool, &bh->upstream_id);
}
ks_mutex_unlock(bh->upstream_mutex);
ks_hash_write_unlock(bh->sessions);
@ -858,29 +930,19 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_lookup(blade_handle_t *bh, c
return bs;
}
KS_DECLARE(ks_status_t) blade_handle_upstream_set(blade_handle_t *bh, const char *id)
KS_DECLARE(blade_session_t *) blade_handle_sessions_upstream(blade_handle_t *bh)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_assert(bh);
ks_mutex_lock(bh->upstream_mutex);
ks_rwl_read_lock(bh->local_nodeid_rwl);
bs = blade_handle_sessions_lookup(bh, bh->local_nodeid);
ks_rwl_read_unlock(bh->local_nodeid_rwl);
if (bh->upstream_id) {
ret = KS_STATUS_DUPLICATE_OPERATION;
goto done;
}
bh->upstream_id = ks_pstrdup(bh->pool, id);
done:
ks_mutex_unlock(bh->upstream_mutex);
return ret;
return bs;
}
KS_DECLARE(void) blade_handle_sessions_send(blade_handle_t *bh, ks_list_t *sessions, const char *exclude, cJSON *json)
{
blade_session_t *bs = NULL;
@ -957,6 +1019,296 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b
}
// BLADE PROTOCOL HANDLERS
// This is where the real work happens for the blade protocol, where routing is done based on the specific intent of the given message, these exist here to simplify
// access to the internals of the blade_handle_t where all the relevant data is stored
// Each jsonrpc method for the blade protocol will require 3 functions: a request generator, a request handler, and a response handler
// Responses can be generated internally and are not required for an isolated entry point, in the case of further external layers like blade.execute, they will be
// handled within the blade protocol handlers to dispatch further execution callbacks, however each jsonrpc exposed to support the blade protocols may deal with
// routing in their own ways as they have different requirements for different blade layer messages.
// blade.publish notes
// This jsonrpc is used to notify the master of a new protocol being made available, the purpose of which is to make such protocols able to be located by other nodes with
// only minimal information about the protocol, particularly its registered name, which is most often the main/only namespace for the protocol's methods, however it is
// possible that additional namespaces could be included in this publish as well if the namespaces are defined separately from the protocol name, and the protocol name could
// result in an implicitly created namespace in addition to any others provided.
// Routing Notes:
// When routing a publish request, it only needs to travel upstream to the master node for processing, however in order to receive a publish response the original request
// and response must carry a nodeid for the requesting node (requester-nodeid), technically the master does not need to be provided, but for posterity and consistency
// the master nodeid can be provided in whatever is used for the responder of a request (responder-nodeid).
// By using requester-nodeid and responder-nodeid, these do not need to be swapped in the response, they can simply be copied over, and the routing looks at the
// appropriate field depending on whether it is handling a request or a response to determine the appropriate downstream nodeid
// blade.publish request generator
// @todo add additional async callback to be called upon a publish response to inform caller of the result?
// Generates and sends a blade.publish request announcing the local node as a provider of the
// named protocol within the given realm. The request only ever travels upstream toward the
// master node, so it is sent on the upstream session with requester-nodeid set to the local
// node and responder-nodeid set to the master node.
// Returns KS_STATUS_DISCONNECTED when no upstream session is available, otherwise the status
// from sending the request.
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm)
{
	ks_status_t result = KS_STATUS_SUCCESS;
	cJSON *request = NULL;
	cJSON *params = NULL;
	blade_session_t *upstream = NULL;

	ks_assert(bh);
	ks_assert(name);
	ks_assert(realm);

	// publish can only go upstream, so without an upstream session there is nothing to do
	upstream = blade_handle_sessions_upstream(bh);
	if (!upstream) {
		result = KS_STATUS_DISCONNECTED;
		goto done;
	}

	blade_jsonrpc_request_raw_create(blade_handle_pool_get(bh), &request, &params, NULL, "blade.publish");

	// populate the request parameters
	cJSON_AddStringToObject(params, "protocol", name);
	cJSON_AddStringToObject(params, "realm", realm);

	// nodeids are shared handle state, snapshot each under its read lock
	ks_rwl_read_lock(bh->local_nodeid_rwl);
	cJSON_AddStringToObject(params, "requester-nodeid", bh->local_nodeid);
	ks_rwl_read_unlock(bh->local_nodeid_rwl);

	ks_rwl_read_lock(bh->master_nodeid_rwl);
	cJSON_AddStringToObject(params, "responder-nodeid", bh->master_nodeid);
	ks_rwl_read_unlock(bh->master_nodeid_rwl);

	ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(upstream));

	result = blade_session_send(upstream, request, blade_protocol_publish_response_handler);

done:
	if (request) cJSON_Delete(request);
	if (upstream) blade_session_read_unlock(upstream);
	return result;
}
// blade.publish request handler
// Handles an inbound blade.publish request.
// Validates the params, then either routes the request upstream (when this node is not the
// responder-nodeid) or, on the responder node, registers the protocol and its provider and
// sends back the response. Returns KS_FALSE so the jsonrpc layer does not retain the request.
ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data)
{
	blade_handle_t *bh = NULL;
	blade_session_t *bs = NULL;
	cJSON *req = NULL;
	cJSON *req_params = NULL;
	const char *req_params_protocol = NULL;
	const char *req_params_realm = NULL;
	const char *req_params_requester_nodeid = NULL;
	const char *req_params_responder_nodeid = NULL;
	cJSON *res = NULL;
	cJSON *res_result = NULL;
	blade_protocol_t *bp = NULL;
	const char *bp_key = NULL;
	ks_hash_t *bp_cleanup = NULL;

	ks_assert(breq);

	bh = blade_jsonrpc_request_handle_get(breq);
	ks_assert(bh);

	// lookup returns the session read locked when non-NULL, released in the done block below
	bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_request_sessionid_get(breq));
	ks_assert(bs);

	req = blade_jsonrpc_request_message_get(breq);
	ks_assert(req);

	// validate the required params, each missing field answers with jsonrpc -32602 (invalid params)
	req_params = cJSON_GetObjectItem(req, "params");
	if (!req_params) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'params' object\n", blade_session_id_get(bs));
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params object");
		blade_session_send(bs, res, NULL);
		goto done;
	}
	req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
	if (!req_params_protocol) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'protocol'\n", blade_session_id_get(bs));
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params protocol");
		blade_session_send(bs, res, NULL);
		goto done;
	}
	req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
	if (!req_params_realm) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'realm'\n", blade_session_id_get(bs));
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params realm");
		blade_session_send(bs, res, NULL);
		goto done;
	}

	// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now

	req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
	if (!req_params_requester_nodeid) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'requester-nodeid'\n", blade_session_id_get(bs));
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params requester-nodeid");
		blade_session_send(bs, res, NULL);
		goto done;
	}
	req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
	if (!req_params_responder_nodeid) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request missing 'responder-nodeid'\n", blade_session_id_get(bs));
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params responder-nodeid");
		blade_session_send(bs, res, NULL);
		goto done;
	}

	// a publish must always be addressed to the master node, anything else is malformed
	if (!blade_handle_master_nodeid_compare(bh, req_params_responder_nodeid)) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
		blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Invalid params responder-nodeid");
		blade_session_send(bs, res, NULL);
		goto done;
	}

	// errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates
	// errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing

	if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) {
		// not meant for local processing, continue with routing which on a publish request, it always goes upstream to the master node
		blade_session_t *bsu = blade_handle_sessions_upstream(bh);
		if (!bsu) {
			cJSON *res_error = NULL;

			ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
			blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable");

			// needed in case this error must propagate further than the session which sent it
			cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid);
			cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded

			blade_session_send(bs, res, NULL);
			goto done;
		}

		// @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified
		// by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this
		// and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being
		// routed would not enter into these handlers and would not leave a footprint passing through routers
		ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu));
		blade_session_send(bsu, req, blade_protocol_publish_response_handler);
		blade_session_read_unlock(bsu);
		goto done;
	}

	// this local node must be responder-nodeid for the request, so process the request
	ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);

	// NOTE(review): bp_key is allocated from bh->pool and never explicitly freed here (the key
	// inserted into the hash is a separate ks_pstrdup) — presumably reclaimed when the pool is
	// destroyed, confirm that is intentional
	bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm);

	// hold the protocols write lock across lookup, insert, and provider registration so the
	// protocol entry cannot change between the search and the mutation
	ks_hash_write_lock(bh->protocols);
	bp = (blade_protocol_t *)ks_hash_search(bh->protocols, bp_key, KS_UNLOCKED);
	if (bp) {
		// @todo deal with exclusive stuff when the protocol is already registered
	}
	if (!bp) {
		blade_protocol_create(&bp, bh->pool, req_params_protocol, req_params_realm);
		ks_assert(bp);

		ks_log(KS_LOG_DEBUG, "Protocol (%s) added\n", bp_key);
		ks_hash_insert(bh->protocols, (void *)ks_pstrdup(bh->pool, bp_key), bp);
	}

	// track, per requester nodeid, which protocol entries it provides so they can be cleaned up
	// when that node's session terminates
	bp_cleanup = (ks_hash_t *)ks_hash_search(bh->protocols_cleanup, req_params_requester_nodeid, KS_UNLOCKED);
	if (!bp_cleanup) {
		ks_hash_create(&bp_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
		ks_assert(bp_cleanup);

		ks_hash_insert(bh->protocols_cleanup, (void *)ks_pstrdup(bh->pool, req_params_requester_nodeid), bp_cleanup);
	}
	// the value is a presence marker only, KS_TRUE cast into the value slot
	ks_hash_insert(bp_cleanup, (void *)ks_pstrdup(bh->pool, bp_key), (void *)KS_TRUE);

	blade_protocol_providers_add(bp, req_params_requester_nodeid);
	ks_log(KS_LOG_DEBUG, "Protocol (%s) provider (%s) added\n", bp_key, req_params_requester_nodeid);
	ks_hash_write_unlock(bh->protocols);

	// build the actual response finally
	blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq));

	cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
	cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
	cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);

	// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
	blade_session_send(bs, res, NULL);

done:
	if (res) cJSON_Delete(res);
	if (bs) blade_session_read_unlock(bs);
	return KS_FALSE;
}
// blade.publish response handler
// Handles a blade.publish response.
// If the response carries routing nodeids and this node is not the requester, the response is
// relayed downstream toward the requester; otherwise it is processed locally. Returns KS_FALSE
// so the jsonrpc layer does not retain the response.
ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres)
{
	blade_handle_t *handle = NULL;
	blade_session_t *session = NULL;
	cJSON *message = NULL;
	cJSON *error_obj = NULL;
	cJSON *result_obj = NULL;
	cJSON *routing = NULL;
	const char *requester_nodeid = NULL;
	const char *responder_nodeid = NULL;

	ks_assert(bres);

	handle = blade_jsonrpc_response_handle_get(bres);
	ks_assert(handle);

	// lookup returns the session read locked when non-NULL, released in the done block below
	session = blade_handle_sessions_lookup(handle, blade_jsonrpc_response_sessionid_get(bres));
	ks_assert(session);

	message = blade_jsonrpc_response_message_get(bres);
	ks_assert(message);

	error_obj = cJSON_GetObjectItem(message, "error");
	result_obj = cJSON_GetObjectItem(message, "result");
	if (!error_obj && !result_obj) {
		ks_log(KS_LOG_DEBUG, "Session (%s) publish response missing 'error' or 'result' object\n", blade_session_id_get(session));
		goto done;
	}

	// the routing nodeids are carried inside whichever of error/result is present
	routing = error_obj ? error_obj : result_obj;
	requester_nodeid = cJSON_GetObjectCstr(routing, "requester-nodeid");
	responder_nodeid = cJSON_GetObjectCstr(routing, "responder-nodeid");

	if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(handle, requester_nodeid)) {
		// not addressed to this node, relay toward the requester through its downstream session
		blade_session_t *downstream = blade_handle_sessions_lookup(handle, requester_nodeid);
		if (!downstream) {
			ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) but downstream session unavailable\n", blade_session_id_get(session), requester_nodeid, responder_nodeid);
			goto done;
		}

		ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) routing downstream (%s)\n", blade_session_id_get(session), requester_nodeid, responder_nodeid, blade_session_id_get(downstream));
		blade_session_send(downstream, message, NULL);
		blade_session_read_unlock(downstream);
		goto done;
	}

	// this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response
	ks_log(KS_LOG_DEBUG, "Session (%s) publish response processing\n", blade_session_id_get(session));

	if (error_obj) {
		// @todo process error response
		ks_log(KS_LOG_DEBUG, "Session (%s) publish response error... add details\n", blade_session_id_get(session));
		goto done;
	}

	// @todo process result response

done:
	if (session) blade_session_read_unlock(session);
	return KS_FALSE;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -719,18 +719,18 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
cJSON *json_res = NULL;
cJSON *json_params = NULL;
cJSON *json_result = NULL;
cJSON *json_result_identities = NULL;
cJSON *json_result_realms = NULL;
//cJSON *error = NULL;
blade_session_t *bs = NULL;
blade_handle_t *bh = NULL;
ks_pool_t *pool = NULL;
const char *jsonrpc = NULL;
const char *id = NULL;
const char *method = NULL;
const char *sid = NULL;
const char *nodeid = NULL;
const char *master_nodeid = NULL;
ks_time_t timeout;
ks_hash_iterator_t *it = NULL;
ks_hash_t *identities = NULL;
ks_hash_t *realms = NULL;
ks_assert(bc);
@ -795,15 +795,15 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
json_params = cJSON_GetObjectItem(json_req, "params");
if (json_params) {
sid = cJSON_GetObjectCstr(json_params, "session-id");
if (sid) {
nodeid = cJSON_GetObjectCstr(json_params, "session-id");
if (nodeid) {
// @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid
ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sid);
ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", nodeid);
}
}
if (sid) {
bs = blade_handle_sessions_lookup(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (nodeid) {
bs = blade_handle_sessions_lookup(bh, nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (bs) {
if (blade_session_terminating(bs)) {
blade_session_read_unlock(bs);
@ -816,18 +816,16 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
}
if (!bs) {
ks_pool_t *pool = NULL;
blade_session_create(&bs, bh, NULL);
ks_assert(bs);
sid = blade_session_id_get(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", sid);
nodeid = blade_session_id_get(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", nodeid);
blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", sid);
ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", nodeid);
blade_transport_wss_jsonrpc_error_send(bc, id, -32603, "Internal error, session could not be started");
blade_session_read_unlock(bs);
blade_session_destroy(&bs);
@ -837,10 +835,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
// This is an inbound connection, thus it is always creating a downstream session
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", sid);
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", nodeid);
blade_handle_sessions_add(bs);
pool = blade_connection_pool_get(bc);
// This is primarily to cleanup the routes added to the blade_handle for main routing when a session terminates, these don't have a lot of use otherwise but it will keep the main route table
// from having long running write locks when a session cleans up
blade_session_route_add(bs, nodeid);
// This is the main routing entry to make an identity routable through a session when a message is received for a given identity in this table, these allow efficiently determine which session
// a message should pass through when it does not match the local node identities from blade_handle_identity_register(), and must be matched with a call to blade_session_route_add() for cleanup,
// additionally when a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also
// result in the new identities being added as routes however new entire wildcard subrealm registration would require a special process for matching any identities from those subrealms
blade_handle_route_add(bh, nodeid, nodeid);
// iterate the realms from the handle ultimately provided by the master router node, and obtained when establishing upstream sessions (see outbound handler), for each of
// these realms an identity based on the sessionid will be created, in the future this process can be adjusted based on authentication which is currently skipped
@ -850,34 +855,10 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
for (it = ks_hash_first(realms, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
char *identity = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
identity = ks_psprintf(pool, "%s@%s", sid, (const char *)key);
// @note This is where routing gets complicated, lots of stuff has to be tracked for different reasons and different perspectives, and a lot of it is determined and passed during this
// initial "blade.connect" message handler. As this is a downstream session connecting inbound, this node is responsible for giving the new node identities to be known by, the realms
// which the new node is permitted to register or route additional new identities under, and making sure messages for these new identities can be routed to the correct downstream session
// when they arrive on this node, this includes making sure upstream nodes are notified of routing changes.
// This tracks the identities that are specifically for the remote node of this session NOT including further downstream sessions, the remote node calls blade_handle_identity_register()
// when these are given to it, these would also include additional explicitly registered identities via "blade.register", but not those received via "blade.route" for new identities
blade_session_identity_add(bs, identity);
// This tracks the realms that are permitted for the remote node of this session to register or route new identities, these include realms provided from upstream as implicit initial realms
// that this downstream session may register or route additional identities under, the remote node calls blade_handle_realm_register() for these, additional explicit subrealm registrations
// from "blade.register" would appear in here as well, and "blade.route" has no impact on these
blade_session_realm_add(bs, (const char *)key);
// This is primarily to cleanup the routes added to the blade_handle for main routing when a session terminates, these don't have a lot of use otherwise but it will keep the main route table
// from having long running write locks when a session cleans up
blade_session_route_add(bs, identity);
// This is the main routing entry to make an identity routable through a session when a message is received for a given identity in this table, these allow efficiently determine which session
// a message should pass through when it does not match the local node identities from blade_handle_identity_register(), and must be matched with a call to blade_session_route_add() for cleanup,
// additionally when a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also
// result in the new identities being added as routes however new entire wildcard subrealm registration would require a special process for matching any identities from those subrealms
blade_handle_route_add(bh, identity, sid);
ks_pool_free(pool, &identity);
}
ks_hash_read_unlock(realms);
}
@ -885,34 +866,27 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
blade_jsonrpc_response_raw_create(&json_res, &json_result, id);
ks_assert(json_res);
cJSON_AddStringToObject(json_result, "session-id", sid);
cJSON_AddStringToObject(json_result, "nodeid", nodeid);
// add the list of actual identities the local node will recognize the remote node, this is the same list that the remote side would be adding to the handle with blade_handle_identity_add()
// and may contain additional identities that are explicitly registered by the remote node, this ensures upon reconnect that the same list of identities gets provided to the remote node to refresh
// the remote nodes local identities on the edge case that the session times out on the remote end while reconnecting
json_result_identities = cJSON_CreateArray();
cJSON_AddItemToObject(json_result, "identities", json_result_identities);
identities = blade_session_identities_get(bs);
ks_hash_read_lock(identities);
for (it = ks_hash_first(identities, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
cJSON_AddItemToArray(json_result_identities, cJSON_CreateString((const char *)key));
pool = blade_handle_pool_get(bh);
master_nodeid = blade_handle_master_nodeid_copy(bh, pool);
if (!master_nodeid) {
ks_log(KS_LOG_DEBUG, "Master nodeid unavailable\n");
blade_transport_wss_jsonrpc_error_send(bc, id, -32602, "Master nodeid unavailable");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
ks_hash_read_unlock(identities);
cJSON_AddStringToObject(json_result, "master-nodeid", master_nodeid);
ks_pool_free(pool, &master_nodeid);
json_result_realms = cJSON_CreateArray();
cJSON_AddItemToObject(json_result, "realms", json_result_realms);
// add the list of actual realms the local node will permit the remote node to register or route, this is the same list that the remote side would be adding to the handle with blade_handle_realm_add()
// and may contain additional subrealms that are explicitly registered by the remote node, this ensures upon reconnect that the same list of realms gets provided to the remote node to refresh
// the remote nodes local realms on the edge case that the session times out on the remote end while reconnecting
json_result_realms = cJSON_CreateArray();
cJSON_AddItemToObject(json_result, "realms", json_result_realms);
realms = blade_session_realms_get(bs);
ks_hash_read_lock(realms);
for (it = ks_hash_first(realms, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
@ -928,7 +902,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
// This starts the final process for associating the connection to the session, including for reconnecting to an existing session, this simply
// associates the session to this connection, upon return the remainder of the association for the session to the connection is handled along
// with making sure both this connection and the session state machines are in running states
blade_connection_session_set(bc, sid);
blade_connection_session_set(bc, nodeid);
// @todo end of reusable handler for "blade.connect" request
@ -964,11 +938,10 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
const char *id = NULL;
cJSON *json_error = NULL;
cJSON *json_result = NULL;
cJSON *json_result_identities = NULL;
int json_result_identities_size = 0;
cJSON *json_result_realms = NULL;
int json_result_realms_size = 0;
const char *sid = NULL;
const char *nodeid = NULL;
const char *master_nodeid = NULL;
blade_session_t *bs = NULL;
ks_assert(bc);
@ -1045,16 +1018,16 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
sid = cJSON_GetObjectCstr(json_result, "session-id");
if (!sid) {
ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'session-id'\n");
nodeid = cJSON_GetObjectCstr(json_result, "nodeid");
if (!nodeid) {
ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'nodeid'\n");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
json_result_identities = cJSON_GetObjectItem(json_result, "identities");
if (!json_result_identities || json_result_identities->type != cJSON_Array || (json_result_identities_size = cJSON_GetArraySize(json_result_identities)) <= 0) {
ks_log(KS_LOG_DEBUG, "Received message is missing 'identities'\n");
master_nodeid = cJSON_GetObjectCstr(json_result, "master-nodeid");
if (!master_nodeid) {
ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'master-nodeid'\n");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
@ -1066,16 +1039,15 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
if (sid) {
// @todo validate uuid format by parsing, not currently available in uuid functions
bs = blade_handle_sessions_lookup(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (bs) {
ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
}
// @todo validate uuid format by parsing, not currently available in uuid functions
bs = blade_handle_sessions_lookup(bh, nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (bs) {
ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
}
if (!bs) {
blade_session_create(&bs, bh, sid);
blade_session_create(&bs, bh, nodeid);
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
@ -1090,9 +1062,9 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
// This is an outbound connection, thus it is always creating an upstream session
// This is an outbound connection, thus it is always creating an upstream session, defined by the sessionid matching the local_nodeid in the handle
if (blade_handle_upstream_set(bh, sid) != KS_STATUS_SUCCESS) {
if (blade_handle_local_nodeid_set(bh, nodeid) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Session (%s) abandoned, upstream already available\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
blade_session_hangup(bs);
@ -1104,14 +1076,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
blade_handle_sessions_add(bs);
// new upstream sessions register the realms and local identities based on the realms and identities of the response, however
// upstream sessions do not register any routes and thus do not track any routes on the session
// iterate identities and register to handle as local node identities
for (int index = 0; index < json_result_identities_size; ++index) {
cJSON *elem = cJSON_GetArrayItem(json_result_identities, index);
blade_handle_identity_register(bh, elem->valuestring);
}
blade_handle_master_nodeid_set(bh, master_nodeid);
// iterate realms and register to handle as permitted realms for future registrations
for (int index = 0; index < json_result_realms_size; ++index) {

View File

@ -44,8 +44,7 @@
#include "blade_jsonrpc.h"
#include "blade_connection.h"
#include "blade_session.h"
#include "ks_dht.h"
#include "ks_bencode.h"
#include "blade_protocol.h"
#include "blade_transport_wss.h"

View File

@ -51,6 +51,8 @@ KS_DECLARE(ks_status_t) blade_jsonrpc_request_create(blade_jsonrpc_request_t **b
blade_jsonrpc_response_callback_t callback);
KS_DECLARE(ks_status_t) blade_jsonrpc_request_destroy(blade_jsonrpc_request_t **bjsonrpcreqP);
KS_DECLARE(blade_handle_t *) blade_jsonrpc_request_handle_get(blade_jsonrpc_request_t *bjsonrpcreq);
KS_DECLARE(const char *) blade_jsonrpc_request_sessionid_get(blade_jsonrpc_request_t *bjsonrpcreq);
KS_DECLARE(cJSON *) blade_jsonrpc_request_message_get(blade_jsonrpc_request_t *bjsonrpcreq);
KS_DECLARE(const char *) blade_jsonrpc_request_messageid_get(blade_jsonrpc_request_t *bjsonrpcreq);
KS_DECLARE(blade_jsonrpc_response_callback_t) blade_jsonrpc_request_callback_get(blade_jsonrpc_request_t *bjsonrpcreq);
KS_DECLARE(ks_status_t) blade_jsonrpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);
@ -63,6 +65,11 @@ KS_DECLARE(ks_status_t) blade_jsonrpc_response_create(blade_jsonrpc_response_t *
cJSON *json);
KS_DECLARE(ks_status_t) blade_jsonrpc_response_destroy(blade_jsonrpc_response_t **bjsonrpcresP);
KS_DECLARE(ks_status_t) blade_jsonrpc_response_raw_create(cJSON **json, cJSON **result, const char *id);
KS_DECLARE(blade_handle_t *) blade_jsonrpc_response_handle_get(blade_jsonrpc_response_t *bjsonrpcres);
KS_DECLARE(const char *) blade_jsonrpc_response_sessionid_get(blade_jsonrpc_response_t *bjsonrpcres);
KS_DECLARE(blade_jsonrpc_request_t *) blade_jsonrpc_response_request_get(blade_jsonrpc_response_t *bjsonrpcres);
KS_DECLARE(cJSON *) blade_jsonrpc_response_message_get(blade_jsonrpc_response_t *bjsonrpcres);
KS_DECLARE(ks_status_t) blade_jsonrpc_error_raw_create(cJSON **json, cJSON **error, const char *id, int32_t code, const char *message);
KS_END_EXTERN_C

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2017, Shane Bryldt
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_PROTOCOL_H_
#define _BLADE_PROTOCOL_H_
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
KS_DECLARE(ks_hash_t *) blade_protocol_providers_get(blade_protocol_t *bp);
KS_DECLARE(ks_status_t) blade_protocol_providers_add(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_status_t) blade_protocol_providers_remove(blade_protocol_t *bp, const char *nodeid);
KS_END_EXTERN_C
#endif
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -43,9 +43,6 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_identity_add(blade_session_t *bs, const char *identity);
KS_DECLARE(ks_status_t) blade_session_identity_remove(blade_session_t *bs, const char *identity);
KS_DECLARE(ks_hash_t *) blade_session_identities_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_realm_add(blade_session_t *bs, const char *realm);
KS_DECLARE(ks_status_t) blade_session_realm_remove(blade_session_t *bs, const char *realm);
KS_DECLARE(ks_hash_t *) blade_session_realms_get(blade_session_t *bs);

View File

@ -48,16 +48,20 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
KS_DECLARE(ks_pool_t *) blade_handle_pool_get(blade_handle_t *bh);
KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_identity_register(blade_handle_t *bh, const char *identity);
KS_DECLARE(ks_status_t) blade_handle_identity_unregister(blade_handle_t *bh, const char *identity);
KS_DECLARE(ks_status_t) blade_handle_local_nodeid_set(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_handle_local_nodeid_compare(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(const char *) blade_handle_master_nodeid_copy(blade_handle_t *bh, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_handle_master_nodeid_set(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_handle_master_nodeid_compare(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_realm_register(blade_handle_t *bh, const char *realm);
KS_DECLARE(ks_status_t) blade_handle_realm_unregister(blade_handle_t *bh, const char *realm);
KS_DECLARE(ks_hash_t *) blade_handle_realms_get(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *identity, const char *id);
KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char *identity);
KS_DECLARE(blade_session_t *) blade_handle_route_lookup(blade_handle_t *bh, const char *identity);
KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *nodeid, const char *sessionid);
KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(blade_session_t *) blade_handle_route_lookup(blade_handle_t *bh, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_transport_t *bt);
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_transport_t *bt);
@ -80,14 +84,14 @@ KS_DECLARE(blade_connection_t *) blade_handle_connections_lookup(blade_handle_t
KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs);
KS_DECLARE(blade_session_t *) blade_handle_sessions_lookup(blade_handle_t *bh, const char *id);
KS_DECLARE(ks_status_t) blade_handle_upstream_set(blade_handle_t *bh, const char *id);
KS_DECLARE(blade_session_t *) blade_handle_sessions_upstream(blade_handle_t *bh);
KS_DECLARE(void) blade_handle_sessions_send(blade_handle_t *bh, ks_list_t *sessions, const char *exclude, cJSON *json);
KS_DECLARE(ks_status_t) blade_handle_session_state_callback_register(blade_handle_t *bh, void *data, blade_session_state_callback_t callback, const char **id);
KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_handle_t *bh, const char *id);
KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition);
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm);
KS_END_EXTERN_C
#endif

View File

@ -45,21 +45,18 @@ typedef struct blade_transport_callbacks_s blade_transport_callbacks_t;
typedef struct blade_jsonrpc_s blade_jsonrpc_t;
typedef struct blade_jsonrpc_request_s blade_jsonrpc_request_t;
typedef struct blade_jsonrpc_response_s blade_jsonrpc_response_t;
typedef struct blade_session_callbacks_s blade_session_callbacks_t;
typedef struct blade_connection_s blade_connection_t;
typedef struct blade_session_s blade_session_t;
typedef struct blade_application_s blade_application_t;
typedef struct blade_session_callbacks_s blade_session_callbacks_t;
typedef struct blade_protocol_s blade_protocol_t;
typedef struct blade_protocol_realm_s blade_protocol_realm_t;
typedef struct blade_protocol_method_s blade_protocol_method_t;
typedef struct blade_datastore_s blade_datastore_t;
typedef ks_bool_t (*blade_jsonrpc_request_callback_t)(blade_jsonrpc_request_t *breq, void *data);
typedef ks_bool_t (*blade_jsonrpc_response_callback_t)(blade_jsonrpc_response_t *bres);
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
typedef enum {
BLADE_CONNECTION_STATE_NONE,
BLADE_CONNECTION_STATE_CLEANUP,

View File

@ -1,6 +1,11 @@
blade:
{
identity = "master1@yourdomain.com";
master:
{
enabled = true;
nodeid = "00000000-0000-0000-0000-000000000000";
realms = ( "mydomain.com" );
};
wss:
{
endpoints:

View File

@ -85,6 +85,10 @@ int main(int argc, char **argv)
blade_identity_destroy(&target);
ks_sleep_ms(5000);
blade_protocol_publish(bh, "test", "mydomain.com");
ks_sleep_ms(5000);
} else loop(bh);
//blade_handle_session_state_callback_unregister(bh, session_state_callback_id);

View File

@ -63,6 +63,7 @@ int main(int argc, char **argv)
//blade_module_t *mod_chat = NULL;
//blade_identity_t *id = NULL;
const char *cfgpath = "blades.cfg";
const char *autoconnect = NULL;
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
@ -71,6 +72,7 @@ int main(int argc, char **argv)
blade_handle_create(&bh);
//if (argc > 1) cfgpath = argv[1];
if (argc > 1) autoconnect = argv[1];
config_init(&config);
if (!config_read_file(&config, cfgpath)) {
@ -98,6 +100,21 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
if (autoconnect) {
blade_connection_t *bc = NULL;
blade_identity_t *target = NULL;
blade_identity_create(&target, blade_handle_pool_get(bh));
if (blade_identity_parse(target, autoconnect) == KS_STATUS_SUCCESS) blade_handle_connect(bh, &bc, target, NULL);
blade_identity_destroy(&target);
ks_sleep_ms(5000);
blade_protocol_publish(bh, "test", "mydomain.com");
}
loop(bh);
blade_handle_destroy(&bh);

View File

@ -1,11 +1,5 @@
blade:
{
master:
{
enabled = true;
user = "00000000-0000-0000-0000-000000000000";
realms = ( "mydomain.com" );
};
transport:
{
wss: