FS-10167: More work on the event channel workflow, switched callback data back to using void* and the assumption callback will clear the data or that it would be cleaned up by a handle shutdown by allocating within the handle pool. Base tests currently working, committing to sync up linux build

This commit is contained in:
Shane Bryldt 2017-08-01 16:30:25 -06:00
parent 4c29e4d630
commit fd3348cafc
11 changed files with 232 additions and 129 deletions

View File

@ -42,7 +42,7 @@ struct blade_rpc_s {
const char *realm;
blade_rpc_request_callback_t callback;
cJSON *data;
void *data;
};
struct blade_rpc_request_s {
@ -54,7 +54,7 @@ struct blade_rpc_request_s {
cJSON *message;
const char *message_id; // pulled from message for easier keying
blade_rpc_response_callback_t callback;
cJSON *data;
void *data;
// @todo ttl to wait for response before injecting an error response locally
};
@ -72,22 +72,22 @@ struct blade_rpc_response_s {
static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_rpc_t *brpc = (blade_rpc_t *)ptr;
//blade_rpc_t *brpc = (blade_rpc_t *)ptr;
ks_assert(brpc);
//ks_assert(brpc);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
if (brpc->data) cJSON_Delete(brpc->data);
// @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header
break;
case KS_MPCL_DESTROY:
break;
}
}
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data)
{
blade_rpc_t *brpc = NULL;
ks_pool_t *pool = NULL;
@ -170,7 +170,7 @@ KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brp
return brpc->callback;
}
KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc)
KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc)
{
ks_assert(brpc);
@ -190,7 +190,7 @@ static void blade_rpc_request_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_
case KS_MPCL_TEARDOWN:
ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id);
cJSON_Delete(brpcreq->message);
if (brpcreq->data) cJSON_Delete(brpcreq->data);
// @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header
break;
case KS_MPCL_DESTROY:
break;
@ -203,7 +203,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
cJSON *data)
void *data)
{
blade_rpc_request_t *brpcreq = NULL;
@ -278,7 +278,7 @@ KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_r
return brpcreq->callback;
}
KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
{
ks_assert(brpcreq);
return brpcreq->data;

View File

@ -582,7 +582,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
}
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
{
blade_rpc_request_t *brpcreq = NULL;
const char *method = NULL;

View File

@ -47,14 +47,14 @@ struct blade_handle_s {
blade_sessionmgr_t *sessionmgr;
};
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data);
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data);
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
@ -352,8 +352,20 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
// @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result
// which is important for implementation of blade.execute where errors can be relayed back to the requester properly
typedef struct blade_rpcsubscribe_data_s blade_rpcsubscribe_data_t;
struct blade_rpcsubscribe_data_s {
ks_pool_t *pool;
blade_rpc_response_callback_t original_callback;
void *original_data;
blade_rpc_request_callback_t channel_callback;
void *channel_data;
const char *relayed_messageid;
};
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data);
// blade.register request generator
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -390,7 +402,7 @@ done:
}
// blade.register request handler
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -454,7 +466,7 @@ done:
// blade.publish request generator
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -514,7 +526,7 @@ done:
}
// blade.publish request handler
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -644,7 +656,7 @@ done:
// blade.authorize request generator
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -704,7 +716,7 @@ done:
}
// blade.authorize request handler
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -830,8 +842,6 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON
if (remove) {
if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring));
// @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target
// this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe
} else {
if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring));
@ -854,6 +864,10 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
if (res_result_unauthorized_channels) {
blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL);
}
done:
if (res) cJSON_Delete(res);
@ -867,7 +881,7 @@ done:
// @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
// to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
// every protocol update to everyone which may actually be a better way than an explicit locate request
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -918,7 +932,7 @@ done:
}
// blade.locate request handler
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -1021,7 +1035,7 @@ done:
// blade.execute request generator
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -1074,7 +1088,7 @@ done:
}
// blade.execute request handler
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
@ -1303,20 +1317,40 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ
}
static void blade_rpcsubscribe_data_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_rpcsubscribe_data_t *brpcsd = (blade_rpcsubscribe_data_t *)ptr;
ks_assert(brpcsd);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
if (brpcsd->relayed_messageid) ks_pool_free(brpcsd->pool, &brpcsd->relayed_messageid);
break;
case KS_MPCL_DESTROY:
break;
}
}
// blade.subscribe request generator
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data)
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_pool_t *pool = NULL;
blade_session_t *bs = NULL;
const char *localid = NULL;
blade_subscription_t *bsub = NULL;
cJSON *temp_data = NULL;
blade_rpcsubscribe_data_t *temp_data = NULL;
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
ks_assert(subscribe_channels || unsubscribe_channels);
pool = blade_handle_pool_get(bh);
ks_assert(pool);
// @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
@ -1326,21 +1360,16 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid);
ks_assert(localid);
if (unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid);
}
}
temp_data = cJSON_CreateObject();
// @note since this is allocated in the handle's pool, if the handle is shutdown during a pending request, then the data
// memory will be cleaned up with the handle, otherwise should be cleaned up in the response callback
temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t));
temp_data->pool = pool;
temp_data->original_callback = callback;
temp_data->original_data = data;
temp_data->channel_callback = channel_callback;
temp_data->channel_data = channel_data;
ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup);
if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback));
if (data) cJSON_AddItemToObject(temp_data, "data", data);
if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback));
if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data);
ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
ks_pool_free(bh->pool, &localid);
@ -1351,7 +1380,7 @@ done:
return ret;
}
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data)
ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -1384,6 +1413,13 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
pool = blade_handle_pool_get(bh);
ks_assert(pool);
if (unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber);
}
}
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
cJSON_AddStringToObject(req_params, "protocol", protocol);
@ -1406,7 +1442,7 @@ done:
}
// blade.subscribe request handler
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -1491,16 +1527,18 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
if (req_params_unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
}
}
masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh));
if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
// @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path
// including on the node they start on, whether that is the master or the subscriber
if (req_params_unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
}
}
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
@ -1532,13 +1570,12 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
} else {
cJSON *temp_data = cJSON_CreateObject();
// @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with
cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq));
blade_rpcsubscribe_data_t *temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t));
temp_data->pool = pool;
temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq));
ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup);
blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
cJSON_Delete(temp_data);
}
done:
@ -1550,16 +1587,13 @@ done:
}
// blade.subscribe response handler
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
{
ks_bool_t ret = KS_FALSE;
blade_rpc_request_t *brpcreq = NULL;
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
blade_rpc_response_callback_t original_callback = NULL;
cJSON *original_data = NULL;
blade_rpc_request_callback_t channel_callback = NULL;
cJSON *channel_data = NULL;
const char *messageid = NULL;
blade_rpcsubscribe_data_t *temp_data = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
const char *res_result_protocol = NULL;
@ -1573,19 +1607,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
ks_assert(brpcres);
ks_assert(data);
brpcreq = blade_rpc_response_request_get(brpcres);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
original_data = cJSON_GetObjectItem(data, "data");
original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback");
channel_data = cJSON_GetObjectItem(data, "channel-data");
channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback");
// @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber
messageid = cJSON_GetObjectCstr(data, "messageid");
temp_data = (blade_rpcsubscribe_data_t *)data;
res = blade_rpc_response_message_get(brpcres);
ks_assert(res);
@ -1631,15 +1661,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
cJSON_ArrayForEach(channel, res_result_subscribe_channels) {
blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid);
// @note these will only get assigned on the last response, received by the subscriber
if (channel_callback) blade_subscription_callback_set(bsub, channel_callback);
if (channel_data) blade_subscription_callback_data_set(bsub, channel_data);
if (temp_data && temp_data->channel_callback) blade_subscription_callback_set(bsub, temp_data->channel_callback);
if (temp_data && temp_data->channel_data) blade_subscription_callback_data_set(bsub, temp_data->channel_data);
}
}
// @note this will only happen on the last response, received by the subscriber
if (original_callback) ret = original_callback(brpcres, original_data);
if (temp_data && temp_data->original_callback) ret = temp_data->original_callback(brpcres, temp_data->original_data);
if (messageid) {
if (temp_data && temp_data->relayed_messageid) {
blade_session_t *relay = NULL;
if (downstream) {
if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) {
@ -1651,7 +1681,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
}
}
blade_rpc_response_raw_create(&res, &res_result, messageid);
blade_rpc_response_raw_create(&res, &res_result, temp_data->relayed_messageid);
cJSON_AddStringToObject(res_result, "protocol", res_result_protocol);
cJSON_AddStringToObject(res_result, "realm", res_result_realm);
@ -1668,13 +1698,14 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS
}
done:
if (temp_data) ks_pool_free(temp_data->pool, &temp_data);
blade_session_read_unlock(bs);
return ret;
}
// blade.broadcast request generator
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
@ -1693,7 +1724,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
}
// blade.broadcast request handler
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;

