FS-10167: Preliminary version of blade.subscribe is implemented

This commit is contained in:
Shane Bryldt 2017-06-09 21:34:02 -06:00
parent 56d66e9f7a
commit 61f8380b70
10 changed files with 596 additions and 11 deletions

View File

@ -192,6 +192,7 @@
<ClCompile Include="src\blade_identity.c" />
<ClCompile Include="src\blade_rpc.c" />
<ClCompile Include="src\blade_protocol.c" />
<ClCompile Include="src\blade_subscription.c" />
<ClCompile Include="src\blade_transport_wss.c" />
<ClCompile Include="src\blade_session.c" />
<ClCompile Include="src\blade_stack.c" />
@ -204,6 +205,7 @@
<ClInclude Include="src\include\blade_identity.h" />
<ClInclude Include="src\include\blade_rpc.h" />
<ClInclude Include="src\include\blade_protocol.h" />
<ClInclude Include="src\include\blade_subscription.h" />
<ClInclude Include="src\include\blade_transport_wss.h" />
<ClInclude Include="src\include\blade_session.h" />
<ClInclude Include="src\include\blade_stack.h" />

View File

@ -45,6 +45,9 @@
<ClCompile Include="src\blade_rpc.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="src\blade_subscription.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="src\include\unqlite.h">
@ -80,5 +83,8 @@
<ClInclude Include="src\include\blade_rpc.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="src\include\blade_subscription.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
</Project>

View File

