Ton of stuff, mainly laying groundwork for bind_search functionality

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10403 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Andrew Thompson 2008-11-14 17:55:20 +00:00
parent a2e2c8ae46
commit b2bf73de6e

View File

@ -53,14 +53,6 @@ typedef enum {
LFLAG_STATEFUL = (1 << 8)
} event_flag_t;
/* TODO - support multiple event handlers per erlang connection each with their own event filters? */
struct event_handler {
erlang_pid pid;
switch_hash_t *event_hash;
struct event_handler *next;
};
struct listener {
int sockfd;
struct ei_cnode_s *ec;
@ -70,6 +62,7 @@ struct listener {
switch_queue_t *log_queue;
switch_memory_pool_t *pool;
switch_mutex_t *flag_mutex;
switch_mutex_t *sock_mutex;
char *ebuf;
uint32_t flags;
switch_log_level_t level;
@ -98,6 +91,19 @@ static struct {
#define MAX_ACL 100
struct erlang_binding {
switch_xml_section_t section;
erlang_pid pid;
char *registered_process; /* TODO */
listener_t *listener;
struct erlang_binding *next;
};
static struct {
struct erlang_binding *head;
switch_xml_binding_t *search_binding;
} bindings;
static struct {
switch_mutex_t *mutex;
char *ip;
@ -163,6 +169,49 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
}
/* Stolen from code added to ei in R12B-5.
* Since not everyone has this verison yet;
* provide our own version.
* */
#define put8(s,n) do { \
(s)[0] = (char)((n) & 0xff); \
(s) += 1; \
} while (0)
#define put32be(s,n) do { \
(s)[0] = ((n) >> 24) & 0xff; \
(s)[1] = ((n) >> 16) & 0xff; \
(s)[2] = ((n) >> 8) & 0xff; \
(s)[3] = (n) & 0xff; \
(s) += 4; \
} while (0)
static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) {
char msgbuf[2048];
char *s;
int index = 0;
/*int n;*/
index = 5; /* max sizes: */
ei_encode_version(msgbuf,&index); /* 1 */
ei_encode_tuple_header(msgbuf,&index,3);
ei_encode_long(msgbuf,&index,ERL_LINK);
ei_encode_pid(msgbuf,&index,from); /* 268 */
ei_encode_pid(msgbuf,&index,to); /* 268 */
/* 5 byte header missing */
s = msgbuf;
put32be(s, index - 4); /* 4 */
put8(s, ERL_PASS_THROUGH); /* 1 */
/* sum: 542 */
switch_mutex_lock(listener->sock_mutex);
write(listener->sockfd, msgbuf, index);
switch_mutex_unlock(listener->sock_mutex);
}
static void expire_listener(listener_t **listener)
{
void *pop;
@ -180,6 +229,38 @@ static void expire_listener(listener_t **listener)
}
static void remove_binding(listener_t *listener) {
struct erlang_binding *ptr, *lst = NULL;
switch_mutex_lock(globals.listener_mutex);
switch_xml_set_binding_sections(bindings.search_binding, (1 << sizeof(switch_xml_section_enum_t)));
for (ptr = bindings.head; ptr; lst = ptr, ptr = ptr->next) {
if (ptr->listener == listener) {
if (bindings.head == ptr) {
if (ptr->next) {
bindings.head = ptr->next;
} else {
bindings.head = NULL;
break;
}
} else {
if (ptr->next) {
lst->next = ptr->next;
} else {
lst->next = NULL;
}
}
} else {
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | ptr->section);
}
}
switch_mutex_unlock(globals.listener_mutex);
}
static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event)
{
int i;
@ -295,40 +376,6 @@ static void close_socket(int *sock)
}
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
{
listener_t *l;
int sanity = 0;
prefs.done = 1;
switch_log_unbind_logger(socket_logger);
/*close_socket(&listen_list.sockfd);*/
while (prefs.threads || prefs.done == 1) {
switch_yield(10000);
if (++sanity == 1000) {
break;
}
}
switch_event_unbind(&globals.node);
switch_mutex_lock(globals.listener_mutex);
for (l = listen_list.listeners; l; l = l->next) {
close_socket(&l->sockfd);
}
switch_mutex_unlock(globals.listener_mutex);
switch_sleep(1500000); /* sleep for 1.5 seconds */
return SWITCH_STATUS_SUCCESS;
}
static void add_listener(listener_t *listener)
{
/* add me to the listeners so I get events */
@ -357,25 +404,6 @@ static void remove_listener(listener_t *listener)
switch_mutex_unlock(globals.listener_mutex);
}
SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
{
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
close_socket(&listen_list.sockfd);
return SWITCH_STATUS_GENERR;
}
switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
struct api_command_struct {
char *api_cmd;
@ -456,7 +484,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
ei_x_encode_string(&ebuf, acs->uuid_str);
ei_x_encode_string(&ebuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&ebuf);
}
@ -478,7 +508,11 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
ei_x_encode_string(&rbuf, reply);
switch_mutex_lock(acs->listener->sock_mutex);
ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index);
switch_mutex_unlock(acs->listener->sock_mutex);
ei_x_free(&rbuf);
}
@ -500,6 +534,38 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
}
static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value,
switch_event_t *params, void *user_data)
{
switch_xml_t xml = NULL;
struct erlang_binding *ptr;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
switch_xml_section_t section = switch_xml_parse_section_string((char *) sectionstr);
for (ptr = bindings.head; ptr && ptr->section != section; ptr = ptr->next); /* just get the first match */
if (!ptr) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
return NULL;
}
if (!ptr->listener) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n");
return NULL; /* our pointer is trash */
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->pid.node);
switch_mutex_lock(ptr->listener->sock_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "It's a lock!\n");
switch_mutex_unlock(ptr->listener->sock_mutex);
return xml;
}
static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{
@ -829,6 +895,57 @@ sendmsg_fail:
break;
}
} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
/* format is (result|config|directory|dialplan|phrases) */
char sectionstr[MAXATOMLEN];
if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
switch_xml_section_t section;
if (!(section = switch_xml_parse_section_string(sectionstr))) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
break;
}
struct erlang_binding *binding, *ptr;
if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
break;
}
binding->section = section;
binding->pid = msg->from;
binding->listener = listener;
switch_mutex_lock(globals.listener_mutex);
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
if (ptr) {
ptr->next = binding;
} else {
bindings.head = binding;
}
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
switch_mutex_unlock(globals.listener_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
ei_link(listener, ei_self(listener->ec), &msg->from);
} else {
ei_x_encode_tuple_header(rbuf, 2);
@ -852,10 +969,12 @@ sendmsg_fail:
switch_clear_flag_locked(listener, LFLAG_LOG);
}
} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->log_pid = msg->from;
listener->level = SWITCH_LOG_DEBUG;
switch_set_flag(listener, LFLAG_LOG);
} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
ei_link(listener, ei_self(listener->ec), &msg->from);
listener->event_pid = msg->from;
if (!switch_test_flag(listener, LFLAG_EVENTS)) {
switch_set_flag_locked(listener, LFLAG_EVENTS);
@ -888,6 +1007,10 @@ sendmsg_fail:
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "ok");
ei_x_encode_pid(rbuf, ei_self(listener->ec));
} else if (!strncmp(atom, "link", MAXATOMLEN)) {
/* debugging */
ei_link(listener, ei_self(listener->ec), &msg->from);
goto noreply;
} else {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
@ -905,12 +1028,17 @@ sendmsg_fail:
break;
}
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex);
noreply:
return 0;
event_done:
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index);
switch_mutex_unlock(listener->sock_mutex);
return 1;
}
@ -985,7 +1113,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
switch_mutex_lock(listener->sock_mutex);
status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100);
switch_mutex_unlock(listener->sock_mutex);
switch(status) {
case ERL_TICK :
@ -1043,7 +1173,11 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
ei_x_encode_tuple_header(&lbuf, 2);
ei_x_encode_atom(&lbuf, "log");
ei_x_encode_string(&lbuf, data);
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index);
switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&lbuf);
}
}
@ -1060,7 +1194,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
ei_encode_switch_event(&ebuf, pevent);
switch_mutex_lock(listener->sock_mutex);
ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index);
switch_mutex_unlock(listener->sock_mutex);
ei_x_free(&ebuf);
switch_event_destroy(&pevent);
@ -1084,6 +1220,9 @@ done:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
switch_core_hash_destroy(&listener->event_hash);
/* remove any bindings for this connection */
remove_binding(listener);
if (listener->session) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
switch_clear_flag_locked(listener, LFLAG_SESSION);
@ -1093,6 +1232,7 @@ done:
switch_core_destroy_memory_pool(&pool);
}
switch_mutex_lock(globals.listener_mutex);
prefs.threads--;
switch_mutex_unlock(globals.listener_mutex);
@ -1111,6 +1251,7 @@ static void launch_listener_thread(listener_t *listener)
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
}
@ -1173,6 +1314,39 @@ static int config(void)
}
/* Module Hooks */
SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
{
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
close_socket(&listen_list.sockfd);
return SWITCH_STATUS_GENERR;
}
switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE);
memset(&bindings, 0, sizeof(bindings));
if (switch_xml_bind_search_function_ret(erlang_fetch, (1 << sizeof(switch_xml_section_enum_t)), NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
close_socket(&listen_list.sockfd);
return SWITCH_STATUS_GENERR;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
{
switch_memory_pool_t *pool = NULL, *listener_pool = NULL;
@ -1328,6 +1502,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
listener_pool = NULL;
listener->level = SWITCH_LOG_DEBUG;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_core_hash_init(&listener->event_hash, listener->pool);
launch_listener_thread(listener);
@ -1358,6 +1533,41 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
return SWITCH_STATUS_TERM;
}
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
{
listener_t *l;
int sanity = 0;
prefs.done = 1;
switch_log_unbind_logger(socket_logger);
/*close_socket(&listen_list.sockfd);*/
while (prefs.threads || prefs.done == 1) {
switch_yield(10000);
if (++sanity == 1000) {
break;
}
}
switch_event_unbind(&globals.node);
switch_xml_unbind_search_function_ptr(erlang_fetch);
switch_mutex_lock(globals.listener_mutex);
for (l = listen_list.listeners; l; l = l->next) {
close_socket(&l->sockfd);
}
switch_mutex_unlock(globals.listener_mutex);
switch_sleep(1500000); /* sleep for 1.5 seconds */
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c