View File

@ -292,7 +292,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
}
}
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
const char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;

View File

@ -36,14 +36,14 @@
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP);
KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc);
KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc);
KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc);
KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
blade_handle_t *bh,
@ -51,7 +51,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
cJSON *data);
void *data);
KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP);
KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq);
@ -59,7 +59,7 @@ KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *br
KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);

View File

@ -62,7 +62,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);

View File

@ -59,25 +59,24 @@ KS_DECLARE(blade_sessionmgr_t *) blade_handle_sessionmgr_get(blade_handle_t *bh)
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
KS_END_EXTERN_C

View File

@ -43,7 +43,7 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C
#endif

View File

@ -62,8 +62,8 @@ typedef struct blade_connectionmgr_s blade_connectionmgr_t;
typedef struct blade_sessionmgr_s blade_sessionmgr_t;
typedef struct blade_session_callback_data_s blade_session_callback_data_t;
typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data);
typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data);
typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data);
typedef enum {

View File

@ -18,6 +18,8 @@ struct command_def_s {
void command_quit(blade_handle_t *bh, char *args);
void command_locate(blade_handle_t *bh, char *args);
void command_join(blade_handle_t *bh, char *args);
void command_subscribe(blade_handle_t *bh, char *args);
void command_unsubscribe(blade_handle_t *bh, char *args);
void command_leave(blade_handle_t *bh, char *args);
void command_talk(blade_handle_t *bh, char *args);
@ -25,6 +27,8 @@ static const struct command_def_s command_defs[] = {
{ "quit", command_quit },
{ "locate", command_locate },
{ "join", command_join },
{ "subscribe", command_subscribe },
{ "unsubscribe", command_unsubscribe },
{ "leave", command_leave },
{ "talk", command_talk },
@ -33,7 +37,7 @@ static const struct command_def_s command_defs[] = {
const char *g_testcon_nodeid = NULL;
ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -87,7 +91,7 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *dat
return KS_FALSE;
}
ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -111,7 +115,7 @@ ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
return KS_FALSE;
}
ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -135,7 +139,7 @@ ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data
return KS_FALSE;
}
ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -159,7 +163,35 @@ ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
return KS_FALSE;
}
ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t test_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_assert(brpcres);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
res = blade_rpc_response_message_get(brpcres);
ks_assert(res);
res_result = cJSON_GetObjectItem(res, "result");
ks_assert(res_result);
ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
return KS_FALSE;
}
ks_bool_t test_channel_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -177,7 +209,7 @@ ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
params = blade_rpcbroadcast_request_params_get(brpcreq);
ks_assert(params);
ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs));
ks_log(KS_LOG_DEBUG, "Session (%s) test channel event processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
@ -327,7 +359,6 @@ void command_locate(blade_handle_t *bh, char *args)
void command_join(blade_handle_t *bh, char *args)
{
cJSON *params = NULL;
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
@ -337,14 +368,38 @@ void command_join(blade_handle_t *bh, char *args)
return;
}
params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL);
cJSON_Delete(params);
}
void command_subscribe(blade_handle_t *bh, char *args)
{
cJSON *channels = NULL;
if (!g_testcon_nodeid) {
ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n");
return;
}
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL);
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}
void command_unsubscribe(blade_handle_t *bh, char *args)
{
cJSON *channels = NULL;
if (!g_testcon_nodeid) {
ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n");
return;
}
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL);
cJSON_Delete(channels);
}
@ -364,11 +419,6 @@ void command_leave(blade_handle_t *bh, char *args)
params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL);
cJSON_Delete(params);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL);
cJSON_Delete(channels);
}
void command_talk(blade_handle_t *bh, char *args)

