From eb08eef0eaac6e292e475952b23e1c35ac5683bf Mon Sep 17 00:00:00 2001 From: Luis Azedo <luis@2600hz.com> Date: Mon, 5 Sep 2016 10:52:39 +0000 Subject: [PATCH] FS-9480 [mod_kazoo] add api enhancements --- src/mod/event_handlers/mod_kazoo/kazoo_node.c | 260 ++++++++++++++++-- .../event_handlers/mod_kazoo/kazoo_utils.c | 66 ++++- src/mod/event_handlers/mod_kazoo/mod_kazoo.h | 10 +- 3 files changed, 308 insertions(+), 28 deletions(-) diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index 1efe7e3e82..ac3a42ee3a 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -59,7 +59,10 @@ static char *REQUEST_ATOMS[] = { "bgapi", "api", "event", - "fetch_reply" + "fetch_reply", + "config", + "bgapi4", + "api4" }; typedef enum { @@ -76,6 +79,9 @@ typedef enum { REQUEST_API, REQUEST_EVENT, REQUEST_FETCH_REPLY, + REQUEST_CONFIG, + REQUEST_BGAPI4, + REQUEST_API4, REQUEST_MAX } request_atoms_t; @@ -181,35 +187,109 @@ static switch_status_t remove_from_ei_nodes(ei_node_t *this_ei_node) { return SWITCH_STATUS_SUCCESS; } +void kazoo_event_add_unique_header_string(switch_event_t *event, switch_stack_t stack, const char *header_name, const char *data) { + if(!switch_event_get_header_ptr(event, header_name)) + switch_event_add_header_string(event, stack, header_name, data); +} + + +SWITCH_DECLARE(switch_status_t) kazoo_api_execute(const char *cmd, const char *arg, switch_core_session_t *session, switch_stream_handle_t *stream, char** reply) +{ + switch_api_interface_t *api; + switch_status_t status; + char *arg_used; + char *cmd_used; + int fire_event = 0; + + switch_assert(stream != NULL); + switch_assert(stream->data != NULL); + switch_assert(stream->write_function != NULL); + + cmd_used = (char *) cmd; + arg_used = (char *) arg; + + + if (!stream->param_event) { + switch_event_create(&stream->param_event, SWITCH_EVENT_API); + fire_event = 1; + } + + if (stream->param_event) { + if (cmd_used && *cmd_used) { + switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command", cmd_used); + } + if (arg_used && *arg_used) { + switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", arg_used); + } + } + + + if (cmd_used && (api = switch_loadable_module_get_api_interface(cmd_used)) != 0) { + if ((status = api->function(arg_used, session, stream)) != SWITCH_STATUS_SUCCESS) { + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Result", "error"); + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Error", stream->data); + } else { + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Result", "success"); + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Output", stream->data); + } + UNPROTECT_INTERFACE(api); + } else { + status = SWITCH_STATUS_FALSE; + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Result", "error"); + kazoo_event_add_unique_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Error", "invalid command"); + } + + if (stream->param_event && fire_event) { + switch_event_fire(&stream->param_event); + } + + if (cmd_used != cmd) { + switch_safe_free(cmd_used); + } + + if (arg_used != arg) { + switch_safe_free(arg_used); + } + + return status; +} + +static switch_status_t api_exec_stream(char *cmd, char *arg, switch_stream_handle_t *stream, char **reply) { + switch_status_t status = SWITCH_STATUS_FALSE; + + if (kazoo_api_execute(cmd, arg, NULL, stream, reply) != SWITCH_STATUS_SUCCESS) { + if(stream->data && strlen(stream->data)) { + *reply = strdup(stream->data); + status = SWITCH_STATUS_FALSE; + } else { + *reply = switch_mprintf("%s: Command not found", cmd); + status = SWITCH_STATUS_NOTFOUND; + } + } else if (!stream->data || !strlen(stream->data)) { + *reply = switch_mprintf("%s: Command returned no output", cmd); + status = SWITCH_STATUS_FALSE; + } else { + *reply = strdup(stream->data); + status = SWITCH_STATUS_SUCCESS; + } + + return status; +} + static switch_status_t api_exec(char *cmd, char *arg, char **reply) { switch_stream_handle_t stream = { 0 }; switch_status_t status = SWITCH_STATUS_FALSE; SWITCH_STANDARD_STREAM(stream); - if (switch_api_execute(cmd, arg, NULL, &stream) != SWITCH_STATUS_SUCCESS) { - *reply = switch_mprintf("%s: Command not found", cmd); - status = SWITCH_STATUS_NOTFOUND; - } else if (!stream.data || !strlen(stream.data)) { - *reply = switch_mprintf("%s: Command returned no output", cmd); - status = SWITCH_STATUS_FALSE; - } else { - *reply = strdup(stream.data); - status = SWITCH_STATUS_SUCCESS; - } - - /* if the reply starts with the char "-" (the start of -USAGE ...) */ - /* the args were missing or incorrect */ - if (**reply == '-') { - status = SWITCH_STATUS_FALSE; - } + status = api_exec_stream(cmd, arg, &stream, reply); switch_safe_free(stream.data); return status; } -static void *SWITCH_THREAD_FUNC bgapi_exec(switch_thread_t *thread, void *obj) { +static void *SWITCH_THREAD_FUNC bgapi3_exec(switch_thread_t *thread, void *obj) { api_command_struct_t *acs = (api_command_struct_t *) obj; switch_memory_pool_t *pool = acs->pool; char *reply = NULL; @@ -260,6 +340,71 @@ static void *SWITCH_THREAD_FUNC bgapi_exec(switch_thread_t *thread, void *obj) { return NULL; } + +static void *SWITCH_THREAD_FUNC bgapi4_exec(switch_thread_t *thread, void *obj) { + api_command_struct_t *acs = (api_command_struct_t *) obj; + switch_memory_pool_t *pool = acs->pool; + char *reply = NULL; + char *cmd = acs->cmd; + char *arg = acs->arg; + ei_node_t *ei_node = acs->ei_node; + ei_send_msg_t *send_msg; + switch_stream_handle_t stream = { 0 }; + + if(!switch_test_flag(ei_node, LFLAG_RUNNING) || !switch_test_flag(&globals, LFLAG_RUNNING)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Ignoring command while shuting down\n"); + switch_atomic_dec(&ei_node->pending_bgapi); + return NULL; + } + + SWITCH_STANDARD_STREAM(stream); + switch_event_create(&stream.param_event, SWITCH_EVENT_API); + + switch_malloc(send_msg, sizeof(*send_msg)); + memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid)); + + ei_x_new_with_version(&send_msg->buf); + + ei_x_encode_tuple_header(&send_msg->buf, (stream.param_event ? 4 : 3)); + + if (api_exec_stream(cmd, arg, &stream, &reply) == SWITCH_STATUS_SUCCESS) { + ei_x_encode_atom(&send_msg->buf, "bgok"); + } else { + ei_x_encode_atom(&send_msg->buf, "bgerror"); + } + + _ei_x_encode_string(&send_msg->buf, acs->uuid_str); + _ei_x_encode_string(&send_msg->buf, reply); + + if (stream.param_event) { + ei_encode_switch_event_headers(&send_msg->buf, stream.param_event); + } + + if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send bgapi response %s to %s <%d.%d.%d>\n" + ,acs->uuid_str + ,acs->pid.node + ,acs->pid.creation + ,acs->pid.num + ,acs->pid.serial); + ei_x_free(&send_msg->buf); + switch_safe_free(send_msg); + } + + switch_atomic_dec(&ei_node->pending_bgapi); + + if (stream.param_event) { + switch_event_fire(&stream.param_event); + } + + switch_safe_free(reply); + switch_safe_free(acs->arg); + switch_core_destroy_memory_pool(&pool); + switch_safe_free(stream.data); + + return NULL; +} + static void log_sendmsg_request(char *uuid, switch_event_t *event) { char *cmd = switch_event_get_header(event, "call-command"); @@ -512,6 +657,12 @@ static switch_status_t handle_request_sendmsg(ei_node_t *ei_node, erlang_pid *pi return erlang_response_ok(rbuf); } +static switch_status_t handle_request_config(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { + + fetch_config_filters(); + return erlang_response_ok(rbuf); +} + static switch_status_t handle_request_bind(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { char section_str[MAXATOMLEN + 1]; switch_xml_section_t section; @@ -524,6 +675,8 @@ static switch_status_t handle_request_bind(ei_node_t *ei_node, erlang_pid *pid, switch(section) { case SWITCH_XML_SECTION_CONFIG: add_fetch_handler(ei_node, pid, globals.config_fetch_binding); + if(!globals.config_filters_fetched) + fetch_config_filters(); break; case SWITCH_XML_SECTION_DIRECTORY: add_fetch_handler(ei_node, pid, globals.directory_fetch_binding); @@ -564,7 +717,7 @@ static switch_status_t handle_request_version(ei_node_t *ei_node, erlang_pid *pi return SWITCH_STATUS_SUCCESS; } -static switch_status_t handle_request_bgapi(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { +static switch_status_t handle_request_bgapi(switch_thread_start_t func, ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { api_command_struct_t *acs = NULL; switch_memory_pool_t *pool; switch_thread_t *thread; @@ -597,7 +750,7 @@ static switch_status_t handle_request_bgapi(ei_node_t *ei_node, erlang_pid *pid, switch_uuid_get(&uuid); switch_uuid_format(acs->uuid_str, &uuid); - switch_thread_create(&thread, thd_attr, bgapi_exec, acs, acs->pool); + switch_thread_create(&thread, thd_attr, func, acs, acs->pool); switch_atomic_inc(&ei_node->pending_bgapi); @@ -610,6 +763,65 @@ static switch_status_t handle_request_bgapi(ei_node_t *ei_node, erlang_pid *pid, return SWITCH_STATUS_SUCCESS; } +static switch_status_t handle_request_bgapi3(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { + return handle_request_bgapi(bgapi3_exec, ei_node, pid, buf, rbuf); +} + +static switch_status_t handle_request_bgapi4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { + return handle_request_bgapi(bgapi4_exec, ei_node, pid, buf, rbuf); +} + +static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { + char cmd[MAXATOMLEN + 1]; + char *arg; + switch_stream_handle_t stream = { 0 }; + + SWITCH_STANDARD_STREAM(stream); + switch_event_create(&stream.param_event, SWITCH_EVENT_API); + + if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) { + return erlang_response_badarg(rbuf); + } + + if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) { + return erlang_response_badarg(rbuf); + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg); + + if (rbuf) { + char *reply; + switch_status_t status; + + status = api_exec_stream(cmd, arg, &stream, &reply); + + if (status == SWITCH_STATUS_SUCCESS) { + ei_x_encode_tuple_header(buf, 2); + ei_x_encode_atom(rbuf, "ok"); + } else { + ei_x_encode_tuple_header(buf, (stream.param_event ? 3 : 2)); + ei_x_encode_atom(rbuf, "error"); + } + + _ei_x_encode_string(rbuf, reply); + + if (stream.param_event && status != SWITCH_STATUS_SUCCESS) { + ei_encode_switch_event_headers(rbuf, stream.param_event); + } + + switch_safe_free(reply); + } + + if (stream.param_event) { + switch_event_fire(&stream.param_event); + } + + switch_safe_free(arg); + switch_safe_free(stream.data); + + return SWITCH_STATUS_SUCCESS; +} + static switch_status_t handle_request_api(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { char cmd[MAXATOMLEN + 1]; char *arg; @@ -805,13 +1017,19 @@ static switch_status_t handle_kazoo_request(ei_node_t *ei_node, erlang_pid *pid, case REQUEST_VERSION: return handle_request_version(ei_node, pid, buf, rbuf); case REQUEST_BGAPI: - return handle_request_bgapi(ei_node, pid, buf, rbuf); + return handle_request_bgapi3(ei_node, pid, buf, rbuf); case REQUEST_API: return handle_request_api(ei_node, pid, buf, rbuf); case REQUEST_EVENT: return handle_request_event(ei_node, pid, buf, rbuf); case REQUEST_FETCH_REPLY: return handle_request_fetch_reply(ei_node, pid, buf, rbuf); + case REQUEST_CONFIG: + return handle_request_config(ei_node, pid, buf, rbuf); + case REQUEST_BGAPI4: + return handle_request_bgapi4(ei_node, pid, buf, rbuf); + case REQUEST_API4: + return handle_request_api4(ei_node, pid, buf, rbuf); default: return erlang_response_notimplemented(rbuf); } diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_utils.c b/src/mod/event_handlers/mod_kazoo/kazoo_utils.c index 127f849cdc..962e87f6f1 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_utils.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_utils.c @@ -32,12 +32,6 @@ * ei_helpers.c -- helper functions for ei * */ -#include <switch.h> -#include <ei.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <arpa/nameser.h> -#include <resolv.h> #include "mod_kazoo.h" /* Stolen from code added to ei in R12B-5. @@ -634,6 +628,66 @@ switch_hash_t *create_default_filter() { return filter; } +static void *SWITCH_THREAD_FUNC fetch_config_filters_exec(switch_thread_t *thread, void *obj) +{ + char *cf = "kazoo.conf"; + switch_xml_t cfg, xml, child, param; + switch_event_t *params; + switch_memory_pool_t *pool = (switch_memory_pool_t *)obj; + + switch_event_create(¶ms, SWITCH_EVENT_REQUEST_PARAMS); + switch_event_add_header_string(params, SWITCH_STACK_BOTTOM, "Action", "request-filter"); + + if (!(xml = switch_xml_open_cfg(cf, &cfg, params))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to open configuration file %s\n", cf); + } else if ((child = switch_xml_child(cfg, "event-filter"))) { + switch_hash_t *filter; + switch_hash_t *old_filter; + + switch_core_hash_init(&filter); + for (param = switch_xml_child(child, "header"); param; param = param->next) { + char *var = (char *) switch_xml_attr_soft(param, "name"); + switch_core_hash_insert(filter, var, "1"); + } + + old_filter = globals.event_filter; + globals.config_filters_fetched = 1; + globals.event_filter = filter; + if (old_filter) { + switch_core_hash_destroy(&old_filter); + } + + switch_xml_free(xml); + } + + if (params) switch_event_destroy(¶ms); + switch_core_destroy_memory_pool(&pool); + + return NULL; +} + +void fetch_config_filters() { + switch_memory_pool_t *pool; + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_uuid_t uuid; + + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "fetching kazoo filters\n"); + + switch_core_new_memory_pool(&pool); + + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + + switch_uuid_get(&uuid); + switch_thread_create(&thread, thd_attr, fetch_config_filters_exec, pool, pool); + +} + + + /* For Emacs: * Local Variables: * mode:c diff --git a/src/mod/event_handlers/mod_kazoo/mod_kazoo.h b/src/mod/event_handlers/mod_kazoo/mod_kazoo.h index da27ed9dc4..5200255bbd 100644 --- a/src/mod/event_handlers/mod_kazoo/mod_kazoo.h +++ b/src/mod/event_handlers/mod_kazoo/mod_kazoo.h @@ -1,12 +1,17 @@ #include <switch.h> +#include <switch_event.h> #include <ei.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/nameser.h> +#include <resolv.h> #define MAX_ACL 100 #define CMD_BUFLEN 1024 * 1000 #define MAX_QUEUE_LEN 25000 #define MAX_MISSED 500 #define MAX_PID_CHARS 255 -#define VERSION "mod_kazoo v1.2.10-14" +#define VERSION "mod_kazoo v1.3.0-1" #define API_COMMAND_DISCONNECT 0 #define API_COMMAND_REMOTE_IP 1 @@ -112,6 +117,7 @@ struct globals_s { int send_msg_batch; short event_stream_framing; switch_port_t port; + int config_filters_fetched; }; typedef struct globals_s globals_t; extern globals_t globals; @@ -162,6 +168,8 @@ void add_kz_dptools(switch_loadable_module_interface_t **module_interface, switc #define _ei_x_encode_string(buf, string) { ei_x_encode_binary(buf, string, strlen(string)); } +void fetch_config_filters(); + /* For Emacs: * Local Variables: * mode:c