From 121e57a1db243aee9c24f5ec3f16b55e5d30614a Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Sun, 22 Aug 2010 16:51:15 -0400 Subject: [PATCH] Switch from mutex to a rwlock to increase throughput --- .../mod_erlang_event/handle_msg.c | 4 +- .../mod_erlang_event/mod_erlang_event.c | 47 ++++++++++--------- .../mod_erlang_event/mod_erlang_event.h | 2 +- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 40d5455e7d..128c5fca52 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -646,7 +646,7 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, e binding->process.pid = msg->from; binding->listener = listener; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); @@ -657,7 +657,7 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, e } switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section); - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding)); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 5f96cf8b5c..1b77e2ca05 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -57,7 +57,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l { listener_t *l; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { @@ -80,7 +80,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l } } } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); return SWITCH_STATUS_SUCCESS; } @@ -107,7 +107,7 @@ static void remove_binding(listener_t *listener, erlang_pid * pid) { struct erlang_binding *ptr, *lst = NULL; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); switch_xml_set_binding_sections(bindings.search_binding, SWITCH_XML_SECTION_MAX); @@ -134,7 +134,7 @@ static void remove_binding(listener_t *listener, erlang_pid * pid) } } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); } @@ -194,7 +194,7 @@ static void event_handler(switch_event_t *event) lp = listen_list.listeners; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_rdlock(globals.listener_rwlock); while (lp) { uint8_t send = 0; @@ -250,7 +250,7 @@ static void event_handler(switch_event_t *event) } } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); } @@ -276,10 +276,10 @@ static void close_socket(int *sock) static void add_listener(listener_t *listener) { /* add me to the listeners so I get events */ - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); listener->next = listen_list.listeners; listen_list.listeners = listener; - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); } @@ -287,7 +287,7 @@ static void remove_listener(listener_t *listener) { listener_t *l, *last = NULL; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (l == listener) { if (last) { @@ -298,7 +298,7 @@ static void remove_listener(listener_t *listener) } last = l; } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); } /* Search for a listener already talking to the specified node */ @@ -306,13 +306,13 @@ static listener_t *find_listener(char *nodename) { listener_t *l = NULL; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) { break; } } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); return l; } @@ -968,9 +968,10 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) void *value; switch_hash_index_t *iter; - switch_mutex_lock(globals.listener_mutex); + /* TODO - should we have a different mutex for this? */ + switch_thread_rwlock_wrlock(globals.listener_rwlock); prefs.threads++; - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); switch_assert(listener != NULL); @@ -1018,9 +1019,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) switch_core_destroy_memory_pool(&pool); } - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); prefs.threads--; - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); return NULL; } @@ -1537,7 +1538,7 @@ SWITCH_STANDARD_API(erlang_cmd) if (!strcasecmp(argv[0], "listeners")) { listener_t *l; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_rdlock(globals.listener_rwlock); if (listen_list.listeners) { for (l = listen_list.listeners; l; l = l->next) { @@ -1547,12 +1548,12 @@ SWITCH_STANDARD_API(erlang_cmd) stream->write_function(stream, "No active listeners\n"); } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); } else if (!strcasecmp(argv[0], "sessions") && argc == 2) { listener_t *l; int found = 0; - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (!strcasecmp(l->peer_nodename, argv[1])) { session_elem_t *sp; @@ -1578,7 +1579,7 @@ SWITCH_STANDARD_API(erlang_cmd) break; } } - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); if (!found) stream->write_function(stream, "Could not find a listener for %s\n", argv[1]); @@ -1604,7 +1605,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) memset(&prefs, 0, sizeof(prefs)); - switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); + switch_thread_rwlock_create(&globals.listener_rwlock, pool); switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool); switch_core_hash_init(&globals.fetch_reply_hash, pool); @@ -1853,7 +1854,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) switch_event_unbind(&globals.node); switch_xml_unbind_search_function_ptr(erlang_fetch); - switch_mutex_lock(globals.listener_mutex); + switch_thread_rwlock_wrlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { close_socket(&l->sockfd); @@ -1863,7 +1864,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) WSACleanup(); #endif - switch_mutex_unlock(globals.listener_mutex); + switch_thread_rwlock_unlock(globals.listener_rwlock); switch_sleep(1500000); /* sleep for 1.5 seconds */ diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 03b66d3b73..acea53f1d4 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -162,7 +162,7 @@ struct api_command_struct { }; struct globals_struct { - switch_mutex_t *listener_mutex; + switch_thread_rwlock_t *listener_rwlock; switch_event_node_t *node; switch_mutex_t *ref_mutex; switch_mutex_t *fetch_reply_mutex;