diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 5651f0d975..c4da69eab5 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -151,8 +151,8 @@ static void send_event_to_attached_sessions(listener_t* listener, switch_event_t switch_mutex_lock(listener->session_mutex); for (s = listener->session_list; s; s = s->next) { /* check the event uuid against the uuid of each session */ - if (!strcmp(uuid, switch_core_session_get_uuid(s->session))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending event to attached session for %s\n", switch_core_session_get_uuid(s->session)); + if (!strcmp(uuid, s->uuid_str)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending event to attached session for %s\n", s->uuid_str); if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { /* add the event to the queue for this session */ if (switch_queue_trypush(s->event_queue, clone) != SWITCH_STATUS_SUCCESS) { @@ -303,26 +303,32 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s switch_mutex_unlock(listener->session_mutex); } -static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session) +static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element) { session_elem_t *s, *last = NULL; + switch_core_session_t *session; - if (!session) + if (!session_element) + return; + return; switch_mutex_lock(listener->session_mutex); for(s = listener->session_list; s; s = s->next) { - if (s == session) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing session\n"); + if (s == session_element) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing session element\n"); if (last) { last->next = s->next; } else { listener->session_list = s->next; } - switch_channel_clear_flag(switch_core_session_get_channel(s->session), CF_CONTROLLED); - /* this allows the application threads to exit */ - switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); - switch_core_session_rwunlock(s->session); + if (!(session = switch_core_session_locate(session_element->uuid_str))) { + switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); + /* this allows the application threads to exit */ + switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); + switch_core_session_rwunlock(session); + } + break; } last = s; } @@ -444,16 +450,21 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c } -static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, struct erlang_process process) +static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element) { int result; + switch_core_session_t *session; switch_event_t *call_event=NULL; switch_channel_t *channel=NULL; /* Send a message to the associated registered process to let it know there is a call. Message is a tuple of the form {call, } */ + if (!(session = switch_core_session_locate(session_element->uuid_str))) + return SWITCH_STATUS_FALSE; + channel = switch_core_session_get_channel(session); + switch_core_session_rwunlock(session); if (switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n"); return SWITCH_STATUS_MEMERR; @@ -470,7 +481,7 @@ static switch_status_t notify_new_session(listener_t *listener, switch_core_sess ei_encode_switch_event(&lbuf, call_event); switch_mutex_lock(listener->sock_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n"); - result = ei_sendto(listener->ec, listener->sockfd, &process, &lbuf); + result = ei_sendto(listener->ec, listener->sockfd, &session_element->process, &lbuf); switch_mutex_unlock(listener->sock_mutex); @@ -503,7 +514,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) } if (!switch_test_flag(sp, LFLAG_OUTBOUND_INIT)) { - status = notify_new_session(listener, sp->session, sp->process); + status = notify_new_session(listener, sp); if (status != SWITCH_STATUS_SUCCESS) break; switch_set_flag(sp, LFLAG_OUTBOUND_INIT); @@ -516,6 +527,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) to distinguish them from normal events (if they are sent to the same process) */ ei_x_buff ebuf; + ei_x_new_with_version(&ebuf); ei_x_encode_tuple_header(&ebuf, 2); ei_x_encode_atom(&ebuf, "call_event"); @@ -528,7 +540,9 @@ static switch_status_t check_attached_sessions(listener_t *listener) /* event is a hangup, so this session can be removed */ if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hangup event for attached session for %s\n", switch_core_session_get_uuid(sp->session)); + switch_core_session_t *session; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hangup event for attached session for %s\n", sp->uuid_str); /* remove session from list */ if (last) @@ -536,10 +550,12 @@ static switch_status_t check_attached_sessions(listener_t *listener) else listener->session_list = sp->next; - switch_channel_clear_flag(switch_core_session_get_channel(sp->session), CF_CONTROLLED); - /* this allows the application threads to exit */ - switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); - switch_core_session_rwunlock(sp->session); + if ((session = switch_core_session_locate(sp->uuid_str))) { + switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); + /* this allows the application threads to exit */ + switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); + switch_core_session_rwunlock(session); + } removed = 1; ei_x_new_with_version(&ebuf); @@ -650,7 +666,7 @@ static void handle_exit(listener_t *listener, erlang_pid *pid) remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */ if ((s = find_session_elem_by_pid(listener, pid))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Outbound session for %s exited unexpectedly!\n", - switch_core_session_get_uuid(s->session)); + s->uuid_str); /* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */ remove_session_elem_from_listener(listener, s); } @@ -821,6 +837,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) { listener_t *listener = (listener_t *) obj; session_elem_t* s; + switch_core_session_t *session; switch_mutex_lock(globals.listener_mutex); prefs.threads++; @@ -863,11 +880,12 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) /* clean up all the attached sessions */ switch_mutex_lock(listener->session_mutex); for (s = listener->session_list; s; s = s->next) { - switch_channel_clear_flag(switch_core_session_get_channel(s->session), CF_CONTROLLED); - /* this allows the application threads to exit */ - switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); - /* */ - switch_core_session_rwunlock(s->session); + if ((session = switch_core_session_locate(s->uuid_str))) { + switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); + /* this allows the application threads to exit */ + switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); + switch_core_session_rwunlock(session); + } } switch_mutex_unlock(listener->session_mutex); @@ -1030,7 +1048,6 @@ session_elem_t* attach_call_to_registered_process(listener_t* listener, char* re switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); } else { - session_element->session = session; session_element->process.type = ERLANG_REG_PROCESS; session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); switch_set_flag(session_element, LFLAG_SESSION_ALIVE); @@ -1056,7 +1073,7 @@ session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); } else { - session_element->session = session; + memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); session_element->process.type = ERLANG_PID; memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); switch_set_flag(session_element, LFLAG_SESSION_ALIVE); @@ -1078,8 +1095,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul session_elem_t* session_element=NULL; if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } - else { + } else { if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); } @@ -1087,7 +1103,8 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul char hash[100]; int i = 0; void *p = NULL; - session_element->session = session; + + memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); erlang_pid *pid; erlang_ref ref; @@ -1131,7 +1148,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { if (i > 50) { /* half a second timeout */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); - switch_core_session_rwunlock(session); + /*switch_core_session_rwunlock(session);*/ remove_session_elem_from_listener(listener,session_element); switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ return NULL; @@ -1152,8 +1169,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); - /* this hangs because it can never get hold of the socket mutex */ - ei_link(listener, ei_self(listener->ec), pid); + ei_link(listener, ei_self(listener->ec), pid); } } return session_element; @@ -1327,7 +1343,7 @@ SWITCH_STANDARD_API(erlang_cmd) switch_mutex_lock(l->session_mutex); if ((sp = l->session_list)) { while(sp) { - stream->write_function(stream, "Outbound session for %s\n", switch_core_session_get_uuid(sp->session)); + stream->write_function(stream, "Outbound session for %s\n", sp->uuid_str); sp = sp->next; } } else { diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 70f9d90015..72c208b8c6 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -55,7 +55,7 @@ struct erlang_process { }; struct session_elem { - switch_core_session_t *session; + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; switch_mutex_t *flag_mutex; uint32_t flags; struct erlang_process process;