diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index 28441811e0..78c9068d7f 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -79,6 +79,7 @@ struct listener { int lost_events; int lost_logs; time_t last_flush; + time_t expire_time; uint32_t timeout; uint32_t id; switch_sockaddr_t *sa; @@ -190,11 +191,23 @@ static void flush_listener(listener_t *listener, switch_bool_t flush_log, switch } } -static void expire_listener(listener_t **listener) +static switch_status_t expire_listener(listener_t **listener) { + + if (!(*listener)->expire_time) { + (*listener)->expire_time = switch_epoch_time_now(NULL); + return SWITCH_STATUS_FALSE; + } + + if (switch_thread_rwlock_trywrlock((*listener)->rwlock) != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_FALSE; + } + if (globals.debug > 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG((*listener)->session), SWITCH_LOG_DEBUG, "Stateful Listener %u has expired\n", (*listener)->id); + } + flush_listener(*listener, SWITCH_TRUE, SWITCH_TRUE); - switch_thread_rwlock_unlock((*listener)->rwlock); switch_core_hash_destroy(&(*listener)->event_hash); switch_core_destroy_memory_pool(&(*listener)->pool); switch_mutex_lock((*listener)->filter_mutex); @@ -202,13 +215,15 @@ static void expire_listener(listener_t **listener) switch_event_destroy(&(*listener)->filters); } switch_mutex_unlock((*listener)->filter_mutex); + switch_thread_rwlock_unlock((*listener)->rwlock); *listener = NULL; + return SWITCH_STATUS_SUCCESS; } static void event_handler(switch_event_t *event) { switch_event_t *clone = NULL; - listener_t *l, *lp; + listener_t *l, *lp, *last = NULL; switch_assert(event != NULL); @@ -221,21 +236,26 @@ static void event_handler(switch_event_t *event) switch_mutex_lock(globals.listener_mutex); while(lp) { int send = 0; - + time_t now = switch_epoch_time_now(NULL); + l = lp; lp = lp->next; if (!switch_test_flag(l, LFLAG_EVENTS)) { + last = lp; continue; } - if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && switch_epoch_time_now(NULL) - l->last_flush > l->timeout) { - if (globals.debug > 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_WARNING, "Stateful Listener %u has expired\n", l->id); + if (switch_test_flag(l, LFLAG_STATEFUL) && l->timeout && now - l->last_flush > l->timeout) { + if (expire_listener(&l) == SWITCH_STATUS_SUCCESS) { + if (last) { + last->next = lp; + } else { + listen_list.listeners = lp; + } + + continue; } - remove_listener(l); - expire_listener(&l); - continue; } @@ -327,6 +347,7 @@ static void event_handler(switch_event_t *event) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_ERROR, "Memory Error!\n"); } } + last = lp; } switch_mutex_unlock(globals.listener_mutex); } @@ -540,7 +561,7 @@ static listener_t *find_listener(uint32_t id) switch_mutex_lock(globals.listener_mutex); for (l = listen_list.listeners; l; l = l->next) { - if (l->id && l->id == id) { + if (l->id && l->id == id && !l->expire_time) { if (switch_thread_rwlock_tryrdlock(l->rwlock) == SWITCH_STATUS_SUCCESS) { r = l; } @@ -823,11 +844,10 @@ SWITCH_STANDARD_API(event_sink_function) if (globals.debug > 0) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Destroying event-sink listener [%u]\n", idl); } - remove_listener(listener); stream->write_function(stream, "\n listener %u destroyed\n", listener->id); xmlize_listener(listener, stream); stream->write_function(stream, "\n"); - expire_listener(&listener); + listener->expire_time = switch_epoch_time_now(NULL); goto end; } else { if (globals.debug > 0) {