Tony said I should use a rwlock for this, so now I do
This commit is contained in:
parent
7b5803f71f
commit
0ec9ffe649
|
@ -148,7 +148,11 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid))) {
|
switch_thread_rwlock_rdlock(listener->session_rwlock);
|
||||||
|
s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid);
|
||||||
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
|
|
||||||
|
if (s) {
|
||||||
int send = 0;
|
int send = 0;
|
||||||
if (s->event_list[SWITCH_EVENT_ALL]) {
|
if (s->event_list[SWITCH_EVENT_ALL]) {
|
||||||
send = 1;
|
send = 1;
|
||||||
|
@ -315,7 +319,9 @@ static listener_t *find_listener(char *nodename)
|
||||||
|
|
||||||
static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
|
static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element)
|
||||||
{
|
{
|
||||||
switch_core_hash_insert_locked(listener->sessions, session_element->uuid_str, (void*) session_element, listener->session_mutex);
|
switch_thread_rwlock_wrlock(listener->session_rwlock);
|
||||||
|
switch_core_hash_insert(listener->sessions, session_element->uuid_str, (void*) session_element);
|
||||||
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -340,9 +346,9 @@ static void destroy_session_elem(session_elem_t *session_element)
|
||||||
|
|
||||||
static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
|
static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element)
|
||||||
{
|
{
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_thread_rwlock_wrlock(listener->session_rwlock);
|
||||||
remove_session_elem_from_listener(listener, session_element);
|
remove_session_elem_from_listener(listener, session_element);
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -353,16 +359,16 @@ session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid)
|
||||||
void *val = NULL;
|
void *val = NULL;
|
||||||
session_elem_t *session = NULL;
|
session_elem_t *session = NULL;
|
||||||
|
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_thread_rwlock_rdlock(listener->session_rwlock);
|
||||||
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
switch_hash_this(iter, &key, NULL, &val);
|
switch_hash_this(iter, &key, NULL, &val);
|
||||||
session = (session_elem_t*)val;
|
session = (session_elem_t*)val;
|
||||||
if (session->process.type == ERLANG_PID && !ei_compare_pids(pid, &session->process.pid)) {
|
if (session->process.type == ERLANG_PID && !ei_compare_pids(pid, &session->process.pid)) {
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
return session;
|
return session;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -579,7 +585,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
||||||
if they have pending events in their queues then send them
|
if they have pending events in their queues then send them
|
||||||
if the session has finished then clean it up
|
if the session has finished then clean it up
|
||||||
*/
|
*/
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_thread_rwlock_rdlock(listener->session_rwlock);
|
||||||
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
switch_hash_this(iter, &key, NULL, &value);
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
sp = (session_elem_t*)value;
|
sp = (session_elem_t*)value;
|
||||||
|
@ -659,6 +665,9 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
||||||
switch_event_destroy(&pevent);
|
switch_event_destroy(&pevent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
|
/* release the read lock and get a write lock */
|
||||||
|
switch_thread_rwlock_wrlock(listener->session_rwlock);
|
||||||
/* do the deferred remove */
|
/* do the deferred remove */
|
||||||
for (header = event->headers; header; header = header->next) {
|
for (header = event->headers; header; header = header->next) {
|
||||||
if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
|
if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) {
|
||||||
|
@ -667,7 +676,7 @@ static switch_status_t check_attached_sessions(listener_t *listener)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
if (prefs.done) {
|
if (prefs.done) {
|
||||||
return SWITCH_STATUS_FALSE; /* we're shutting down */
|
return SWITCH_STATUS_FALSE; /* we're shutting down */
|
||||||
} else {
|
} else {
|
||||||
|
@ -996,14 +1005,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
|
||||||
remove_binding(listener, NULL);
|
remove_binding(listener, NULL);
|
||||||
|
|
||||||
/* clean up all the attached sessions */
|
/* clean up all the attached sessions */
|
||||||
switch_mutex_lock(listener->session_mutex); // TODO is iterating thread safe?
|
switch_thread_rwlock_wrlock(listener->session_rwlock);
|
||||||
/* TODO destroy memory pools since they're not children of the listener's pool */
|
|
||||||
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
switch_hash_this(iter, &key, NULL, &value);
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
s = (session_elem_t*)value;
|
s = (session_elem_t*)value;
|
||||||
destroy_session_elem(s);
|
destroy_session_elem(s);
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
|
|
||||||
if (listener->pool) {
|
if (listener->pool) {
|
||||||
switch_memory_pool_t *pool = listener->pool;
|
switch_memory_pool_t *pool = listener->pool;
|
||||||
|
@ -1134,7 +1142,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
|
||||||
listener->level = SWITCH_LOG_DEBUG;
|
listener->level = SWITCH_LOG_DEBUG;
|
||||||
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_thread_rwlock_create(&listener->session_rwlock, listener->pool);
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
switch_core_hash_init(&listener->event_hash, listener->pool);
|
||||||
switch_core_hash_init(&listener->sessions, listener->pool);
|
switch_core_hash_init(&listener->sessions, listener->pool);
|
||||||
|
|
||||||
|
@ -1349,12 +1357,11 @@ int count_listener_sessions(listener_t *listener)
|
||||||
int count = 0;
|
int count = 0;
|
||||||
switch_hash_index_t *iter;
|
switch_hash_index_t *iter;
|
||||||
|
|
||||||
switch_mutex_lock(listener->session_mutex);
|
switch_thread_rwlock_rdlock(listener->session_rwlock);
|
||||||
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
switch_thread_rwlock_unlock(listener->session_rwlock);
|
||||||
switch_mutex_unlock(listener->session_mutex);
|
|
||||||
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
@ -1559,7 +1566,7 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
void *value;
|
void *value;
|
||||||
|
|
||||||
found = 1;
|
found = 1;
|
||||||
switch_mutex_lock(l->session_mutex);
|
switch_thread_rwlock_rdlock(l->session_rwlock);
|
||||||
for (iter = switch_hash_first(NULL, l->sessions); iter; iter = switch_hash_next(iter)) {
|
for (iter = switch_hash_first(NULL, l->sessions); iter; iter = switch_hash_next(iter)) {
|
||||||
empty = 0;
|
empty = 0;
|
||||||
switch_hash_this(iter, &key, NULL, &value);
|
switch_hash_this(iter, &key, NULL, &value);
|
||||||
|
@ -1571,7 +1578,7 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
if (empty) {
|
if (empty) {
|
||||||
stream->write_function(stream, "No active sessions for %s\n", argv[1]);
|
stream->write_function(stream, "No active sessions for %s\n", argv[1]);
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(l->session_mutex);
|
switch_thread_rwlock_unlock(l->session_rwlock);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1581,7 +1588,8 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
stream->write_function(stream, "Could not find a listener for %s\n", argv[1]);
|
stream->write_function(stream, "Could not find a listener for %s\n", argv[1]);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "USAGE: erlang sessions <nodename>\n");
|
stream->write_function(stream, "USAGE: erlang sessions <nodename>\n"
|
||||||
|
" erlang listeners\n");
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ struct listener {
|
||||||
switch_hash_t *event_hash;
|
switch_hash_t *event_hash;
|
||||||
switch_hash_t *spawn_pid_hash;
|
switch_hash_t *spawn_pid_hash;
|
||||||
switch_thread_rwlock_t *rwlock;
|
switch_thread_rwlock_t *rwlock;
|
||||||
switch_mutex_t *session_mutex;
|
switch_thread_rwlock_t *session_rwlock;
|
||||||
//session_elem_t *session_list;
|
//session_elem_t *session_list;
|
||||||
switch_hash_t *sessions;
|
switch_hash_t *sessions;
|
||||||
int lost_events;
|
int lost_events;
|
||||||
|
|
Loading…
Reference in New Issue