FS-2775 Rewrite XML fetch conditional wait to be more sane (Reported by James Aimonetti)
This commit is contained in:
parent
04e57577b3
commit
6941c6eb71
|
@ -173,68 +173,74 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *
|
||||||
ei_x_encode_atom(rbuf, "error");
|
ei_x_encode_atom(rbuf, "error");
|
||||||
ei_x_encode_atom(rbuf, "badarg");
|
ei_x_encode_atom(rbuf, "badarg");
|
||||||
} else {
|
} else {
|
||||||
ei_x_buff *nbuf = malloc(sizeof(nbuf));
|
/* TODO - maybe use a rwlock instead */
|
||||||
nbuf->buff = malloc(buf->buffsz);
|
if ((p = switch_core_hash_find_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex))) {
|
||||||
memcpy(nbuf->buff, buf->buff, buf->buffsz);
|
/* try to lock the mutex, so no other responder can */
|
||||||
nbuf->index = buf->index;
|
if (switch_mutex_trylock(p->mutex) == SWITCH_STATUS_SUCCESS) {
|
||||||
nbuf->buffsz = buf->buffsz;
|
if (p->state == reply_waiting) {
|
||||||
|
/* alright, we've got the lock and we're the first to reply */
|
||||||
|
|
||||||
switch_mutex_lock(globals.fetch_reply_mutex);
|
/* clone the reply so it doesn't get destroyed on us */
|
||||||
if ((p = switch_core_hash_find(globals.fetch_reply_hash, uuid_str))) {
|
ei_x_buff *nbuf = malloc(sizeof(nbuf));
|
||||||
/* Get the status and release the lock ASAP. */
|
nbuf->buff = malloc(buf->buffsz);
|
||||||
enum { is_timeout, is_waiting, is_filled } status;
|
memcpy(nbuf->buff, buf->buff, buf->buffsz);
|
||||||
if (p->state == reply_not_ready) {
|
nbuf->index = buf->index;
|
||||||
switch_thread_cond_wait(p->ready_or_found, globals.fetch_reply_mutex);
|
nbuf->buffsz = buf->buffsz;
|
||||||
}
|
|
||||||
|
|
||||||
if (p->state == reply_waiting) {
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str);
|
||||||
/* update the key with a reply */
|
|
||||||
status = is_waiting;
|
/* copy info into the reply struct */
|
||||||
p->reply = nbuf;
|
p->state = reply_found;
|
||||||
p->state = reply_found;
|
p->reply = nbuf;
|
||||||
strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
|
strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
|
||||||
switch_thread_cond_broadcast(p->ready_or_found);
|
|
||||||
} else if (p->state == reply_timeout) {
|
/* signal waiting thread that its time to wake up */
|
||||||
status = is_timeout;
|
switch_thread_cond_signal(p->ready_or_found);
|
||||||
|
|
||||||
|
/* reply OK */
|
||||||
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
|
_ei_x_encode_string(rbuf, uuid_str);
|
||||||
|
|
||||||
|
/* unlock */
|
||||||
|
switch_mutex_unlock(p->mutex);
|
||||||
|
} else {
|
||||||
|
if (p->state == reply_found) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
|
||||||
|
ei_x_encode_tuple_header(rbuf, 3);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
_ei_x_encode_string(rbuf, uuid_str);
|
||||||
|
ei_x_encode_atom(rbuf, "duplicate_response");
|
||||||
|
} else if (p->state == reply_timeout) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
|
||||||
|
ei_x_encode_tuple_header(rbuf, 3);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
_ei_x_encode_string(rbuf, uuid_str);
|
||||||
|
ei_x_encode_atom(rbuf, "timeout");
|
||||||
|
} else if (p->state == reply_not_ready) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
|
||||||
|
ei_x_encode_tuple_header(rbuf, 3);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
_ei_x_encode_string(rbuf, uuid_str);
|
||||||
|
ei_x_encode_atom(rbuf, "not_ready");
|
||||||
|
}
|
||||||
|
switch_mutex_unlock(p->mutex);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
status = is_filled;
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not lock mutex for reply %s\n", uuid_str);
|
||||||
}
|
|
||||||
|
|
||||||
put_reply_unlock(p, uuid_str);
|
|
||||||
|
|
||||||
/* Relay the status back to the fetch responder. */
|
|
||||||
if (status == is_waiting) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str);
|
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
|
||||||
_ei_x_encode_string(rbuf, uuid_str);
|
|
||||||
/* Return here to avoid freeing the reply. */
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
|
||||||
} else if (status == is_timeout) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", uuid_str);
|
|
||||||
ei_x_encode_tuple_header(rbuf, 3);
|
|
||||||
ei_x_encode_atom(rbuf, "error");
|
|
||||||
_ei_x_encode_string(rbuf, uuid_str);
|
|
||||||
ei_x_encode_atom(rbuf, "timeout");
|
|
||||||
} else {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", uuid_str);
|
|
||||||
ei_x_encode_tuple_header(rbuf, 3);
|
ei_x_encode_tuple_header(rbuf, 3);
|
||||||
ei_x_encode_atom(rbuf, "error");
|
ei_x_encode_atom(rbuf, "error");
|
||||||
_ei_x_encode_string(rbuf, uuid_str);
|
_ei_x_encode_string(rbuf, uuid_str);
|
||||||
ei_x_encode_atom(rbuf, "duplicate_response");
|
ei_x_encode_atom(rbuf, "duplicate_response");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* nothing in the hash */
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str);
|
||||||
switch_mutex_unlock(globals.fetch_reply_mutex);
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", uuid_str);
|
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
ei_x_encode_atom(rbuf, "error");
|
ei_x_encode_atom(rbuf, "error");
|
||||||
ei_x_encode_atom(rbuf, "invalid_uuid");
|
ei_x_encode_atom(rbuf, "invalid_uuid");
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_safe_free(nbuf->buff);
|
|
||||||
switch_safe_free(nbuf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1052,7 +1058,7 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg,
|
||||||
|
|
||||||
if (se->spawn_reply->state == reply_waiting) {
|
if (se->spawn_reply->state == reply_waiting) {
|
||||||
se->spawn_reply->pid = pid;
|
se->spawn_reply->pid = pid;
|
||||||
switch_thread_cond_broadcast(se->spawn_reply->ready_or_found);
|
switch_thread_cond_signal(se->spawn_reply->ready_or_found);
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
switch_thread_rwlock_unlock(listener->session_rwlock);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
switch_mutex_unlock(se->spawn_reply->mutex);
|
switch_mutex_unlock(se->spawn_reply->mutex);
|
||||||
|
|
|
@ -366,13 +366,21 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
||||||
ei_x_buff buf;
|
ei_x_buff buf;
|
||||||
ei_x_new_with_version(&buf);
|
ei_x_new_with_version(&buf);
|
||||||
|
|
||||||
|
switch_uuid_get(&uuid);
|
||||||
|
switch_uuid_format(uuid_str, &uuid);
|
||||||
|
|
||||||
|
ei_x_encode_tuple_header(&buf, 7);
|
||||||
|
ei_x_encode_atom(&buf, "fetch");
|
||||||
|
ei_x_encode_atom(&buf, sectionstr);
|
||||||
|
_ei_x_encode_string(&buf, tag_name ? tag_name : "undefined");
|
||||||
|
_ei_x_encode_string(&buf, key_name ? key_name : "undefined");
|
||||||
|
_ei_x_encode_string(&buf, key_value ? key_value : "undefined");
|
||||||
|
_ei_x_encode_string(&buf, uuid_str);
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n");
|
||||||
|
|
||||||
section = switch_xml_parse_section_string((char *) sectionstr);
|
section = switch_xml_parse_section_string((char *) sectionstr);
|
||||||
|
|
||||||
switch_uuid_get(&uuid);
|
|
||||||
switch_uuid_format(uuid_str, &uuid);
|
|
||||||
|
|
||||||
for (ptr = bindings.head; ptr; ptr = ptr->next) {
|
for (ptr = bindings.head; ptr; ptr = ptr->next) {
|
||||||
if (ptr->section != section)
|
if (ptr->section != section)
|
||||||
continue;
|
continue;
|
||||||
|
@ -384,13 +392,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node);
|
||||||
|
|
||||||
ei_x_encode_tuple_header(&buf, 7);
|
|
||||||
ei_x_encode_atom(&buf, "fetch");
|
|
||||||
ei_x_encode_atom(&buf, sectionstr);
|
|
||||||
_ei_x_encode_string(&buf, tag_name ? tag_name : "undefined");
|
|
||||||
_ei_x_encode_string(&buf, key_name ? key_name : "undefined");
|
|
||||||
_ei_x_encode_string(&buf, key_value ? key_value : "undefined");
|
|
||||||
_ei_x_encode_string(&buf, uuid_str);
|
|
||||||
if (params) {
|
if (params) {
|
||||||
ei_encode_switch_event_headers(&buf, params);
|
ei_encode_switch_event_headers(&buf, params);
|
||||||
} else {
|
} else {
|
||||||
|
@ -401,41 +402,42 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
||||||
/* Create a new fetch object. */
|
/* Create a new fetch object. */
|
||||||
p = malloc(sizeof(*p));
|
p = malloc(sizeof(*p));
|
||||||
switch_thread_cond_create(&p->ready_or_found, module_pool);
|
switch_thread_cond_create(&p->ready_or_found, module_pool);
|
||||||
p->usecount = 1;
|
switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool);
|
||||||
p->state = reply_not_ready;
|
p->state = reply_not_ready;
|
||||||
p->reply = NULL;
|
p->reply = NULL;
|
||||||
switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex);
|
switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex);
|
||||||
|
p->state = reply_waiting;
|
||||||
now = switch_micro_time_now();
|
now = switch_micro_time_now();
|
||||||
}
|
}
|
||||||
/* We don't need to lock here because everybody is waiting
|
/* We don't need to lock here because everybody is waiting
|
||||||
on our condition before the action starts. */
|
on our condition before the action starts. */
|
||||||
p->usecount ++;
|
|
||||||
|
|
||||||
switch_mutex_lock(ptr->listener->sock_mutex);
|
switch_mutex_lock(ptr->listener->sock_mutex);
|
||||||
ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf);
|
ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf);
|
||||||
switch_mutex_unlock(ptr->listener->sock_mutex);
|
switch_mutex_unlock(ptr->listener->sock_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ei_x_free(&buf);
|
||||||
|
|
||||||
if (!p) {
|
if (!p) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tell the threads to be ready, and wait five seconds for a reply. */
|
/* Tell the threads to be ready, and wait five seconds for a reply. */
|
||||||
switch_mutex_lock(globals.fetch_reply_mutex);
|
switch_mutex_lock(p->mutex);
|
||||||
p->state = reply_waiting;
|
//p->state = reply_waiting;
|
||||||
switch_thread_cond_broadcast(p->ready_or_found);
|
|
||||||
switch_thread_cond_timedwait(p->ready_or_found,
|
switch_thread_cond_timedwait(p->ready_or_found,
|
||||||
globals.fetch_reply_mutex, 5000000);
|
p->mutex, 5000000);
|
||||||
if (!p->reply) {
|
if (!p->reply) {
|
||||||
p->state = reply_timeout;
|
p->state = reply_timeout;
|
||||||
switch_mutex_unlock(globals.fetch_reply_mutex);
|
switch_mutex_unlock(p->mutex);
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out after %d milliseconds when waiting for XML fetch response for %s\n", (int) (switch_micro_time_now() - now) / 1000, uuid_str);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out after %d milliseconds when waiting for XML fetch response for %s\n", (int) (switch_micro_time_now() - now) / 1000, uuid_str);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
rep = p->reply;
|
rep = p->reply;
|
||||||
switch_mutex_unlock(globals.fetch_reply_mutex);
|
switch_mutex_unlock(p->mutex);
|
||||||
|
|
||||||
ei_get_type(rep->buff, &rep->index, &type, &size);
|
ei_get_type(rep->buff, &rep->index, &type, &size);
|
||||||
|
|
||||||
|
@ -450,7 +452,6 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!(xmlstr = malloc(size + 1))) {
|
if (!(xmlstr = malloc(size + 1))) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
|
@ -471,29 +472,20 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
|
||||||
/* cleanup */
|
/* cleanup */
|
||||||
cleanup:
|
cleanup:
|
||||||
if (p) {
|
if (p) {
|
||||||
switch_mutex_lock(globals.fetch_reply_mutex);
|
/* lock so nothing can have it while we delete it */
|
||||||
put_reply_unlock(p, uuid_str);
|
switch_mutex_lock(p->mutex);
|
||||||
|
switch_core_hash_delete_locked(globals.fetch_reply_hash, uuid_str, globals.fetch_reply_mutex);
|
||||||
|
switch_mutex_unlock(p->mutex);
|
||||||
|
switch_mutex_destroy(p->mutex);
|
||||||
|
switch_thread_cond_destroy(p->ready_or_found);
|
||||||
|
switch_safe_free(p->reply);
|
||||||
|
switch_safe_free(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
return xml;
|
return xml;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void put_reply_unlock(fetch_reply_t *p, char *uuid_str)
|
|
||||||
{
|
|
||||||
if (-- p->usecount == 0) {
|
|
||||||
switch_core_hash_delete(globals.fetch_reply_hash, uuid_str);
|
|
||||||
switch_thread_cond_destroy(p->ready_or_found);
|
|
||||||
if (p->reply) {
|
|
||||||
switch_safe_free(p->reply->buff);
|
|
||||||
switch_safe_free(p->reply);
|
|
||||||
}
|
|
||||||
switch_safe_free(p);
|
|
||||||
}
|
|
||||||
switch_mutex_unlock(globals.fetch_reply_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element)
|
static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element)
|
||||||
{
|
{
|
||||||
int result;
|
int result;
|
||||||
|
|
|
@ -60,7 +60,7 @@ enum reply_state { reply_not_ready, reply_waiting, reply_found, reply_timeout };
|
||||||
struct fetch_reply_struct
|
struct fetch_reply_struct
|
||||||
{
|
{
|
||||||
switch_thread_cond_t *ready_or_found;
|
switch_thread_cond_t *ready_or_found;
|
||||||
int usecount;
|
switch_mutex_t *mutex;
|
||||||
enum reply_state state;
|
enum reply_state state;
|
||||||
ei_x_buff *reply;
|
ei_x_buff *reply;
|
||||||
char winner[MAXNODELEN + 1];
|
char winner[MAXNODELEN + 1];
|
||||||
|
|
Loading…
Reference in New Issue