FS-10167: Update to the preliminary blade.subscribe support, added registration of local callback for processing an event received through upcoming blade.broadcast

This commit is contained in:
Shane Bryldt 2017-06-10 20:30:58 -06:00
parent 61f8380b70
commit ca19bee71c
5 changed files with 86 additions and 17 deletions

View File

@ -874,7 +874,7 @@ KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, co
return brpc;
}
KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
@ -919,12 +919,12 @@ KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const ch
ks_pool_free(bh->pool, &key);
if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL);
if (bsubP) *bsubP = bsub;
return KS_STATUS_SUCCESS;
return propagate;
}
KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
@ -966,9 +966,9 @@ KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const
ks_pool_free(bh->pool, &key);
if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
if (bsubP) *bsubP = bsub;
return KS_STATUS_SUCCESS;
return propagate;
}
@ -1129,7 +1129,9 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
ks_hash_read_unlock(bh->subscriptions);
if (!unsubbed) {
blade_handle_subscriber_remove(bh, event, protocol, realm, id);
if (blade_handle_subscriber_remove(bh, NULL, event, protocol, realm, id)) {
blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
}
ks_pool_free(bh->pool, &event);
ks_pool_free(bh->pool, &protocol);
ks_pool_free(bh->pool, &realm);
@ -1968,10 +1970,12 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
// blade.subscribe request generator
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_bool_t propagate = KS_FALSE;
blade_subscription_t *bsub = NULL;
ks_assert(bh);
ks_assert(event);
@ -1982,12 +1986,19 @@ KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char
ret = KS_STATUS_DISCONNECTED;
goto done;
}
if (remove) {
blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid);
propagate = blade_handle_subscriber_remove(bh, &bsub, event, protocol, realm, bh->local_nodeid);
} else {
blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid);
propagate = blade_handle_subscriber_add(bh, &bsub, event, protocol, realm, bh->local_nodeid);
ks_assert(event_callback);
}
if (bsub) {
blade_subscription_callback_set(bsub, event_callback);
blade_subscription_callback_data_set(bsub, event_data);
}
if (propagate) ret = blade_protocol_subscribe_raw(bh, event, protocol, realm, remove, callback, data);
done:
if (bs) blade_session_read_unlock(bs);
@ -2049,6 +2060,7 @@ ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq,
ks_bool_t remove = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_bool_t propagate = KS_FALSE;
ks_assert(brpcreq);
@ -2104,11 +2116,13 @@ ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq,
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
if (remove) {
blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
propagate = blade_handle_subscriber_remove(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
} else {
blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
propagate = blade_handle_subscriber_add(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
}
if (propagate) blade_protocol_subscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));

View File

@ -40,6 +40,9 @@ struct blade_subscription_s {
const char *protocol;
const char *realm;
ks_hash_t *subscribers;
blade_rpc_request_callback_t callback;
void *callback_data;
};
@ -160,6 +163,34 @@ KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription
}
KS_DECLARE(blade_rpc_request_callback_t) blade_subscription_callback_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->callback;
}
KS_DECLARE(void) blade_subscription_callback_set(blade_subscription_t *bsub, blade_rpc_request_callback_t callback)
{
ks_assert(bsub);
bsub->callback = callback;
}
KS_DECLARE(void *) blade_subscription_callback_data_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->callback_data;
}
KS_DECLARE(void) blade_subscription_callback_data_set(blade_subscription_t *bsub, void *data)
{
ks_assert(bsub);
bsub->callback_data = data;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -78,8 +78,8 @@ KS_DECLARE(ks_status_t) blade_handle_protocolrpc_register(blade_rpc_t *brpc);
KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
@ -109,7 +109,7 @@ KS_DECLARE(cJSON *) blade_protocol_execute_request_params_get(blade_rpc_request_
KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C

View File

@ -44,6 +44,10 @@ KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(blade_rpc_request_callback_t) blade_subscription_callback_get(blade_subscription_t *bsub);
KS_DECLARE(void) blade_subscription_callback_set(blade_subscription_t *bsub, blade_rpc_request_callback_t callback);
KS_DECLARE(void *) blade_subscription_callback_data_get(blade_subscription_t *bsub);
KS_DECLARE(void) blade_subscription_callback_data_set(blade_subscription_t *bsub, void *data);
KS_END_EXTERN_C
#endif

View File

@ -131,6 +131,26 @@ ks_bool_t blade_subscribe_response_handler(blade_rpc_response_t *brpcres, void *
return KS_FALSE;
}
ks_bool_t test_event_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) test.event request processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
return KS_FALSE;
}
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
@ -273,7 +293,7 @@ void command_subscribe(blade_handle_t *bh, char *args)
ks_assert(bh);
ks_assert(args);
blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL);
blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
}
/* For Emacs: