Switch from a busy-wait loop to crazy conditional timedwait stuff for speed

This commit is contained in:
Andrew Thompson 2010-08-06 01:31:07 -04:00
parent 2519493aab
commit 6397f898fd
3 changed files with 85 additions and 58 deletions

View File

@ -878,9 +878,12 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
{
erlang_ref ref;
erlang_pid *pid;
void *p;
char hash[100];
int arity;
const void *key;
void *val;
session_elem_t *se;
switch_hash_index_t *iter;
ei_decode_tuple_header(buf->buff, &buf->index, &arity);
@ -906,32 +909,35 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) {
if (p == &globals.TIMEOUT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash);
switch_core_hash_delete(listener->spawn_pid_hash, hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "timeout");
} else if (p == &globals.WAITING) {
/* update the key to point at a pid */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash);
switch_core_hash_delete(listener->spawn_pid_hash, hash);
switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
return SWITCH_STATUS_FALSE; /*no reply */
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "duplicate_response");
switch_thread_rwlock_rdlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &val);
se = (session_elem_t*)val;
if (se->spawn_reply && !strncmp(se->spawn_reply->hash, hash, 100)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "found matching session for %s : %s\n", hash, se->uuid_str);
switch_mutex_lock(se->spawn_reply->mutex);
if (se->spawn_reply->state == reply_not_ready) {
switch_thread_cond_wait(se->spawn_reply->ready_or_found, se->spawn_reply->mutex);
}
if (se->spawn_reply->state == reply_waiting) {
se->spawn_reply->pid = pid;
switch_thread_cond_broadcast(se->spawn_reply->ready_or_found);
ei_x_encode_atom(rbuf, "ok");
switch_thread_rwlock_unlock(listener->session_rwlock);
switch_mutex_unlock(se->spawn_reply->mutex);
return SWITCH_STATUS_SUCCESS;
}
switch_mutex_unlock(se->spawn_reply->mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "get_pid came in too late for %s\n", hash);
break;
}
} else {
/* nothin in the hash */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "invalid_ref");
}
switch_thread_rwlock_unlock(listener->session_rwlock);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "notfound");
switch_safe_free(pid); /* don't need it */

View File

@ -1217,6 +1217,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, session_element->pool);
switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, session_element->pool);
switch_core_hash_init(&session_element->event_hash, session_element->pool);
session_element->spawn_reply = NULL;
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
session_element->event_list[x] = 0;
@ -1266,9 +1267,8 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
/* create a session list element */
session_elem_t *session_element = session_elem_create(listener, session);
char hash[100];
int i = 0;
void *p = NULL;
erlang_pid *pid;
//void *p = NULL;
spawn_reply_t *p;
erlang_ref ref;
switch_set_flag(session_element, LFLAG_WAITING_FOR_PID);
@ -1279,13 +1279,25 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
ei_init_ref(listener->ec, &ref);
ei_hash_ref(&ref, hash);
/* insert the waiting marker */
switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
p = switch_core_alloc(session_element->pool, sizeof(*p));
switch_thread_cond_create(&p->ready_or_found, session_element->pool);
switch_mutex_init(&p->mutex, SWITCH_MUTEX_DEFAULT, session_element->pool);
p->state = reply_not_ready;
p->hash = hash;
p->pid = NULL;
session_element->spawn_reply = p;
switch_mutex_lock(p->mutex);
if (!strcmp(function, "!")) {
/* send a message to request a pid */
ei_x_buff rbuf;
ei_x_new_with_version(&rbuf);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "get_pid\n");
ei_x_encode_tuple_header(&rbuf, 4);
ei_x_encode_atom(&rbuf, "get_pid");
_ei_x_encode_string(&rbuf, switch_core_session_get_uuid(session));
@ -1307,33 +1319,31 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul
*/
}
/* loop until either we timeout or we get a value that's not the waiting marker */
while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
if (i > 500) { /* 5 second timeout */
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash);
remove_session_elem_from_listener_locked(listener, session_element);
switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */
destroy_session_elem(session_element);
return NULL;
}
i++;
switch_yield(10000); /* 10ms */
p->state = reply_waiting;
switch_thread_cond_broadcast(p->ready_or_found);
switch_thread_cond_timedwait(p->ready_or_found,
p->mutex, 5000000);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "wtf\n");
if (!p->pid) {
p->state = reply_timeout;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s\n", hash);
remove_session_elem_from_listener_locked(listener, session_element);
destroy_session_elem(session_element);
return NULL;
}
switch_core_hash_delete(listener->spawn_pid_hash, hash);
pid = (erlang_pid *) p;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "got pid! %s\n", hash);
session_element->process.type = ERLANG_PID;
memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
memcpy(&session_element->process.pid, p->pid, sizeof(erlang_pid));
switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
ei_link(listener, ei_self(listener->ec), pid);
switch_safe_free(pid); /* malloced in handle_ref_tuple */
ei_link(listener, ei_self(listener->ec), &session_element->process.pid);
switch_safe_free(p->pid);
return session_element;
}
@ -1425,7 +1435,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
}
if (module && function) {
switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
session_element = attach_call_to_spawned_process(listener, module, function, session);
} else {

View File

@ -55,6 +55,28 @@ struct erlang_process {
erlang_pid pid;
};
enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout };
struct fetch_reply_struct
{
switch_thread_cond_t *ready_or_found;
int usecount;
enum reply_state state;
ei_x_buff *reply;
char winner[MAXNODELEN + 1];
};
typedef struct fetch_reply_struct fetch_reply_t;
struct spawn_reply_struct
{
switch_thread_cond_t *ready_or_found;
switch_mutex_t *mutex;
enum reply_state state;
erlang_pid *pid;
char *hash;
};
typedef struct spawn_reply_struct spawn_reply_t;
struct session_elem {
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_mutex_t *flag_mutex;
@ -65,6 +87,7 @@ struct session_elem {
switch_memory_pool_t *pool;
uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash;
spawn_reply_t *spawn_reply;
//struct session_elem *next;
};
@ -105,7 +128,6 @@ struct listener {
switch_log_level_t level;
uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash;
switch_hash_t *spawn_pid_hash;
switch_thread_rwlock_t *rwlock;
switch_thread_rwlock_t *session_rwlock;
//session_elem_t *session_list;
@ -153,16 +175,6 @@ struct globals_struct {
};
typedef struct globals_struct globals_t;
struct fetch_reply_struct
{
switch_thread_cond_t *ready_or_found;
int usecount;
enum { reply_not_ready, reply_waiting, reply_found, reply_timeout } state;
ei_x_buff *reply;
char winner[MAXNODELEN + 1];
};
typedef struct fetch_reply_struct fetch_reply_t;
struct listen_list_struct {
#ifdef WIN32
SOCKET sockfd;