[mod_kazoo] Fix potential memory leaks

* fixes several potential memory leaks
* formatting
* fixes parameter name
* logs unsupported parameters
* fixes default event profile
* fixes unconfigured bindings
This commit is contained in:
lazedo 2020-01-20 16:12:05 +00:00 committed by Andrey Volk
parent e3b1fb91c8
commit 93fdf2d9d7
7 changed files with 159 additions and 77 deletions

View File

@ -419,6 +419,7 @@ SWITCH_STANDARD_API(kz_http_put)
if (fstat(fd, &file_info) == -1) { if (fstat(fd, &file_info) == -1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fstat() error: %s\n", strerror(errno)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fstat() error: %s\n", strerror(errno));
stream->write_function(stream, "-ERR fstat error\n"); stream->write_function(stream, "-ERR fstat error\n");
close(fd);
goto done; goto done;
} }
close(fd); close(fd);

View File

@ -158,7 +158,7 @@ struct kz_globals_s {
switch_hash_t *event_filter; switch_hash_t *event_filter;
int epmdfd; int epmdfd;
int num_worker_threads; int node_worker_threads;
switch_bool_t nat_map; switch_bool_t nat_map;
switch_bool_t ei_shortname; switch_bool_t ei_shortname;
int ei_compat_rel; int ei_compat_rel;
@ -233,6 +233,8 @@ int ei_decode_string_or_binary_limited(char *buf, int *index, int maxsize, char
int ei_decode_string_or_binary(char *buf, int *index, char **dst); int ei_decode_string_or_binary(char *buf, int *index, char **dst);
switch_status_t create_acceptor(); switch_status_t create_acceptor();
switch_hash_t *create_default_filter(); switch_hash_t *create_default_filter();
void kz_erl_init();
void kz_erl_shutdown();
void fetch_config(); void fetch_config();

View File

@ -205,9 +205,9 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
} else if (!strcmp(var, "io-fault-tolerance")) { } else if (!strcmp(var, "io-fault-tolerance")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
kazoo_globals.io_fault_tolerance = atoi(val); kazoo_globals.io_fault_tolerance = atoi(val);
} else if (!strcmp(var, "num-worker-threads")) { } else if (!strcmp(var, "node-worker-threads")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set num-worker-threads: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set node-worker-threads: %s\n", val);
kazoo_globals.num_worker_threads = atoi(val); kazoo_globals.node_worker_threads = atoi(val);
} else if (!strcmp(var, "json-term-encoding")) { } else if (!strcmp(var, "json-term-encoding")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set json-term-encoding: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set json-term-encoding: %s\n", val);
if(!strcmp(val, "map")) { if(!strcmp(val, "map")) {
@ -219,6 +219,8 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
} else if (!strcmp(var, "expand-headers-on-fetch")) { } else if (!strcmp(var, "expand-headers-on-fetch")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set expand-headers-on-fetch: %s\n", val);
kazoo_globals.expand_headers_on_fetch = switch_true(val); kazoo_globals.expand_headers_on_fetch = switch_true(val);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val);
} }
} }
} }
@ -319,9 +321,9 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
kazoo_globals.profile_vars_prefixes[i] = switch_core_strdup(kazoo_globals.pool, sep_array[i]); kazoo_globals.profile_vars_prefixes[i] = switch_core_strdup(kazoo_globals.pool, sep_array[i]);
} }
if (!kazoo_globals.num_worker_threads) { if (!kazoo_globals.node_worker_threads) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Number of worker threads not found in configuration, using default\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Number of node worker threads not found in configuration, using default\n");
kazoo_globals.num_worker_threads = 10; kazoo_globals.node_worker_threads = 10;
} }
if (zstr(kazoo_globals.ip)) { if (zstr(kazoo_globals.ip)) {
@ -422,12 +424,9 @@ switch_status_t kazoo_config_handlers(switch_xml_t cfg)
if(events == NULL) { if(events == NULL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get default handler for events\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get default handler for events\n");
if(kazoo_globals.event_handlers != event_handlers) destroy_config(&event_handlers); destroy_config(&event_handlers);
if(kazoo_globals.fetch_handlers != fetch_handlers) destroy_config(&fetch_handlers); event_handlers = kazoo_config_event_handlers(definitions, def);
if(kazoo_globals.definitions != definitions) destroy_config(&definitions); events = (kazoo_event_profile_ptr) switch_core_hash_find(event_handlers->hash, "default");
switch_xml_free(def);
switch_safe_free(xml);
return SWITCH_STATUS_GENERR;
} }
if(kazoo_globals.events != events) { if(kazoo_globals.events != events) {

View File

@ -514,10 +514,6 @@ switch_status_t create_acceptor() {
char ipbuf[48]; char ipbuf[48];
const char *ip_addr; const char *ip_addr;
#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11
ei_init();
#endif
/* if the config has specified an erlang release compatibility then pass that along to the erlang interface */ /* if the config has specified an erlang release compatibility then pass that along to the erlang interface */
if (kazoo_globals.ei_compat_rel) { if (kazoo_globals.ei_compat_rel) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Compatability with OTP R%d requested\n", kazoo_globals.ei_compat_rel); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Compatability with OTP R%d requested\n", kazoo_globals.ei_compat_rel);
@ -882,6 +878,7 @@ static void fetch_config_filters(switch_memory_pool_t *pool)
kazoo_globals.config_fetched = 1; kazoo_globals.config_fetched = 1;
switch_xml_free(xml); switch_xml_free(xml);
} }
switch_event_destroy(&params);
} }
@ -901,6 +898,7 @@ static void fetch_config_handlers(switch_memory_pool_t *pool)
kazoo_globals.config_fetched = 1; kazoo_globals.config_fetched = 1;
switch_xml_free(xml); switch_xml_free(xml);
} }
switch_event_destroy(&params);
} }
@ -935,6 +933,60 @@ void fetch_config() {
} }
#ifdef WITH_KAZOO_ERL_SHUTDOWN
#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11
typedef struct ei_mutex_s {
#ifdef __WIN32__
HANDLE lock;
#elif VXWORKS
SEM_ID lock;
#else /* unix */
#if defined(HAVE_MIT_PTHREAD_H) || defined(HAVE_PTHREAD_H)
pthread_mutex_t *lock;
#else /* ! (HAVE_MIT_PTHREAD_H || HAVE_PTHREAD_H) */
void *dummy; /* Actually never used */
#endif /* ! (HAVE_MIT_PTHREAD_H || HAVE_PTHREAD_H) */
#endif /* unix */
} ei_mutex_t;
typedef struct ei_socket_info_s {
int socket;
ei_socket_callbacks *cbs;
void *ctx;
int dist_version;
ei_cnode cnode; /* A copy, not a pointer. We don't know when freed */
char cookie[EI_MAX_COOKIE_SIZE+1];
} ei_socket_info;
extern ei_socket_info *ei_sockets;
extern ei_mutex_t* ei_sockets_lock;
extern int ei_n_sockets;
extern int ei_sz_sockets;
int ei_mutex_free(ei_mutex_t *l, int nblock);
#endif
#endif
void kz_erl_init()
{
#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11
ei_init();
#endif
}
void kz_erl_shutdown()
{
#ifdef WITH_KAZOO_ERL_SHUTDOWN
#if (ERLANG_MAJOR == 10 && ERLANG_MINOR >= 3) || ERLANG_MAJOR >= 11
ei_mutex_free(ei_sockets_lock, 1);
ei_sockets_lock = NULL;
free(ei_sockets);
ei_sockets = NULL;
ei_n_sockets = ei_sz_sockets = 0;
#endif
#endif
}
SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) { SWITCH_MODULE_RUNTIME_FUNCTION(mod_kazoo_runtime) {
switch_os_socket_t os_socket; switch_os_socket_t os_socket;

View File

@ -309,8 +309,9 @@ static switch_xml_t fetch_handler(const char *section, const char *tag_name, con
,reply.uuid_str ,reply.uuid_str
,(unsigned int) (switch_micro_time_now() - now) / 1000 ,(unsigned int) (switch_micro_time_now() - now) / 1000
,reply.xml_str); ,reply.xml_str);
if ((xml = switch_xml_parse_str_dynamic(reply.xml_str, SWITCH_FALSE)) == NULL) {
xml = switch_xml_parse_str_dynamic(reply.xml_str, SWITCH_FALSE); switch_safe_free(reply.xml_str);
}
} else { } else {
/* facepalm */ /* facepalm */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Request for %s XML (%s) timed-out after %dms\n" switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Request for %s XML (%s) timed-out after %dms\n"
@ -327,6 +328,8 @@ void bind_fetch_profile(ei_xml_agent_t *agent, kazoo_config_ptr fetch_handlers)
switch_hash_index_t *hi; switch_hash_index_t *hi;
kazoo_fetch_profile_ptr val = NULL, ptr = NULL; kazoo_fetch_profile_ptr val = NULL, ptr = NULL;
if (!fetch_handlers) return;
for (hi = switch_core_hash_first(fetch_handlers->hash); hi; hi = switch_core_hash_next(&hi)) { for (hi = switch_core_hash_first(fetch_handlers->hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void**) &val); switch_core_hash_this(hi, NULL, NULL, (void**) &val);
if (val && val->section == agent->section) { if (val && val->section == agent->section) {
@ -335,6 +338,7 @@ void bind_fetch_profile(ei_xml_agent_t *agent, kazoo_config_ptr fetch_handlers)
} }
} }
agent->profile = ptr; agent->profile = ptr;
switch_safe_free(hi);
} }
void rebind_fetch_profiles(kazoo_config_ptr fetch_handlers) void rebind_fetch_profiles(kazoo_config_ptr fetch_handlers)

