diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index b1caa67e3a..9960e2499d 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -55,6 +55,8 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); static void launch_listener_thread(listener_t *listener); +session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid); + static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) { listener_t *l; @@ -135,14 +137,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t return; } - - switch_thread_rwlock_rdlock(listener->session_rwlock); - if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) { - switch_thread_rwlock_rdlock(s->rwlock); - } - switch_thread_rwlock_unlock(listener->session_rwlock); - - if (s) { + if ((s = (session_elem_t*)find_session_elem_by_uuid(listener, uuid))) { int send = 0; switch_thread_rwlock_rdlock(s->event_rwlock); @@ -296,7 +291,7 @@ static void remove_listener(listener_t *listener) } /* Search for a listener already talking to the specified node and lock for reading*/ -static listener_t *find_listener_locked(char *nodename) +static listener_t *find_listener(char *nodename) { listener_t *l = NULL; @@ -321,14 +316,9 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element) -{ - switch_core_hash_delete(listener->sessions, session_element->uuid_str); -} - -static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element) { switch_thread_rwlock_wrlock(listener->session_rwlock); - remove_session_elem_from_listener(listener, session_element); + switch_core_hash_delete(listener->sessions, session_element->uuid_str); switch_thread_rwlock_unlock(listener->session_rwlock); } @@ -343,10 +333,6 @@ static void destroy_session_elem(session_elem_t *session_element) if ((session = switch_core_session_locate(session_element->uuid_str))) { switch_channel_t *channel = switch_core_session_get_channel(session); - if (switch_channel_get_state(channel) < CS_HANGUP) { - switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Outbound session for %s exited unexpectedly!\n", session_element->uuid_str); - } - switch_channel_set_private(channel, "_erlang_session_", NULL); switch_channel_clear_flag(channel, CF_CONTROLLED); switch_core_session_rwunlock(session); @@ -354,6 +340,19 @@ static void destroy_session_elem(session_elem_t *session_element) switch_core_destroy_memory_pool(&session_element->pool); } +session_elem_t *find_session_elem_by_uuid(listener_t *listener, const char *uuid) +{ + session_elem_t *session = NULL; + + switch_thread_rwlock_rdlock(listener->session_rwlock); + if ((session = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) { + switch_thread_rwlock_rdlock(session->rwlock); + } + + switch_thread_rwlock_unlock(listener->session_rwlock); + + return session; + } session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) { @@ -362,14 +361,17 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) void *val = NULL; session_elem_t *session = NULL; + switch_thread_rwlock_rdlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &val); if (((session_elem_t*)val)->process.type == ERLANG_PID && !ei_compare_pids(pid, &((session_elem_t*)val)->process.pid)) { session = (session_elem_t*)val; + switch_thread_rwlock_rdlock(session->rwlock); break; } } + switch_thread_rwlock_unlock(listener->session_rwlock); return session; } @@ -536,6 +538,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t * if (!(session = switch_core_session_locate(session_element->uuid_str))) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(session_element->uuid_str), SWITCH_LOG_WARNING, "Can't locate session %s\n", session_element->uuid_str); + switch_event_destroy(&call_event); return SWITCH_STATUS_FALSE; } @@ -672,19 +675,16 @@ static switch_status_t check_attached_sessions(listener_t *listener) } } switch_thread_rwlock_unlock(listener->session_rwlock); - /* release the read lock and get a write lock */ - switch_thread_rwlock_wrlock(listener->session_rwlock); - /* do the deferred remove */ + /* do the deferred remove */ for (header = event->headers; header; header = header->next) { - if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) { + if ((sp = (session_elem_t*)find_session_elem_by_uuid(listener, header->value))) { remove_session_elem_from_listener(listener, sp); + switch_thread_rwlock_unlock(sp->rwlock); destroy_session_elem(sp); } } - switch_thread_rwlock_unlock(listener->session_rwlock); - /* remove the temporary event */ switch_event_destroy(&event); @@ -783,13 +783,23 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) remove_binding(NULL, pid); /* TODO - why don't we pass the listener as the first argument? */ - /* TODO - eliminate session destroy races and we shouldn't lock the session hash */ - switch_thread_rwlock_wrlock(listener->session_rwlock); if ((s = find_session_elem_by_pid(listener, pid))) { - remove_session_elem_from_listener(listener, s); - destroy_session_elem(s); + switch_core_session_t *session = NULL; + + if ((session = switch_core_session_locate(s->uuid_str))) { + switch_channel_t *channel = switch_core_session_get_channel(session); + + if (switch_channel_get_state(channel) < CS_HANGUP) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Outbound session exited unexpectedly %s!\n", s->uuid_str); + } + + switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); + switch_core_session_rwunlock(session); + } + + switch_thread_rwlock_unlock(s->rwlock); + } - switch_thread_rwlock_wrlock(listener->session_rwlock); if (listener->log_process.type == ERLANG_PID && !ei_compare_pids(&listener->log_process.pid, pid)) { @@ -1214,7 +1224,6 @@ static listener_t *new_outbound_listener_locked(char *node) return NULL; } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node); listener = new_listener(&ec, clientfd); listener->peer_nodename = switch_core_strdup(listener->pool, node); } @@ -1410,10 +1419,11 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000); if (!p->pid) { + switch_channel_t *channel = switch_core_session_get_channel(session); + p->state = reply_timeout; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str); - remove_session_elem_from_listener_locked(listener, session_element); - destroy_session_elem(session_element); + switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); return NULL; } @@ -1500,7 +1510,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node); /* first work out if there is a listener already talking to the node we want to talk to */ - listener = find_listener_locked(node); + listener = find_listener(node); /* if there is no listener, then create one */ if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n"); @@ -1567,7 +1577,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) ei_x_encode_atom(&buf, "freeswitch_sendmsg"); _ei_x_encode_string(&buf, argv[2]); - listener = find_listener_locked(node); + listener = find_listener(node); if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node); listener = new_outbound_listener_locked(node);