diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 4bceaaf4e8..38c0e4e4eb 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -878,9 +878,12 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, { erlang_ref ref; erlang_pid *pid; - void *p; char hash[100]; int arity; + const void *key; + void *val; + session_elem_t *se; + switch_hash_index_t *iter; ei_decode_tuple_header(buf->buff, &buf->index, &arity); @@ -906,32 +909,35 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash); - if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) { - if (p == &globals.TIMEOUT) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash); - switch_core_hash_delete(listener->spawn_pid_hash, hash); - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "timeout"); - } else if (p == &globals.WAITING) { - /* update the key to point at a pid */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash); - switch_core_hash_delete(listener->spawn_pid_hash, hash); - switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); - return SWITCH_STATUS_FALSE; /*no reply */ - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash); - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "duplicate_response"); + switch_thread_rwlock_rdlock(listener->session_rwlock); + for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { + switch_hash_this(iter, &key, NULL, &val); + se = (session_elem_t*)val; + if (se->spawn_reply && !strncmp(se->spawn_reply->hash, hash, 100)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "found matching session for %s : %s\n", hash, se->uuid_str); + switch_mutex_lock(se->spawn_reply->mutex); + if (se->spawn_reply->state == reply_not_ready) { + switch_thread_cond_wait(se->spawn_reply->ready_or_found, se->spawn_reply->mutex); + } + + if (se->spawn_reply->state == reply_waiting) { + se->spawn_reply->pid = pid; + switch_thread_cond_broadcast(se->spawn_reply->ready_or_found); + ei_x_encode_atom(rbuf, "ok"); + switch_thread_rwlock_unlock(listener->session_rwlock); + switch_mutex_unlock(se->spawn_reply->mutex); + return SWITCH_STATUS_SUCCESS; + } + switch_mutex_unlock(se->spawn_reply->mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "get_pid came in too late for %s\n", hash); + break; } - } else { - /* nothin in the hash */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash); - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "invalid_ref"); } + switch_thread_rwlock_unlock(listener->session_rwlock); + + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "notfound"); switch_safe_free(pid); /* don't need it */ diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 053f33033b..60acc96517 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -1217,6 +1217,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool); switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool); switch_core_hash_init(&session_element->event_hash, session_element->pool); + session_element->spawn_reply = NULL; for (x = 0; x <= SWITCH_EVENT_ALL; x++) { session_element->event_list[x] = 0; @@ -1266,9 +1267,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul /* create a session list element */ session_elem_t *session_element = session_elem_create(listener, session); char hash[100]; - int i = 0; - void *p = NULL; - erlang_pid *pid; + //void *p = NULL; + spawn_reply_t *p; erlang_ref ref; switch_set_flag(session_element, LFLAG_WAITING_FOR_PID); @@ -1279,13 +1279,25 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul ei_init_ref(listener->ec, &ref); ei_hash_ref(&ref, hash); /* insert the waiting marker */ - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); + + p = switch_core_alloc(session_element->pool, sizeof(*p)); + switch_thread_cond_create(&p->ready_or_found, session_element->pool); + switch_mutex_init(&p->mutex, SWITCH_MUTEX_DEFAULT, session_element->pool); + p->state = reply_not_ready; + p->hash = hash; + p->pid = NULL; + + session_element->spawn_reply = p; + + switch_mutex_lock(p->mutex); if (!strcmp(function, "!")) { /* send a message to request a pid */ ei_x_buff rbuf; ei_x_new_with_version(&rbuf); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "get_pid\n"); + ei_x_encode_tuple_header(&rbuf, 4); ei_x_encode_atom(&rbuf, "get_pid"); _ei_x_encode_string(&rbuf, switch_core_session_get_uuid(session)); @@ -1307,33 +1319,31 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul */ } - /* loop until either we timeout or we get a value that's not the waiting marker */ - while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { - if (i > 500) { /* 5 second timeout */ - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash); - remove_session_elem_from_listener_locked(listener, session_element); - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ - destroy_session_elem(session_element); - return NULL; - } - i++; - switch_yield(10000); /* 10ms */ + p->state = reply_waiting; + switch_thread_cond_broadcast(p->ready_or_found); + switch_thread_cond_timedwait(p->ready_or_found, + p->mutex, 5000000); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "wtf\n"); + if (!p->pid) { + p->state = reply_timeout; + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash); + remove_session_elem_from_listener_locked(listener, session_element); + destroy_session_elem(session_element); + return NULL; } - switch_core_hash_delete(listener->spawn_pid_hash, hash); - - pid = (erlang_pid *) p; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "got pid! %s\n", hash); session_element->process.type = ERLANG_PID; - memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + memcpy(&session_element->process.pid, p->pid, sizeof(erlang_pid)); switch_set_flag(session_element, LFLAG_SESSION_ALIVE); switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); - ei_link(listener, ei_self(listener->ec), pid); - switch_safe_free(pid); /* malloced in handle_ref_tuple */ + ei_link(listener, ei_self(listener->ec), &session_element->process.pid); + + switch_safe_free(p->pid); return session_element; } @@ -1425,7 +1435,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function) } if (module && function) { - switch_core_hash_init(&listener->spawn_pid_hash, listener->pool); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); session_element = attach_call_to_spawned_process(listener, module, function, session); } else { diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index a900f02dce..03b66d3b73 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -55,6 +55,28 @@ struct erlang_process { erlang_pid pid; }; +enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout }; + +struct fetch_reply_struct +{ + switch_thread_cond_t *ready_or_found; + int usecount; + enum reply_state state; + ei_x_buff *reply; + char winner[MAXNODELEN + 1]; +}; +typedef struct fetch_reply_struct fetch_reply_t; + +struct spawn_reply_struct +{ + switch_thread_cond_t *ready_or_found; + switch_mutex_t *mutex; + enum reply_state state; + erlang_pid *pid; + char *hash; +}; +typedef struct spawn_reply_struct spawn_reply_t; + struct session_elem { char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; switch_mutex_t *flag_mutex; @@ -65,6 +87,7 @@ struct session_elem { switch_memory_pool_t *pool; uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; + spawn_reply_t *spawn_reply; //struct session_elem *next; }; @@ -105,7 +128,6 @@ struct listener { switch_log_level_t level; uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; - switch_hash_t *spawn_pid_hash; switch_thread_rwlock_t *rwlock; switch_thread_rwlock_t *session_rwlock; //session_elem_t *session_list; @@ -153,16 +175,6 @@ struct globals_struct { }; typedef struct globals_struct globals_t; -struct fetch_reply_struct -{ - switch_thread_cond_t *ready_or_found; - int usecount; - enum { reply_not_ready, reply_waiting, reply_found, reply_timeout } state; - ei_x_buff *reply; - char winner[MAXNODELEN + 1]; -}; -typedef struct fetch_reply_struct fetch_reply_t; - struct listen_list_struct { #ifdef WIN32 SOCKET sockfd;