From d9bb4dcc9c67cdfdeaa28ce5b521e3c63755e812 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 22 Jan 2009 23:07:37 +0000 Subject: [PATCH] Some inital work on abstracting registered processes and pids git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11429 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/handle_msg.c | 11 +++-- .../mod_erlang_event/mod_erlang_event.c | 42 ++++++++++++++----- .../mod_erlang_event/mod_erlang_event.h | 20 ++++++--- 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 5a00d58fb6..ff4689362f 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -488,7 +488,8 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei } else { binding->section = section; - binding->pid = msg->from; + binding->process.type = ERLANG_PID; + binding->process.pid = msg->from; binding->listener = listener; switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); @@ -530,7 +531,7 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei } else { switch_core_session_t *session; if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) { - /* create a new sesion list element and attach it to this listener */ + /* create a new session list element and attach it to this listener */ if (attach_call_to_listener(listener,reg_name,session)) { ei_x_encode_atom(rbuf, "ok"); } else { @@ -606,13 +607,15 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e ei_x_encode_atom(rbuf, "ok"); } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) { ei_link(listener, ei_self(listener->ec), &msg->from); - listener->log_pid = msg->from; + listener->log_process.type = ERLANG_PID; + listener->log_process.pid = msg->from; listener->level = SWITCH_LOG_DEBUG; switch_set_flag(listener, LFLAG_LOG); ei_x_encode_atom(rbuf, "ok"); } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) { ei_link(listener, ei_self(listener->ec), &msg->from); - listener->event_pid = msg->from; + listener->event_process.type = ERLANG_PID; + listener->event_process.pid = msg->from; if (!switch_test_flag(listener, LFLAG_EVENTS)) { switch_set_flag_locked(listener, LFLAG_EVENTS); } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 75d62fa0fa..607c73adc1 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -114,7 +114,7 @@ static void remove_binding(listener_t *listener, erlang_pid *pid) { for (ptr = bindings.head; ptr; lst = ptr, ptr = ptr->next) { if ((listener && ptr->listener == listener) || - (pid && (&ptr->pid) && (!strcmp(pid->node, ptr->pid.node)) && pid->creation == ptr->pid.creation && pid->num == ptr->pid.num && pid->serial == ptr->pid.serial)) { + (pid && (&ptr->process.type == ERLANG_PID) && (!strcmp(pid->node, ptr->process.pid.node)) && pid->creation == ptr->process.pid.creation && pid->num == ptr->process.pid.num && pid->serial == ptr->process.pid.serial)) { if (bindings.head == ptr) { if (ptr->next) { bindings.head = ptr->next; @@ -327,7 +327,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c return NULL; /* our pointer is trash */ } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->pid.node); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node); switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); @@ -346,7 +346,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /*switch_core_hash_insert(ptr->reply_hash, uuid_str, );*/ switch_mutex_lock(ptr->listener->sock_mutex); - ei_send(ptr->listener->sockfd, &ptr->pid, buf.buff, buf.index); + ei_send(ptr->listener->sockfd, &ptr->process.pid, buf.buff, buf.index); switch_mutex_unlock(ptr->listener->sock_mutex); int i = 0; @@ -392,8 +392,9 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c } -static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, char* reg_name) +static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, struct erlang_process process) { + int result; switch_event_t *call_event=NULL; switch_channel_t *channel=NULL; @@ -417,7 +418,13 @@ static switch_status_t notify_new_session(listener_t *listener, switch_core_sess ei_encode_switch_event(&lbuf, call_event); switch_mutex_lock(listener->sock_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n"); - if (ei_reg_send(listener->ec,listener->sockfd, reg_name, lbuf.buff, lbuf.index)==ERL_ERROR) { + if (process.type == ERLANG_PID) { + result = ei_send(listener->sockfd, &process.pid, lbuf.buff, lbuf.index); + } else { + result = ei_reg_send(listener->ec, listener->sockfd, process.reg_name, lbuf.buff, lbuf.index); + } + + if (result) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send call event\n"); } switch_mutex_unlock(listener->sock_mutex); @@ -441,7 +448,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) last = NULL; while(sp) { if (!switch_test_flag(sp, LFLAG_OUTBOUND_INIT)) { - status = notify_new_session(listener, sp->session, sp->reg_name); + status = notify_new_session(listener, sp->session, sp->process); if (status != SWITCH_STATUS_SUCCESS) break; switch_set_flag(sp, LFLAG_OUTBOUND_INIT); @@ -460,7 +467,11 @@ static switch_status_t check_attached_sessions(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_reg_send(listener->ec, listener->sockfd, sp->reg_name, ebuf.buff, ebuf.index); + if (sp->process.type == ERLANG_PID) { + ei_send(listener->sockfd, &sp->process.pid, ebuf.buff, ebuf.index); + } else { + ei_reg_send(listener->ec, listener->sockfd, sp->process.reg_name, ebuf.buff, ebuf.index); + } switch_mutex_unlock(listener->sock_mutex); /* event is a hangup, so this session can be removed */ @@ -535,7 +546,11 @@ static void check_log_queue(listener_t *listener) ei_x_encode_empty_list(&lbuf); switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index); + if (listener->log_process.type == ERLANG_PID) { + ei_send(listener->sockfd, &listener->log_process.pid, lbuf.buff, lbuf.index); + } else { + ei_reg_send(listener->ec, listener->sockfd, listener->log_process.reg_name, lbuf.buff, lbuf.index); + } switch_mutex_unlock(listener->sock_mutex); ei_x_free(&lbuf); @@ -561,9 +576,13 @@ static void check_event_queue(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index); + if (listener->log_process.type == ERLANG_PID) { + ei_send(listener->sockfd, &listener->log_process.pid, ebuf.buff, ebuf.index); + } else { + ei_reg_send(listener->ec, listener->sockfd, listener->log_process.reg_name, ebuf.buff, ebuf.index); + } switch_mutex_unlock(listener->sock_mutex); - + ei_x_free(&ebuf); switch_event_destroy(&pevent); } @@ -880,7 +899,8 @@ session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, sw } else { session_element->session = session; - session_element->reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); + session_element->process.type = ERLANG_REG_PROCESS; + session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); switch_set_flag(session_element, LFLAG_SESSION_ALIVE); switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index cbf982bdfd..f1373e5d8b 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -38,12 +38,23 @@ typedef enum { LFLAG_SESSION_ALIVE } session_flag_t; +typedef enum { + ERLANG_PID = 0, + ERLANG_REG_PROCESS +} process_type; + +struct erlang_process { + process_type type; + char *reg_name; + erlang_pid pid; +}; + struct session_elem { switch_core_session_t *session; switch_mutex_t *flag_mutex; uint32_t flags; /* registered process name that will receive call notifications from this session */ - char* reg_name; + struct erlang_process process; switch_queue_t *event_queue; struct session_elem *next; }; @@ -68,8 +79,8 @@ typedef enum { struct listener { int sockfd; struct ei_cnode_s *ec; - erlang_pid log_pid; - erlang_pid event_pid; + struct erlang_process log_process; + struct erlang_process event_process; char *peer_nodename; switch_queue_t *event_queue; switch_queue_t *log_queue; @@ -99,8 +110,7 @@ typedef struct listener listener_t; struct erlang_binding { switch_xml_section_t section; - erlang_pid pid; - char *registered_process; /* TODO */ + struct erlang_process process; listener_t *listener; struct erlang_binding *next; };