@ -280,6 +280,7 @@ KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const ch
return KS_STATUS_SUCCESS;
}
KS_DECLARE(cJSON *) blade_session_properties_get(blade_session_t *bs)
{
ks_assert(bs);

View File

@ -65,6 +65,9 @@ struct blade_handle_s {
ks_hash_t *protocolrpcs; // registered blade_rpc_t, for locally processing protocol messages, keyed by the rpc method
ks_hash_t *subscriptions; // registered blade_subscription_t, subscribers may include the local node
ks_hash_t *subscriptions_cleanup; // cleanup for subscriptions, keyed by the downstream subscriber nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm/event" keys to remove each nodeid from as a subscriber during cleanup
ks_hash_t *connections; // active connections keyed by connection id
ks_hash_t *sessions; // active sessions keyed by session id (which comes from the nodeid of the downstream side of the session, thus an upstream session is keyed under the local_nodeid)
@ -73,14 +76,7 @@ struct blade_handle_s {
// @note everything below this point is exclusively for the master node
// @todo need to track the details from blade.publish, a protocol may be published under multiple realms, and each protocol published to a realm may have multiple target providers
// @todo how does "exclusive" play into the providers, does "exclusive" mean only one provider can exist for a given protocol and realm?
// for now, ignore exclusive and multiple providers, key by "protocol" in a hash, and use a blade_protocol_t to represent a protocol in the context of being published so it can be located by other nodes
// each blade_protocol_t will contain the "protocol", common method/namespace/schema data, and a hash keyed by the "realm", with a value of an object of type blade_protocol_realm_t
// each blade_protocol_realm_t will contain the "realm" and a list of publisher nodeid's, any of which can be chosen at random to use the protocol within the given realm (does "exclusive" only limit this to 1 provider per realm?)
// @todo protocols must be cleaned up when routes are removed due to session terminations, should incorporate a faster way to lookup which protocols are tied to a given nodeid for efficient removal
// create blade_protocol_method_t to represent a method that is executed with blade.execute, and is part of a protocol made available through blade.publish, registered locally by the protocol and method name (protocol.methodname?),
// with a callback handler which should also have the realm available when executed so a single provider can easily provide a protocol for multiple realms with the same method callbacks
ks_hash_t *protocols; // master only: protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
ks_hash_t *protocols_cleanup; // master only: keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
@ -91,6 +87,7 @@ ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq,
ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
@ -237,6 +234,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP)
ks_hash_create(&bh->protocolrpcs, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
ks_assert(bh->protocolrpcs);
ks_hash_create(&bh->subscriptions, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
ks_assert(bh->subscriptions);
ks_hash_create(&bh->subscriptions_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
ks_assert(bh->subscriptions_cleanup);
ks_hash_create(&bh->connections, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool);
ks_assert(bh->connections);
@ -353,6 +356,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
blade_rpc_create(&brpc, bh, "blade.execute", NULL, NULL, blade_protocol_execute_request_handler, NULL);
blade_handle_corerpc_register(brpc);
blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL);
blade_handle_corerpc_register(brpc);
// register internal transport for secure websockets
blade_transport_wss_create(&bt, bh);
ks_assert(bt);
@ -868,6 +874,104 @@ KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, co
return brpc;
}
KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_hash_t *bsub_cleanup = NULL;
ks_bool_t propagate = KS_FALSE;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(nodeid);
key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
ks_hash_write_lock(bh->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED);
if (!bsub) {
blade_subscription_create(&bsub, bh->pool, event, protocol, realm);
ks_assert(bsub);
ks_hash_insert(bh->subscriptions, (void *)ks_pstrdup(bh->pool, key), bsub);
propagate = KS_TRUE;
}
bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED);
if (!bsub_cleanup) {
ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
ks_assert(bsub_cleanup);
ks_log(KS_LOG_DEBUG, "Subscription (%s) added\n", key);
ks_hash_insert(bh->subscriptions_cleanup, (void *)ks_pstrdup(bh->pool, nodeid), bsub_cleanup);
}
ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bh->pool, key), (void *)KS_TRUE);
blade_subscription_subscribers_add(bsub, nodeid);
ks_hash_write_unlock(bh->subscriptions);
ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) added\n", key, nodeid);
ks_pool_free(bh->pool, &key);
if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_hash_t *bsub_cleanup = NULL;
ks_bool_t propagate = KS_FALSE;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(nodeid);
key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
ks_hash_write_lock(bh->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED);
if (bsub) {
bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED);
ks_assert(bsub_cleanup);
ks_hash_remove(bsub_cleanup, key);
if (ks_hash_count(bsub_cleanup) == 0) {
ks_hash_remove(bh->subscriptions_cleanup, (void *)nodeid);
}
ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) removed\n", key, nodeid);
blade_subscription_subscribers_remove(bsub, nodeid);
if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) {
ks_log(KS_LOG_DEBUG, "Subscription (%s) removed\n", key);
ks_hash_remove(bh->subscriptions, (void *)key);
propagate = KS_TRUE;
}
}
ks_hash_write_unlock(bh->subscriptions);
ks_pool_free(bh->pool, &key);
if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id)
{
@ -975,20 +1079,63 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_handle_t *bh = NULL;
ks_pool_t *pool = NULL;
const char *id = NULL;
ks_hash_iterator_t *it = NULL;
ks_bool_t upstream = KS_FALSE;
ks_bool_t unsubbed = KS_FALSE;
ks_assert(bs);
bh = blade_session_handle_get(bs);
ks_assert(bh);
pool = blade_handle_pool_get(bh);
ks_assert(pool);
blade_session_write_lock(bs, KS_TRUE);
id = blade_session_id_get(bs);
ks_assert(id);
// @todo this cleanup is a bit messy, move to using the combined key rather than passing around all 3 parts would make this cleaner
while (!unsubbed) {
ks_hash_t *subscriptions = NULL;
const char *event = NULL;
const char *protocol = NULL;
const char *realm = NULL;
ks_hash_read_lock(bh->subscriptions);
subscriptions = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)id, KS_UNLOCKED);
if (!subscriptions) unsubbed = KS_TRUE;
else {
void *key = NULL;
void *value = NULL;
blade_subscription_t *bsub = NULL;
it = ks_hash_first(subscriptions, KS_UNLOCKED);
ks_assert(it);
ks_hash_this(it, (const void **)&key, NULL, &value);
bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, key, KS_UNLOCKED);
ks_assert(bsub);
// @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed
event = ks_pstrdup(bh->pool, blade_subscription_event_get(bsub));
protocol = ks_pstrdup(bh->pool, blade_subscription_protocol_get(bsub));
realm = ks_pstrdup(bh->pool, blade_subscription_realm_get(bsub));
}
ks_hash_read_unlock(bh->subscriptions);
if (!unsubbed) {
blade_handle_subscriber_remove(bh, event, protocol, realm, id);
ks_pool_free(bh->pool, &event);
ks_pool_free(bh->pool, &protocol);
ks_pool_free(bh->pool, &realm);
}
}
ks_hash_write_lock(bh->sessions);
if (ks_hash_remove(bh->sessions, (void *)id) == NULL) ret = KS_STATUS_FAIL;
@ -1011,8 +1158,6 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
blade_session_write_unlock(bs);
return ret;
}
@ -1821,6 +1966,167 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
blade_session_read_unlock(bs);
}
// blade.subscribe request generator
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
if (!(bs = blade_handle_sessions_upstream(bh))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
if (remove) {
blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid);
} else {
blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid);
}
done:
if (bs) blade_session_read_unlock(bs);
return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
if (!(bs = blade_handle_sessions_upstream(bh))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
pool = blade_handle_pool_get(bh);
ks_assert(pool);
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (remove) cJSON_AddTrueToObject(req_params, "remove");
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
ret = blade_session_send(bs, req, callback, data);
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
return ret;
}
// blade.subscribe request handler
ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
cJSON *req_params_remove = NULL;
ks_bool_t remove = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
pool = blade_handle_pool_get(bh);
ks_assert(pool);
bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (!req_params) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'params' object\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
if (!req_params_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'realm'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_remove = cJSON_GetObjectItem(req_params, "remove");
remove = req_params_remove && req_params_remove->type == cJSON_True;
// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
if (remove) {
blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
} else {
blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
}
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "event", req_params_event);
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
done:
if (res) cJSON_Delete(res);
if (bs) blade_session_read_unlock(bs);
return KS_FALSE;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -0,0 +1,172 @@
/*
* Copyright (c) 2017, Shane Bryldt
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
struct blade_subscription_s {
ks_pool_t *pool;
const char *event;
const char *protocol;
const char *realm;
ks_hash_t *subscribers;
};
static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_subscription_t *bsub = (blade_subscription_t *)ptr;
ks_assert(bsub);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
if (bsub->event) ks_pool_free(bsub->pool, &bsub->event);
if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol);
if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers);
if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers);
break;
case KS_MPCL_DESTROY:
break;
}
}
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm)
{
blade_subscription_t *bsub = NULL;
ks_assert(bsubP);
ks_assert(pool);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t));
bsub->pool = pool;
bsub->event = ks_pstrdup(pool, event);
bsub->protocol = ks_pstrdup(pool, protocol);
bsub->realm = ks_pstrdup(pool, realm);
ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool);
ks_assert(bsub->subscribers);
ks_pool_set_cleanup(pool, bsub, NULL, blade_subscription_cleanup);
*bsubP = bsub;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP)
{
blade_subscription_t *bsub = NULL;
ks_assert(bsubP);
ks_assert(*bsubP);
bsub = *bsubP;
ks_pool_free(bsub->pool, bsubP);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->event;
}
KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->protocol;
}
KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->realm;
}
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->subscribers;
}
KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid)
{
char *key = NULL;
ks_assert(bsub);
ks_assert(nodeid);
key = ks_pstrdup(bsub->pool, nodeid);
ks_hash_insert(bsub->subscribers, (void *)key, (void *)KS_TRUE);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid)
{
ks_assert(bsub);
ks_assert(nodeid);
ks_hash_remove(bsub->subscribers, (void *)nodeid);
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -45,6 +45,7 @@
#include "blade_connection.h"
#include "blade_session.h"
#include "blade_protocol.h"
#include "blade_subscription.h"
#include "blade_transport_wss.h"

View File

@ -78,6 +78,9 @@ KS_DECLARE(ks_status_t) blade_handle_protocolrpc_register(blade_rpc_t *brpc);
KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
@ -106,6 +109,9 @@ KS_DECLARE(cJSON *) blade_protocol_execute_request_params_get(blade_rpc_request_
KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C
#endif

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2017, Shane Bryldt
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_SUBSCRIPTION_H_
#define _BLADE_SUBSCRIPTION_H_
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP);
KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub);
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
KS_END_EXTERN_C
#endif
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -49,8 +49,9 @@ typedef struct blade_connection_s blade_connection_t;
typedef struct blade_session_s blade_session_t;
typedef struct blade_session_callbacks_s blade_session_callbacks_t;
typedef struct blade_protocol_s blade_protocol_t;
typedef struct blade_protocol_realm_s blade_protocol_realm_t;
typedef struct blade_protocol_method_s blade_protocol_method_t;
typedef struct blade_subscription_s blade_subscription_t;
//typedef struct blade_protocol_realm_s blade_protocol_realm_t;
//typedef struct blade_protocol_method_s blade_protocol_method_t;
typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);

View File

@ -17,10 +17,12 @@ struct command_def_s {
void command_quit(blade_handle_t *bh, char *args);
void command_execute(blade_handle_t *bh, char *args);
void command_subscribe(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
{ "execute", command_execute },
{ "subscribe", command_subscribe },
{ NULL, NULL }
};
@ -109,6 +111,26 @@ ks_bool_t blade_locate_response_handler(blade_rpc_response_t *brpcres, void *dat
return KS_FALSE;
}
ks_bool_t blade_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_assert(brpcres);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
return KS_FALSE;
}
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
@ -246,6 +268,14 @@ void command_execute(blade_handle_t *bh, char *args)
blade_protocol_locate(bh, "test", "mydomain.com", blade_locate_response_handler, NULL);
}
void command_subscribe(blade_handle_t *bh, char *args)
{
ks_assert(bh);
ks_assert(args);
blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL);
}
/* For Emacs:
* Local Variables:
* mode:c