FS-10167: Added preliminary support for blade.register, currently it is only used to update routes upstream, when a new nodeid is introduced or no longer available a blade.register is passed to update the routing tables. Edge cases are not handled yet.

This commit is contained in:
Shane Bryldt 2017-06-06 15:50:38 -06:00
parent 50caeda5fe
commit 6795fd2e45
4 changed files with 120 additions and 112 deletions

View File

@ -257,25 +257,25 @@ KS_DECLARE(ks_hash_t *) blade_session_realms_get(blade_session_t *bs)
return bs->realms;
}
KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *identity)
KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid)
{
char *key = NULL;
ks_assert(bs);
ks_assert(identity);
ks_assert(nodeid);
key = ks_pstrdup(bs->pool, identity);
key = ks_pstrdup(bs->pool, nodeid);
ks_hash_insert(bs->routes, (void *)key, (void *)KS_TRUE);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *identity)
KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *nodeid)
{
ks_assert(bs);
ks_assert(identity);
ks_assert(nodeid);
ks_hash_remove(bs->routes, (void *)identity);
ks_hash_remove(bs->routes, (void *)nodeid);
return KS_STATUS_SUCCESS;
}
@ -774,12 +774,11 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
blade_handle_requests_remove(brpcreq);
callback = blade_rpc_request_callback_get(brpcreq);
ks_assert(callback);
blade_rpc_response_create(&brpcres, bs->handle, bs->pool, bs->id, brpcreq, json);
ks_assert(brpcres);
disconnect = callback(brpcres, blade_rpc_request_callback_data_get(brpcreq));
if (callback) disconnect = callback(brpcres, blade_rpc_request_callback_data_get(brpcreq));
blade_rpc_response_destroy(&brpcres);
}

View File