View File

@ -88,7 +88,7 @@ ks_status_t testproto_destroy(testproto_t **testP)
return KS_STATUS_SUCCESS;
}
ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -97,7 +97,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da
ks_assert(brpcres);
ks_assert(data);
//test = (testproto_t *)cJSON_GetPtrValue(data);
//test = (testproto_t *)data;
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
@ -112,7 +112,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da
return KS_FALSE;
}
ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -126,7 +126,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_assert(brpcreq);
ks_assert(data);
test = (testproto_t *)cJSON_GetPtrValue(data);
test = (testproto_t *)data;
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
@ -183,7 +183,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
return KS_FALSE;
}
ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -191,12 +191,13 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
const char *requester_nodeid = NULL;
//const char *key = NULL;
cJSON *params = NULL;
cJSON *channels = NULL;
cJSON *result = NULL;
ks_assert(brpcreq);
ks_assert(data);
test = (testproto_t *)cJSON_GetPtrValue(data);
test = (testproto_t *)data;
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
@ -216,22 +217,36 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_hash_remove(test->participants, (void *)requester_nodeid);
ks_hash_write_unlock(test->participants);
blade_session_read_unlock(bs);
// deauthorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL);
cJSON_Delete(channels);
// send rpcexecute response to the requester
result = cJSON_CreateObject();
blade_rpcexecute_response_send(brpcreq, result);
cJSON_Delete(result);
blade_session_read_unlock(bs);
// broadcast to authorized nodes that have subscribed, that the requester has left
params = cJSON_CreateObject();
cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid);
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL);
cJSON_Delete(params);
return KS_FALSE;
}
ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -244,7 +259,7 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_assert(brpcreq);
ks_assert(data);
//test = (testproto_t *)cJSON_GetPtrValue(data);
//test = (testproto_t *)data;
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
@ -263,12 +278,16 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) request processing\n", blade_session_id_get(bs), requester_nodeid);
blade_session_read_unlock(bs);
// send rpcexecute response to the requester
result = cJSON_CreateObject();
blade_rpcexecute_response_send(brpcreq, result);
cJSON_Delete(result);
blade_session_read_unlock(bs);
// broadcast to authorized nodes that have subscribed, that the requester has said something
params = cJSON_CreateObject();
cJSON_AddStringToObject(params, "text", text);
@ -277,6 +296,8 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL);
cJSON_Delete(params);
return KS_FALSE;
}
@ -345,19 +366,21 @@ int main(int argc, char **argv)
// @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
ks_sleep_ms(3000);
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test);
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test));
blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test);
cJSON_Delete(channels);
}
}