diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 1a8b64fdb4..bb37cddc59 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -173,68 +173,74 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "badarg"); } else { - ei_x_buff *nbuf = malloc(sizeof(nbuf)); - nbuf->buff = malloc(buf->buffsz); - memcpy(nbuf->buff, buf->buff, buf->buffsz); - nbuf->index = buf->index; - nbuf->buffsz = buf->buffsz; + /* TODO - maybe use a rwlock instead */ + if ((p = switch_core_hash_find_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex))) { + /* try to lock the mutex, so no other responder can */ + if (switch_mutex_trylock(p->mutex) == SWITCH_STATUS_SUCCESS) { + if (p->state == reply_waiting) { + /* alright, we've got the lock and we're the first to reply */ - switch_mutex_lock(globals.fetch_reply_mutex); - if ((p = switch_core_hash_find(globals.fetch_reply_hash, uuid_str))) { - /* Get the status and release the lock ASAP. */ - enum { is_timeout, is_waiting, is_filled } status; - if (p->state == reply_not_ready) { - switch_thread_cond_wait(p->ready_or_found, globals.fetch_reply_mutex); - } + /* clone the reply so it doesn't get destroyed on us */ + ei_x_buff *nbuf = malloc(sizeof(nbuf)); + nbuf->buff = malloc(buf->buffsz); + memcpy(nbuf->buff, buf->buff, buf->buffsz); + nbuf->index = buf->index; + nbuf->buffsz = buf->buffsz; - if (p->state == reply_waiting) { - /* update the key with a reply */ - status = is_waiting; - p->reply = nbuf; - p->state = reply_found; - strncpy(p->winner, listener->peer_nodename, MAXNODELEN); - switch_thread_cond_broadcast(p->ready_or_found); - } else if (p->state == reply_timeout) { - status = is_timeout; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str); + + /* copy info into the reply struct */ + p->state = reply_found; + p->reply = nbuf; + strncpy(p->winner, listener->peer_nodename, MAXNODELEN); + + /* signal waiting thread that its time to wake up */ + switch_thread_cond_signal(p->ready_or_found); + + /* reply OK */ + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "ok"); + _ei_x_encode_string(rbuf, uuid_str); + + /* unlock */ + switch_mutex_unlock(p->mutex); + } else { + if (p->state == reply_found) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str); + ei_x_encode_tuple_header(rbuf, 3); + ei_x_encode_atom(rbuf, "error"); + _ei_x_encode_string(rbuf, uuid_str); + ei_x_encode_atom(rbuf, "duplicate_response"); + } else if (p->state == reply_timeout) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str); + ei_x_encode_tuple_header(rbuf, 3); + ei_x_encode_atom(rbuf, "error"); + _ei_x_encode_string(rbuf, uuid_str); + ei_x_encode_atom(rbuf, "timeout"); + } else if (p->state == reply_not_ready) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str); + ei_x_encode_tuple_header(rbuf, 3); + ei_x_encode_atom(rbuf, "error"); + _ei_x_encode_string(rbuf, uuid_str); + ei_x_encode_atom(rbuf, "not_ready"); + } + switch_mutex_unlock(p->mutex); + } } else { - status = is_filled; - } - - put_reply_unlock(p, uuid_str); - - /* Relay the status back to the fetch responder. */ - if (status == is_waiting) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str); - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "ok"); - _ei_x_encode_string(rbuf, uuid_str); - /* Return here to avoid freeing the reply. */ - return SWITCH_STATUS_SUCCESS; - } else if (status == is_timeout) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", uuid_str); - ei_x_encode_tuple_header(rbuf, 3); - ei_x_encode_atom(rbuf, "error"); - _ei_x_encode_string(rbuf, uuid_str); - ei_x_encode_atom(rbuf, "timeout"); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", uuid_str); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not lock mutex for reply %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 3); ei_x_encode_atom(rbuf, "error"); _ei_x_encode_string(rbuf, uuid_str); ei_x_encode_atom(rbuf, "duplicate_response"); } } else { - /* nothing in the hash */ - switch_mutex_unlock(globals.fetch_reply_mutex); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", uuid_str); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "invalid_uuid"); } - - switch_safe_free(nbuf->buff); - switch_safe_free(nbuf); } + return SWITCH_STATUS_SUCCESS; } @@ -1052,7 +1058,7 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, if (se->spawn_reply->state == reply_waiting) { se->spawn_reply->pid = pid; - switch_thread_cond_broadcast(se->spawn_reply->ready_or_found); + switch_thread_cond_signal(se->spawn_reply->ready_or_found); ei_x_encode_atom(rbuf, "ok"); switch_thread_rwlock_unlock(listener->session_rwlock); switch_mutex_unlock(se->spawn_reply->mutex); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 8565e1afa7..e375515f4f 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -366,13 +366,21 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c ei_x_buff buf; ei_x_new_with_version(&buf); + switch_uuid_get(&uuid); + switch_uuid_format(uuid_str, &uuid); + + ei_x_encode_tuple_header(&buf, 7); + ei_x_encode_atom(&buf, "fetch"); + ei_x_encode_atom(&buf, sectionstr); + _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined"); + _ei_x_encode_string(&buf, key_name ? key_name : "undefined"); + _ei_x_encode_string(&buf, key_value ? key_value : "undefined"); + _ei_x_encode_string(&buf, uuid_str); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n"); section = switch_xml_parse_section_string((char *) sectionstr); - switch_uuid_get(&uuid); - switch_uuid_format(uuid_str, &uuid); - for (ptr = bindings.head; ptr; ptr = ptr->next) { if (ptr->section != section) continue; @@ -384,13 +392,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node); - ei_x_encode_tuple_header(&buf, 7); - ei_x_encode_atom(&buf, "fetch"); - ei_x_encode_atom(&buf, sectionstr); - _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined"); - _ei_x_encode_string(&buf, key_name ? key_name : "undefined"); - _ei_x_encode_string(&buf, key_value ? key_value : "undefined"); - _ei_x_encode_string(&buf, uuid_str); if (params) { ei_encode_switch_event_headers(&buf, params); } else { @@ -401,41 +402,42 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /* Create a new fetch object. */ p = malloc(sizeof(*p)); switch_thread_cond_create(&p->ready_or_found, module_pool); - p->usecount = 1; + switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool); p->state = reply_not_ready; p->reply = NULL; switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex); + p->state = reply_waiting; now = switch_micro_time_now(); } /* We don't need to lock here because everybody is waiting on our condition before the action starts. */ - p->usecount ++; switch_mutex_lock(ptr->listener->sock_mutex); ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); switch_mutex_unlock(ptr->listener->sock_mutex); } + ei_x_free(&buf); + if (!p) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr); goto cleanup; } /* Tell the threads to be ready, and wait five seconds for a reply. */ - switch_mutex_lock(globals.fetch_reply_mutex); - p->state = reply_waiting; - switch_thread_cond_broadcast(p->ready_or_found); + switch_mutex_lock(p->mutex); + //p->state = reply_waiting; switch_thread_cond_timedwait(p->ready_or_found, - globals.fetch_reply_mutex, 5000000); + p->mutex, 5000000); if (!p->reply) { p->state = reply_timeout; - switch_mutex_unlock(globals.fetch_reply_mutex); + switch_mutex_unlock(p->mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out after %d milliseconds when waiting for XML fetch response for %s\n", (int) (switch_micro_time_now() - now) / 1000, uuid_str); goto cleanup; } rep = p->reply; - switch_mutex_unlock(globals.fetch_reply_mutex); + switch_mutex_unlock(p->mutex); ei_get_type(rep->buff, &rep->index, &type, &size); @@ -450,7 +452,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c goto cleanup; } - if (!(xmlstr = malloc(size + 1))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n"); goto cleanup; @@ -471,29 +472,20 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /* cleanup */ cleanup: if (p) { - switch_mutex_lock(globals.fetch_reply_mutex); - put_reply_unlock(p, uuid_str); + /* lock so nothing can have it while we delete it */ + switch_mutex_lock(p->mutex); + switch_core_hash_delete_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex); + switch_mutex_unlock(p->mutex); + switch_mutex_destroy(p->mutex); + switch_thread_cond_destroy(p->ready_or_found); + switch_safe_free(p->reply); + switch_safe_free(p); } return xml; } -void put_reply_unlock(fetch_reply_t *p, char *uuid_str) -{ - if (-- p->usecount == 0) { - switch_core_hash_delete(globals.fetch_reply_hash, uuid_str); - switch_thread_cond_destroy(p->ready_or_found); - if (p->reply) { - switch_safe_free(p->reply->buff); - switch_safe_free(p->reply); - } - switch_safe_free(p); - } - switch_mutex_unlock(globals.fetch_reply_mutex); -} - - static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element) { int result; diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 248e66b056..eb36612ce6 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -60,7 +60,7 @@ enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout }; struct fetch_reply_struct { switch_thread_cond_t *ready_or_found; - int usecount; + switch_mutex_t *mutex; enum reply_state state; ei_x_buff *reply; char winner[MAXNODELEN + 1];