View File

@ -106,6 +106,7 @@ static switch_status_t find_request(char *atom, int *request) {
static void destroy_node_handler(ei_node_t *ei_node) { static void destroy_node_handler(ei_node_t *ei_node) {
int pending = 0; int pending = 0;
void *pop; void *pop;
switch_memory_pool_t *pool = ei_node->pool;
switch_clear_flag(ei_node, LFLAG_RUNNING); switch_clear_flag(ei_node, LFLAG_RUNNING);
@ -145,7 +146,7 @@ static void destroy_node_handler(ei_node_t *ei_node) {
switch_mutex_destroy(ei_node->event_streams_mutex); switch_mutex_destroy(ei_node->event_streams_mutex);
switch_core_destroy_memory_pool(&ei_node->pool); switch_core_destroy_memory_pool(&pool);
} }
static switch_status_t add_to_ei_nodes(ei_node_t *this_ei_node) { static switch_status_t add_to_ei_nodes(ei_node_t *this_ei_node) {
@ -319,15 +320,15 @@ static void *SWITCH_THREAD_FUNC bgapi3_exec(switch_thread_t *thread, void *obj)
ei_node_t *ei_node = acs->ei_node; ei_node_t *ei_node = acs->ei_node;
ei_send_msg_t *send_msg; ei_send_msg_t *send_msg;
switch_malloc(send_msg, sizeof(*send_msg));
memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid));
if(!switch_test_flag(ei_node, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { if(!switch_test_flag(ei_node, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Ignoring command while shuting down\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Ignoring command while shuting down\n");
switch_atomic_dec(&ei_node->pending_bgapi); switch_atomic_dec(&ei_node->pending_bgapi);
return NULL; return NULL;
} }
switch_malloc(send_msg, sizeof(*send_msg));
memcpy(&send_msg->pid, &acs->pid, sizeof(erlang_pid));
ei_x_new_with_version(&send_msg->buf); ei_x_new_with_version(&send_msg->buf);
ei_x_encode_tuple_header(&send_msg->buf, 3); ei_x_encode_tuple_header(&send_msg->buf, 3);
@ -464,6 +465,7 @@ static void log_sendmsg_request(char *uuid, switch_event_t *event)
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|building xferext extension: %s %s\n", uuid, app_name, app_arg); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|building xferext extension: %s %s\n", uuid, app_name, app_arg);
switch_safe_free(app_name);
} }
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|transfered call to xferext extension\n", uuid); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "log|%s|transfered call to xferext extension\n", uuid);
@ -692,20 +694,22 @@ static switch_status_t handle_request_command(ei_node_t *ei_node, erlang_pid *pi
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
return erlang_response_baduuid(rbuf);
}
switch_uuid_get(&cmd_uuid); switch_uuid_get(&cmd_uuid);
switch_uuid_format(cmd_uuid_str, &cmd_uuid); switch_uuid_format(cmd_uuid_str, &cmd_uuid);
switch_event_create(&event, SWITCH_EVENT_COMMAND); switch_event_create(&event, SWITCH_EVENT_COMMAND);
if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) { if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) {
switch_core_session_rwunlock(session);
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
log_sendmsg_request(uuid_str, event); log_sendmsg_request(uuid_str, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event-uuid", cmd_uuid_str); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event-uuid", cmd_uuid_str);
if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
return erlang_response_baduuid(rbuf);
}
switch_core_session_queue_private_event(session, &event, SWITCH_FALSE); switch_core_session_queue_private_event(session, &event, SWITCH_FALSE);
switch_core_session_rwunlock(session); switch_core_session_rwunlock(session);
@ -767,16 +771,18 @@ static switch_status_t handle_request_sendmsg(ei_node_t *ei_node, erlang_pid *pi
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
return erlang_response_baduuid(rbuf);
}
switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE); switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE);
if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) { if (build_event(event, buf) != SWITCH_STATUS_SUCCESS) {
switch_core_session_rwunlock(session);
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
log_sendmsg_request(uuid_str, event); log_sendmsg_request(uuid_str, event);
if (zstr_buf(uuid_str) || !(session = switch_core_session_locate(uuid_str))) {
return erlang_response_baduuid(rbuf);
}
switch_core_session_queue_private_event(session, &event, SWITCH_FALSE); switch_core_session_queue_private_event(session, &event, SWITCH_FALSE);
switch_core_session_rwunlock(session); switch_core_session_rwunlock(session);
@ -903,25 +909,25 @@ static switch_status_t handle_request_bgapi4(ei_node_t *ei_node, erlang_pid *pid
} }
static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) { static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) {
char cmd[MAXATOMLEN + 1]; char cmd[MAXATOMLEN + 1];
char *arg; char *arg;
switch_stream_handle_t stream = { 0 }; switch_stream_handle_t stream = { 0 };
SWITCH_STANDARD_STREAM(stream); SWITCH_STANDARD_STREAM(stream);
switch_event_create(&stream.param_event, SWITCH_EVENT_API); switch_event_create(&stream.param_event, SWITCH_EVENT_API);
if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) { if (ei_decode_atom_safe(buf->buff, &buf->index, cmd)) {
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) { if (ei_decode_string_or_binary(buf->buff, &buf->index, &arg)) {
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "exec: %s(%s)\n", cmd, arg);
if (rbuf) { if (rbuf) {
char *reply; char *reply;
switch_status_t status; switch_status_t status;
status = api_exec_stream(cmd, arg, &stream, &reply); status = api_exec_stream(cmd, arg, &stream, &reply);
@ -940,17 +946,17 @@ static switch_status_t handle_request_api4(ei_node_t *ei_node, erlang_pid *pid,
ei_encode_switch_event_headers(rbuf, stream.param_event); ei_encode_switch_event_headers(rbuf, stream.param_event);
} }
switch_safe_free(reply); switch_safe_free(reply);
} }
if (stream.param_event) { if (stream.param_event) {
switch_event_fire(&stream.param_event); switch_event_fire(&stream.param_event);
} }
switch_safe_free(arg); switch_safe_free(arg);
switch_safe_free(stream.data); switch_safe_free(stream.data);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf) static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *pid, ei_x_buff *buf, ei_x_buff *rbuf)
@ -976,11 +982,14 @@ static switch_status_t handle_request_json_api(ei_node_t *ei_node, erlang_pid *p
ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "parse_error"); ei_x_encode_atom(rbuf, "parse_error");
_ei_x_encode_string(rbuf, parse_end); _ei_x_encode_string(rbuf, parse_end);
switch_safe_free(arg);
return status; return status;
} }
if ((uuid = cJSON_GetObjectCstr(jcmd, "uuid"))) { if ((uuid = cJSON_GetObjectCstr(jcmd, "uuid"))) {
if (!(session = switch_core_session_locate(uuid))) { if (!(session = switch_core_session_locate(uuid))) {
cJSON_Delete(jcmd);
switch_safe_free(arg);
return erlang_response_baduuid(rbuf); return erlang_response_baduuid(rbuf);
} }
} }
@ -1060,7 +1069,6 @@ static switch_status_t handle_request_event(ei_node_t *ei_node, erlang_pid *pid,
} }
for (i = 1; i <= length; i++) { for (i = 1; i <= length; i++) {
if (ei_decode_atom_safe(buf->buff, &buf->index, event_name)) { if (ei_decode_atom_safe(buf->buff, &buf->index, event_name)) {
switch_mutex_unlock(ei_node->event_streams_mutex); switch_mutex_unlock(ei_node->event_streams_mutex);
return erlang_response_badarg(rbuf); return erlang_response_badarg(rbuf);
@ -1237,13 +1245,10 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg *
/* {'$gen_call', {_, _}, {_, _}} = Buf */ /* {'$gen_call', {_, _}, {_, _}} = Buf */
} else if (arity == 3 && !strncmp(atom, "$gen_call", 9)) { } else if (arity == 3 && !strncmp(atom, "$gen_call", 9)) {
switch_status_t status; switch_status_t status;
ei_send_msg_t *send_msg; ei_send_msg_t *send_msg = NULL;
erlang_ref ref; erlang_ref ref;
switch_malloc(send_msg, sizeof(*send_msg)); switch_malloc(send_msg, sizeof(*send_msg));
ei_x_new(&send_msg->buf);
ei_x_new_with_version(&send_msg->buf); ei_x_new_with_version(&send_msg->buf);
/* ...{_, _}, {_, _}} = Buf */ /* ...{_, _}, {_, _}} = Buf */
@ -1252,6 +1257,8 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg *
/* is_tuple(Type) */ /* is_tuple(Type) */
if (type != ERL_SMALL_TUPLE_EXT) { if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call message of an unexpected type (ensure you are using Kazoo v2.14+).\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call message of an unexpected type (ensure you are using Kazoo v2.14+).\n");
ei_x_free(&send_msg->buf);
switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
@ -1261,12 +1268,16 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg *
/* ...pid(), _}, {_, _}} = Buf */ /* ...pid(), _}, {_, _}} = Buf */
if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid)) { if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply pid (ensure you are using Kazoo v2.14+).\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply pid (ensure you are using Kazoo v2.14+).\n");
ei_x_free(&send_msg->buf);
switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
/* ...ref()}, {_, _}} = Buf */ /* ...ref()}, {_, _}} = Buf */
if (ei_decode_ref(buf->buff, &buf->index, &ref)) { if (ei_decode_ref(buf->buff, &buf->index, &ref)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply tag (ensure you are using Kazoo v2.14+).\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received erlang call without a reply tag (ensure you are using Kazoo v2.14+).\n");
ei_x_free(&send_msg->buf);
switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR; return SWITCH_STATUS_GENERR;
} }
@ -1277,6 +1288,7 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg *
status = handle_kazoo_request(ei_node, &msg->from, buf, &send_msg->buf); status = handle_kazoo_request(ei_node, &msg->from, buf, &send_msg->buf);
if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error queuing reply\n");
ei_x_free(&send_msg->buf); ei_x_free(&send_msg->buf);
switch_safe_free(send_msg); switch_safe_free(send_msg);
} }
@ -1292,15 +1304,14 @@ static switch_status_t handle_mod_kazoo_request(ei_node_t *ei_node, erlang_msg *
static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) { static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) {
int version, size, type, arity; int version, size, type, arity;
char atom[MAXATOMLEN + 1]; char atom[MAXATOMLEN + 1];
ei_send_msg_t *send_msg; ei_send_msg_t *send_msg = NULL;
erlang_ref ref; erlang_ref ref;
switch_malloc(send_msg, sizeof(*send_msg));
ei_x_new(&send_msg->buf);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message, attempting to reply\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message, attempting to reply\n");
switch_malloc(send_msg, sizeof(*send_msg));
ei_x_new_with_version(&send_msg->buf);
buf->index = 0; buf->index = 0;
ei_decode_version(buf->buff, &buf->index, &version); ei_decode_version(buf->buff, &buf->index, &version);
ei_get_type(buf->buff, &buf->index, &type, &size); ei_get_type(buf->buff, &buf->index, &type, &size);
@ -1308,7 +1319,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* is_tuple(Buff) */ /* is_tuple(Buff) */
if (type != ERL_SMALL_TUPLE_EXT) { if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message of an unexpected type\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message of an unexpected type\n");
return SWITCH_STATUS_GENERR; goto error;
} }
ei_decode_tuple_header(buf->buff, &buf->index, &arity); ei_decode_tuple_header(buf->buff, &buf->index, &arity);
@ -1316,13 +1327,13 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* {_, _, _} = Buf */ /* {_, _, _} = Buf */
if (arity != 3) { if (arity != 3) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel tuple has an unexpected arity\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel tuple has an unexpected arity\n");
return SWITCH_STATUS_GENERR; goto error;
} }
/* {'$gen_call', _, _} = Buf */ /* {'$gen_call', _, _} = Buf */
if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", 9)) { if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", 9)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message tuple does not begin with the atom '$gen_call'\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received net_kernel message tuple does not begin with the atom '$gen_call'\n");
return SWITCH_STATUS_GENERR; goto error;
} }
ei_get_type(buf->buff, &buf->index, &type, &size); ei_get_type(buf->buff, &buf->index, &type, &size);
@ -1330,7 +1341,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* {_, Sender, _}=Buff, is_tuple(Sender) */ /* {_, Sender, _}=Buff, is_tuple(Sender) */
if (type != ERL_SMALL_TUPLE_EXT) { if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel tuple is an unexpected type\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel tuple is an unexpected type\n");
return SWITCH_STATUS_GENERR; goto error;
} }
ei_decode_tuple_header(buf->buff, &buf->index, &arity); ei_decode_tuple_header(buf->buff, &buf->index, &arity);
@ -1338,13 +1349,13 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* {_, _}=Sender */ /* {_, _}=Sender */
if (arity != 2) { if (arity != 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel message has an unexpected arity\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Second element of the net_kernel message has an unexpected arity\n");
return SWITCH_STATUS_GENERR; goto error;
} }
/* {Pid, Ref}=Sender */ /* {Pid, Ref}=Sender */
if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) { if (ei_decode_pid(buf->buff, &buf->index, &send_msg->pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to decode erlang pid or ref of the net_kernel tuple second element\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to decode erlang pid or ref of the net_kernel tuple second element\n");
return SWITCH_STATUS_GENERR; goto error;
} }
ei_get_type(buf->buff, &buf->index, &type, &size); ei_get_type(buf->buff, &buf->index, &type, &size);
@ -1352,7 +1363,7 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* {_, _, Request}=Buff, is_tuple(Request) */ /* {_, _, Request}=Buff, is_tuple(Request) */
if (type != ERL_SMALL_TUPLE_EXT) { if (type != ERL_SMALL_TUPLE_EXT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message is an unexpected type\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message is an unexpected type\n");
return SWITCH_STATUS_GENERR; goto error;
} }
ei_decode_tuple_header(buf->buff, &buf->index, &arity); ei_decode_tuple_header(buf->buff, &buf->index, &arity);
@ -1360,27 +1371,31 @@ static switch_status_t handle_net_kernel_request(ei_node_t *ei_node, erlang_msg
/* {_, _}=Request */ /* {_, _}=Request */
if (arity != 2) { if (arity != 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message has an unexpected arity\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Third element of the net_kernel message has an unexpected arity\n");
return SWITCH_STATUS_GENERR; goto error;
} }
/* {is_auth, _}=Request */ /* {is_auth, _}=Request */
if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) { if (ei_decode_atom_safe(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "The net_kernel message third element does not begin with the atom 'is_auth'\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "The net_kernel message third element does not begin with the atom 'is_auth'\n");
return SWITCH_STATUS_GENERR; goto error;
} }
/* To ! {Tag, Reply} */ /* To ! {Tag, Reply} */
ei_x_new_with_version(&send_msg->buf);
ei_x_encode_tuple_header(&send_msg->buf, 2); ei_x_encode_tuple_header(&send_msg->buf, 2);
ei_x_encode_ref(&send_msg->buf, &ref); ei_x_encode_ref(&send_msg->buf, &ref);
ei_x_encode_atom(&send_msg->buf, "yes"); ei_x_encode_atom(&send_msg->buf, "yes");
if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(ei_node->send_msgs, send_msg) != SWITCH_STATUS_SUCCESS) {
ei_x_free(&send_msg->buf); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "unable to queue net kernel message\n");
switch_safe_free(send_msg); goto error;
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
error:
ei_x_free(&send_msg->buf);
switch_safe_free(send_msg);
return SWITCH_STATUS_GENERR;
} }
static switch_status_t handle_erl_send(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) { static switch_status_t handle_erl_send(ei_node_t *ei_node, erlang_msg *msg, ei_x_buff *buf) {
@ -1435,7 +1450,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang receive handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang receive handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port);
while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
void *pop; void *pop = NULL;
if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) { if (switch_queue_pop_timeout(ei_node->received_msgs, &pop, 100000) == SWITCH_STATUS_SUCCESS) {
ei_received_msg_t *received_msg = (ei_received_msg_t *) pop; ei_received_msg_t *received_msg = (ei_received_msg_t *) pop;
@ -1469,13 +1484,13 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) { while (switch_test_flag(ei_node, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
int status; int status;
int send_msg_count = 0; int send_msg_count = 0;
void *pop; void *pop = NULL;
if (!received_msg) { if (!received_msg) {
switch_malloc(received_msg, sizeof(*received_msg)); switch_malloc(received_msg, sizeof(*received_msg));
/* create a new buf for the erlang message and a rbuf for the reply */ /* create a new buf for the erlang message and a rbuf for the reply */
if(kazoo_globals.receive_msg_preallocate > 0) { if(kazoo_globals.receive_msg_preallocate > 0) {
received_msg->buf.buff = malloc(kazoo_globals.receive_msg_preallocate); switch_malloc(received_msg->buf.buff, kazoo_globals.receive_msg_preallocate);
received_msg->buf.buffsz = kazoo_globals.receive_msg_preallocate; received_msg->buf.buffsz = kazoo_globals.receive_msg_preallocate;
received_msg->buf.index = 0; received_msg->buf.index = 0;
if(received_msg->buf.buff == NULL) { if(received_msg->buf.buff == NULL) {
@ -1485,6 +1500,8 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
} else { } else {
ei_x_new(&received_msg->buf); ei_x_new(&received_msg->buf);
} }
} else {
received_msg->buf.index = 0;
} }
while (++send_msg_count <= kazoo_globals.send_msg_batch while (++send_msg_count <= kazoo_globals.send_msg_batch
@ -1510,15 +1527,16 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
case ERL_MSG: case ERL_MSG:
fault_count = 0; fault_count = 0;
if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) {
ei_x_free(&received_msg->buf);
switch_safe_free(received_msg);
}
if (kazoo_globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > kazoo_globals.receive_msg_preallocate) { if (kazoo_globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > kazoo_globals.receive_msg_preallocate) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz);
} }
if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "failed to push erlang received message from %s <%d.%d.%d> into queue\n", received_msg->msg.from.node, received_msg->msg.from.creation, received_msg->msg.from.num, received_msg->msg.from.serial);
ei_x_free(&received_msg->buf);
switch_safe_free(received_msg);
}
received_msg = NULL; received_msg = NULL;
break; break;
case ERL_ERROR: case ERL_ERROR:
@ -1573,6 +1591,9 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
destroy_node_handler(ei_node); destroy_node_handler(ei_node);
switch_atomic_dec(&kazoo_globals.threads); switch_atomic_dec(&kazoo_globals.threads);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown Complete for erlang node handler %p: %s (%s:%d)\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port);
return NULL; return NULL;
} }
@ -1630,7 +1651,7 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "New erlang connection from node %s (%s:%d) -> (%s:%d)\n", ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port, ei_node->local_ip, ei_node->local_port); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "New erlang connection from node %s (%s:%d) -> (%s:%d)\n", ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port, ei_node->local_ip, ei_node->local_port);
for(i = 0; i < kazoo_globals.num_worker_threads; i++) { for(i = 0; i < kazoo_globals.node_worker_threads; i++) {
switch_threadattr_create(&thd_attr, ei_node->pool); switch_threadattr_create(&thd_attr, ei_node->pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);

View File

@ -38,7 +38,9 @@ kz_globals_t kazoo_globals = {0};
SWITCH_MODULE_DEFINITION(mod_kazoo, mod_kazoo_load, mod_kazoo_shutdown, mod_kazoo_runtime); SWITCH_MODULE_DEFINITION(mod_kazoo, mod_kazoo_load, mod_kazoo_shutdown, mod_kazoo_runtime);
SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load) { SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load)
{
kz_erl_init();
memset(&kazoo_globals, 0, sizeof(kazoo_globals)); memset(&kazoo_globals, 0, sizeof(kazoo_globals));
kazoo_globals.pool = pool; kazoo_globals.pool = pool;
@ -84,7 +86,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_kazoo_load) {
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_kazoo_shutdown) { SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_kazoo_shutdown) {
int sanity = 0; int sanity = 0;
remove_cli_api(); remove_cli_api();
kz_tweaks_stop(); kz_tweaks_stop();
@ -128,6 +129,8 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_kazoo_shutdown) {
switch_safe_free(kazoo_globals.ei_cookie); switch_safe_free(kazoo_globals.ei_cookie);
switch_safe_free(kazoo_globals.ei_nodename); switch_safe_free(kazoo_globals.ei_nodename);
kz_erl_shutdown();
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }