From 0ec9ffe6490a2cda6d2d7020a502db4cdca2f9e0 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Wed, 4 Aug 2010 10:44:40 -0400 Subject: [PATCH] Tony said I should use a rwlock for this, so now I do --- .../mod_erlang_event/mod_erlang_event.c | 46 +++++++++++-------- .../mod_erlang_event/mod_erlang_event.h | 2 +- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 48068bd1dd..22c78887d1 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -148,7 +148,11 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t return; } - if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) { + switch_thread_rwlock_rdlock(listener->session_rwlock); + s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid); + switch_thread_rwlock_unlock(listener->session_rwlock); + + if (s) { int send = 0; if (s->event_list[SWITCH_EVENT_ALL]) { send = 1; @@ -315,7 +319,9 @@ static listener_t *find_listener(char *nodename) static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element) { - switch_core_hash_insert_locked(listener->sessions, session_element->uuid_str, (void*) session_element, listener->session_mutex); + switch_thread_rwlock_wrlock(listener->session_rwlock); + switch_core_hash_insert(listener->sessions, session_element->uuid_str, (void*) session_element); + switch_thread_rwlock_unlock(listener->session_rwlock); } @@ -340,9 +346,9 @@ static void destroy_session_elem(session_elem_t *session_element) static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element) { - switch_mutex_lock(listener->session_mutex); + switch_thread_rwlock_wrlock(listener->session_rwlock); remove_session_elem_from_listener(listener, session_element); - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); } @@ -353,16 +359,16 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) void *val = NULL; session_elem_t *session = NULL; - switch_mutex_lock(listener->session_mutex); + switch_thread_rwlock_rdlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &val); session = (session_elem_t*)val; if (session->process.type == ERLANG_PID && !ei_compare_pids(pid, &session->process.pid)) { - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); return session; } } - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); return NULL; } @@ -579,7 +585,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) if they have pending events in their queues then send them if the session has finished then clean it up */ - switch_mutex_lock(listener->session_mutex); + switch_thread_rwlock_rdlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &value); sp = (session_elem_t*)value; @@ -659,6 +665,9 @@ static switch_status_t check_attached_sessions(listener_t *listener) switch_event_destroy(&pevent); } } + switch_thread_rwlock_unlock(listener->session_rwlock); + /* release the read lock and get a write lock */ + switch_thread_rwlock_wrlock(listener->session_rwlock); /* do the deferred remove */ for (header = event->headers; header; header = header->next) { if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) { @@ -667,7 +676,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) } } - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); if (prefs.done) { return SWITCH_STATUS_FALSE; /* we're shutting down */ } else { @@ -996,14 +1005,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) remove_binding(listener, NULL); /* clean up all the attached sessions */ - switch_mutex_lock(listener->session_mutex); // TODO is iterating thread safe? - /* TODO destroy memory pools since they're not children of the listener's pool */ + switch_thread_rwlock_wrlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &value); s = (session_elem_t*)value; destroy_session_elem(s); } - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); if (listener->pool) { switch_memory_pool_t *pool = listener->pool; @@ -1134,7 +1142,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) listener->level = SWITCH_LOG_DEBUG; switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); - switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_thread_rwlock_create(&listener->session_rwlock, listener->pool); switch_core_hash_init(&listener->event_hash, listener->pool); switch_core_hash_init(&listener->sessions, listener->pool); @@ -1349,12 +1357,11 @@ int count_listener_sessions(listener_t *listener) int count = 0; switch_hash_index_t *iter; - switch_mutex_lock(listener->session_mutex); + switch_thread_rwlock_rdlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { count++; } - - switch_mutex_unlock(listener->session_mutex); + switch_thread_rwlock_unlock(listener->session_rwlock); return count; } @@ -1559,7 +1566,7 @@ SWITCH_STANDARD_API(erlang_cmd) void *value; found = 1; - switch_mutex_lock(l->session_mutex); + switch_thread_rwlock_rdlock(l->session_rwlock); for (iter = switch_hash_first(NULL, l->sessions); iter; iter = switch_hash_next(iter)) { empty = 0; switch_hash_this(iter, &key, NULL, &value); @@ -1571,7 +1578,7 @@ SWITCH_STANDARD_API(erlang_cmd) if (empty) { stream->write_function(stream, "No active sessions for %s\n", argv[1]); } - switch_mutex_unlock(l->session_mutex); + switch_thread_rwlock_unlock(l->session_rwlock); break; } } @@ -1581,7 +1588,8 @@ SWITCH_STANDARD_API(erlang_cmd) stream->write_function(stream, "Could not find a listener for %s\n", argv[1]); } else { - stream->write_function(stream, "USAGE: erlang sessions \n"); + stream->write_function(stream, "USAGE: erlang sessions \n" + " erlang listeners\n"); goto done; } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 65f8a89e49..a900f02dce 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -107,7 +107,7 @@ struct listener { switch_hash_t *event_hash; switch_hash_t *spawn_pid_hash; switch_thread_rwlock_t *rwlock; - switch_mutex_t *session_mutex; + switch_thread_rwlock_t *session_rwlock; //session_elem_t *session_list; switch_hash_t *sessions; int lost_events;