@ -87,6 +87,7 @@ struct blade_handle_s {
};
ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
@ -340,6 +341,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
}
// register internal core rpcs for blade.xxx
blade_rpc_create(&brpc, bh, "blade.register", NULL, NULL, blade_protocol_register_request_handler, NULL);
blade_handle_corerpc_register(brpc);
blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_protocol_publish_request_handler, NULL);
blade_handle_corerpc_register(brpc);
@ -448,7 +452,7 @@ KS_DECLARE(ks_status_t) blade_handle_local_nodeid_set(blade_handle_t *bh, const
goto done;
}
if (bh->master_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid);
if (bh->local_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid);
if (nodeid) bh->local_nodeid = ks_pstrdup(bh->pool, nodeid);
ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid);
@ -563,8 +567,7 @@ KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *n
ks_log(KS_LOG_DEBUG, "Route Added: %s through %s\n", key, value);
// @todo when a route is added, upstream needs to be notified that the identity can be found through the session to the
// upstream router, and likewise up the chain to the Master Router Node, to create a complete route from anywhere else
blade_protocol_register(bh, nodeid, KS_FALSE, NULL, NULL);
return KS_STATUS_SUCCESS;
}
@ -579,10 +582,7 @@ KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char
ks_hash_remove(bh->routes, (void *)nodeid);
// @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through
// this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this
// should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so
// we don't want to duplicate blade.register calls already being passed up if this route is not a local session
blade_protocol_register(bh, nodeid, KS_TRUE, NULL, NULL);
// @note everything below here is for master-only cleanup when a node is no longer routable
@ -1038,7 +1038,7 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_upstream(blade_handle_t *bh)
ks_assert(bh);
ks_rwl_read_lock(bh->local_nodeid_rwl);
bs = blade_handle_sessions_lookup(bh, bh->local_nodeid);
if (bh->local_nodeid) bs = blade_handle_sessions_lookup(bh, bh->local_nodeid);
ks_rwl_read_unlock(bh->local_nodeid_rwl);
return bs;
@ -1126,6 +1126,107 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b
// @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result
// which is important for implementation of blade.execute where errors can be relayed back to the requester properly
// blade.register request generator
KS_DECLARE(ks_status_t) blade_protocol_register(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_assert(bh);
ks_assert(nodeid);
if (!(bs = blade_handle_sessions_upstream(bh))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
pool = blade_handle_pool_get(bh);
ks_assert(pool);
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.register");
// fill in the req_params
cJSON_AddStringToObject(req_params, "nodeid", nodeid);
if (remove) cJSON_AddTrueToObject(req_params, "remove");
ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) started\n", blade_session_id_get(bs), remove ? "removing" : "adding", nodeid);
ret = blade_session_send(bs, req, callback, data);
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
return ret;
}
// blade.register request handler
ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_nodeid = NULL;
cJSON *req_params_remove = NULL;
ks_bool_t remove = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (!req_params) {
ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'params' object\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_nodeid = cJSON_GetObjectCstr(req_params, "nodeid");
if (!req_params_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_remove = cJSON_GetObjectItem(req_params, "remove");
remove = req_params_remove && req_params_remove->type == cJSON_True;
ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) processing\n", blade_session_id_get(bs), remove ? "removing" : "adding", req_params_nodeid);
if (remove) {
blade_session_route_remove(bs, req_params_nodeid);
blade_handle_route_remove(bh, req_params_nodeid);
} else {
blade_session_route_add(bs, req_params_nodeid);
blade_handle_route_add(bh, req_params_nodeid, blade_session_id_get(bs));
}
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
blade_session_send(bs, res, NULL, NULL);
done:
if (res) cJSON_Delete(res);
if (bs) blade_session_read_unlock(bs);
return KS_FALSE;
}
// blade.publish request generator
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
{
@ -1167,10 +1268,6 @@ KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *n
ret = blade_session_send(bs, req, callback, data);
// @todo upon return, a provider should register the methods for this protocol to be locally available
// prior to receiving a response, if the response is an error then unregister, but if it is successful
// then the node is already primed to receive any immediate requests
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
@ -1724,96 +1821,6 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
blade_session_read_unlock(bs);
}
// blade.locate response handler
//ks_bool_t blade_protocol_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
//{
// ks_bool_t ret = KS_FALSE;
// blade_handle_t *bh = NULL;
// blade_session_t *bs = NULL;
// blade_rpc_response_callback_wrapper_t *wrapper = NULL;
// blade_rpc_response_callback_t callback = NULL;
// cJSON *res = NULL;
// cJSON *res_error = NULL;
// cJSON *res_result = NULL;
// cJSON *res_object = NULL;
// //cJSON *res_result_providers = NULL;
// const char *requester_nodeid = NULL;
// const char *responder_nodeid = NULL;
// //const char *res_result_protocol = NULL;
// //const char *res_result_realm = NULL;
//
// ks_assert(brpcres);
// ks_assert(data);
//
// wrapper = (blade_rpc_response_callback_wrapper_t *)data;
// callback = wrapper->callback;
// data = wrapper->data;
// ks_pool_free(wrapper->pool, &wrapper);
//
// bh = blade_rpc_response_handle_get(brpcres);
// ks_assert(bh);
//
// bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
// ks_assert(bs);
//
// res = blade_rpc_response_message_get(brpcres);
// ks_assert(res);
//
// res_error = cJSON_GetObjectItem(res, "error");
// res_result = cJSON_GetObjectItem(res, "result");
//
// if (!res_error && !res_result) {
// ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs));
// goto done;
// }
// res_object = res_error ? res_error : res_result;
//
// requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid");
// responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid");
//
// ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs));
//
// if (callback) ret = callback(brpcres, data);
//
// //if (res_error) {
// // // @todo process error response
// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs));
// // goto done;
// //}
//
// // process result response
//
// //res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
// //if (!res_result_protocol) {
// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs));
// // goto done;
// //}
//
// //res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
// //if (!res_result_realm) {
// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs));
// // goto done;
// //}
//
// //res_result_providers = cJSON_GetObjectItem(res_result, "providers");
// //if (!res_result_providers) {
// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs));
// // goto done;
// //}
//
// //for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) {
// // cJSON *elem = cJSON_GetArrayItem(res_result_providers, index);
// // if (elem->type == cJSON_String) {
// // ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring);
// // }
// //}
//
//done:
// if (bs) blade_session_read_unlock(bs);
//
// return ret;
//}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -46,8 +46,8 @@ KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_realm_add(blade_session_t *bs, const char *realm);
KS_DECLARE(ks_status_t) blade_session_realm_remove(blade_session_t *bs, const char *realm);
KS_DECLARE(ks_hash_t *) blade_session_realms_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *identity);
KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *identity);
KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid);
KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *nodeid);
KS_DECLARE(cJSON *) blade_session_properties_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block);
KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs);

View File

@ -95,6 +95,8 @@ KS_DECLARE(ks_status_t) blade_handle_session_state_callback_register(blade_handl
KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_handle_t *bh, const char *id);
KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition);
KS_DECLARE(ks_status_t) blade_protocol_register(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);