diff --git a/src/mod/timers/mod_posix_timer/mod_posix_timer.c b/src/mod/timers/mod_posix_timer/mod_posix_timer.c index 425e98c93d..04d694b5f7 100644 --- a/src/mod/timers/mod_posix_timer/mod_posix_timer.c +++ b/src/mod/timers/mod_posix_timer/mod_posix_timer.c @@ -1,5 +1,6 @@ /* * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2012, Anthony Minessale II * * Version: MPL 1.1 * @@ -28,91 +29,153 @@ * */ #include -#include -#include +#include /* timer_* */ +#include /* sigaction(), timer_*, etc. */ +#include /* pipe() */ +#include /* fcntl() */ +#include /* strerror() */ +#include /* uint8_t */ +#include /* errno */ +#include /* select() */ +#include /* pthread_sigmask() */ SWITCH_MODULE_LOAD_FUNCTION(mod_posix_timer_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_posix_timer_shutdown); -SWITCH_MODULE_DEFINITION(mod_posix_timer, mod_posix_timer_load, mod_posix_timer_shutdown, NULL); +SWITCH_MODULE_RUNTIME_FUNCTION(mod_posix_timer_runtime); +SWITCH_MODULE_DEFINITION(mod_posix_timer, mod_posix_timer_load, mod_posix_timer_shutdown, mod_posix_timer_runtime); +#define SIG SIGRTMAX #define MAX_INTERVAL 2000 /* ms */ #define TIMERS_PER_INTERVAL 4 - -typedef struct { - int users; - timer_t timer; - switch_size_t tick; - switch_mutex_t *mutex; - switch_thread_cond_t *cond; - int interval; - int id; -} interval_timer_t; - -static struct { - switch_memory_pool_t *pool; - int shutdown; - interval_timer_t interval_timers[MAX_INTERVAL + 1][TIMERS_PER_INTERVAL]; - int next_interval_timer_id[MAX_INTERVAL + 1]; - switch_mutex_t *interval_timers_mutex; -} globals; +#define MAX_ACTIVE_TIMERS 256 /* one byte */ /** - * Notified by POSIX timer of a tick + * Module's internal timer data. + * Keeps track of how many users are using the timer + * and the condvar to signal threads waiting on the timer. */ -static void posix_timer_notify(union sigval data) -{ - interval_timer_t *it = (interval_timer_t *)data.sival_ptr; - switch_mutex_lock(it->mutex); - if (it->users) { - it->tick += 1 + timer_getoverrun(it->timer); - switch_thread_cond_broadcast(it->cond); - } - switch_mutex_unlock(it->mutex); +typedef struct { + /** Number of users of this timer */ + int users; + /** The POSIX timer handle */ + timer_t timer; + /** Number of ticks */ + switch_size_t tick; + /** synchronizes access to condvar, users */ + switch_mutex_t *mutex; + /** condvar for threads waiting on timer */ + switch_thread_cond_t *cond; + /** The timer period in ms */ + int interval; + /** Which timer for this interval */ + int num; + /** The timer's index into the active_interval_timers array */ + int active_id; +} interval_timer_t; - if (globals.shutdown) { - switch_mutex_lock(globals.interval_timers_mutex); - if (it->users) { - timer_delete(it->timer); - memset(&it->timer, 0, sizeof(it->timer)); - it->users = 0; +/** + * Module global data + */ +static struct { + /** Module memory pool */ + switch_memory_pool_t *pool; + /** True if module is shutting down */ + int shutdown; + /** Maps intervals to timers */ + interval_timer_t interval_timers[MAX_INTERVAL + 1][TIMERS_PER_INTERVAL]; + /** Maps IDs to timers */ + interval_timer_t *active_interval_timers[MAX_ACTIVE_TIMERS]; + /** Next timer to assign for a particular interval */ + int next_interval_timer_num[MAX_INTERVAL + 1]; + /** Synchronizes access to timer creation / deletion */ + switch_mutex_t *interval_timers_mutex; + /** Synchronizes access to active timers array */ + switch_mutex_t *active_timers_mutex; + /** number of active timers */ + int active_timers_count; + /** self-pipe to notify thread of tick from a signal handler */ + int timer_tick_pipe[2]; +} globals; + + +/** + * Handle timer signal + * @param sig the signal + * @param si the signal information + * @param cu unused + */ +static void timer_signal_handler(int sig, siginfo_t *si, void *cu) +{ + if (sig == SIG && si->si_code == SI_TIMER) { + int val = si->si_value.sival_int; + if (val >= 0 && val <= MAX_ACTIVE_TIMERS) { + uint8_t active_id = (uint8_t)val; + /* notify runtime thread that timer identified by active_id has ticked */ + write(globals.timer_tick_pipe[1], &active_id, 1); } - switch_mutex_unlock(globals.interval_timers_mutex); } } /** - * Start a new timer + * Start a new interval timer + * @param it the timer + * @param interval the timer interval + * @return SWITCH_STATUS_SUCCESS if successful */ -static switch_status_t posix_timer_start_interval(interval_timer_t *it, int interval) +static switch_status_t interval_timer_start(interval_timer_t *it, int interval) { - struct sigevent sigev; - struct itimerspec val; - if (globals.shutdown) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "module is shutting down, ignoring request\n"); return SWITCH_STATUS_GENERR; } if (it->users <= 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "starting %d ms timer #%d\n", it->interval, it->id + 1); - /* reset */ + struct sigevent sigev; + struct itimerspec val; + int active_id = -1; + int i; + + /* find an available id for this timer */ + for (i = 0; i < MAX_ACTIVE_TIMERS && active_id == -1; i++) { + switch_mutex_lock(globals.active_timers_mutex); + if(globals.active_interval_timers[i] == NULL) { + active_id = i; + } + switch_mutex_unlock(globals.active_timers_mutex); + } + if (active_id == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "no more timers can be created!\n"); + return SWITCH_STATUS_GENERR; + } + it->active_id = active_id; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "starting %d ms timer #%d (%d)\n", it->interval, it->num + 1, it->active_id); + + /* reset timer data */ it->tick = 0; it->users = 0; - /* reuse, if possible */ + /* reuse mutex/condvar */ if (it->mutex == NULL) { switch_mutex_init(&it->mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_thread_cond_create(&it->cond, globals.pool); } - /* create the POSIX timer. Will notify the posix_timer_notify thread on ticks. */ + /* create the POSIX timer. Will send SIG on each tick. */ memset(&sigev, 0, sizeof(sigev)); - sigev.sigev_notify = SIGEV_THREAD; - sigev.sigev_notify_function = posix_timer_notify; - sigev.sigev_value.sival_ptr = (void *)it; + sigev.sigev_notify = SIGEV_SIGNAL; + sigev.sigev_signo = SIG; + sigev.sigev_value.sival_int = active_id; if (timer_create(CLOCK_MONOTONIC, &sigev, &it->timer) == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "failed to create timer: %s\n", strerror(errno)); return SWITCH_STATUS_GENERR; } + switch_mutex_lock(globals.active_timers_mutex); + globals.active_interval_timers[it->active_id] = it; + globals.active_timers_count++; + switch_mutex_unlock(globals.active_timers_mutex); + /* start the timer to tick at interval */ memset(&val, 0, sizeof(val)); val.it_interval.tv_sec = interval / 1000; @@ -120,6 +183,11 @@ static switch_status_t posix_timer_start_interval(interval_timer_t *it, int inte val.it_value.tv_sec = 0; val.it_value.tv_nsec = 100000; if (timer_settime(it->timer, 0, &val, NULL) == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "failed to start timer: %s\n", strerror(errno)); + switch_mutex_lock(globals.active_timers_mutex); + globals.active_interval_timers[it->active_id] = NULL; + globals.active_timers_count--; + switch_mutex_unlock(globals.active_timers_mutex); return SWITCH_STATUS_GENERR; } } @@ -129,18 +197,39 @@ static switch_status_t posix_timer_start_interval(interval_timer_t *it, int inte } /** - * Stop a timer + * Delete an interval timer + * @param it the interval timer */ -static switch_status_t posix_timer_stop_interval(interval_timer_t *it) +static void interval_timer_delete(interval_timer_t *it) +{ + /* remove from active timers */ + switch_mutex_lock(globals.active_timers_mutex); + if (globals.active_interval_timers[it->active_id]) { + globals.active_interval_timers[it->active_id] = NULL; + globals.active_timers_count--; + } + switch_mutex_unlock(globals.active_timers_mutex); + + /* delete the POSIX timer and mark interval timer as destroyed (users == 0) */ + switch_mutex_lock(it->mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "deleting %d ms timer #%d (%d)\n", it->interval, it->num + 1, it->active_id); + timer_delete(it->timer); + memset(&it->timer, 0, sizeof(it->timer)); + it->users = 0; + switch_mutex_unlock(it->mutex); +} + +/** + * Remove a user from interval timer. Delete if no more users remain. + * @param it the interval timer + * @return SWITCH_STATUS_SUCCESS + */ +static switch_status_t interval_timer_stop(interval_timer_t *it) { if (it->users > 0) { it->users--; if (it->users == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stopping %d ms timer #%d\n", it->interval, it->id + 1); - switch_mutex_lock(it->mutex); - timer_delete(it->timer); - memset(&it->timer, 0, sizeof(it->timer)); - switch_mutex_unlock(it->mutex); + interval_timer_delete(it); } } return SWITCH_STATUS_SUCCESS; @@ -151,26 +240,27 @@ static switch_status_t posix_timer_stop_interval(interval_timer_t *it) * @param timer the timer * @return SWITCH_STATUS_SUCCESS if successful otherwise SWITCH_STATUS_GENERR */ -static switch_status_t posix_timer_init(switch_timer_t *timer) +static switch_status_t mod_posix_timer_init(switch_timer_t *timer) { interval_timer_t *it; switch_status_t status; - int interval_timer_id; + int interval_timer_num; if (timer->interval < 1 || timer->interval > MAX_INTERVAL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Bad interval: %d\n", timer->interval); return SWITCH_STATUS_GENERR; } switch_mutex_lock(globals.interval_timers_mutex); - interval_timer_id = globals.next_interval_timer_id[timer->interval]++; - if (globals.next_interval_timer_id[timer->interval] >= TIMERS_PER_INTERVAL) { - globals.next_interval_timer_id[timer->interval] = 0; + interval_timer_num = globals.next_interval_timer_num[timer->interval]++; + if (globals.next_interval_timer_num[timer->interval] >= TIMERS_PER_INTERVAL) { + globals.next_interval_timer_num[timer->interval] = 0; } - it = &globals.interval_timers[timer->interval][interval_timer_id]; - it->id = interval_timer_id; + it = &globals.interval_timers[timer->interval][interval_timer_num]; + it->num = interval_timer_num; it->interval = timer->interval; - status = posix_timer_start_interval(it, timer->interval); + status = interval_timer_start(it, timer->interval); timer->private_info = it; switch_mutex_unlock(globals.interval_timers_mutex); @@ -182,7 +272,7 @@ static switch_status_t posix_timer_init(switch_timer_t *timer) * @param timer the timer * @return SWITCH_STATUS_SUCCESS */ -static switch_status_t posix_timer_step(switch_timer_t *timer) +static switch_status_t mod_posix_timer_step(switch_timer_t *timer) { timer->tick++; timer->samplecount += timer->samples; @@ -193,16 +283,16 @@ static switch_status_t posix_timer_step(switch_timer_t *timer) /** * Timer module interface: wait for next tick * @param timer the timer - * @return SWITCH_STATUS_SUCCESS if successful + * @return SWITCH_STATUS_SUCCESS if successful */ -static switch_status_t posix_timer_next(switch_timer_t *timer) +static switch_status_t mod_posix_timer_next(switch_timer_t *timer) { interval_timer_t *it = timer->private_info; if ((int)(timer->tick - it->tick) < -1) { timer->tick = it->tick; } - posix_timer_step(timer); + mod_posix_timer_step(timer); switch_mutex_lock(it->mutex); while ((int)(timer->tick - it->tick) > 0 && !globals.shutdown) { @@ -218,7 +308,7 @@ static switch_status_t posix_timer_next(switch_timer_t *timer) * @param timer the timer * @return SWITCH_STATUS_SUCCESS */ -static switch_status_t posix_timer_sync(switch_timer_t *timer) +static switch_status_t mod_posix_timer_sync(switch_timer_t *timer) { interval_timer_t *it = timer->private_info; timer->tick = it->tick; @@ -232,7 +322,7 @@ static switch_status_t posix_timer_sync(switch_timer_t *timer) * @param step true if timer should be stepped * @return SWITCH_STATUS_SUCCESS if synched, SWITCH_STATUS_FALSE otherwise */ -static switch_status_t posix_timer_check(switch_timer_t *timer, switch_bool_t step) +static switch_status_t mod_posix_timer_check(switch_timer_t *timer, switch_bool_t step) { interval_timer_t *it = timer->private_info; int diff = (int)(timer->tick - it->tick); @@ -245,7 +335,7 @@ static switch_status_t posix_timer_check(switch_timer_t *timer, switch_bool_t st /* timer pending */ timer->diff = 0; if (step) { - posix_timer_step(timer); + mod_posix_timer_step(timer); } return SWITCH_STATUS_SUCCESS; } @@ -255,44 +345,195 @@ static switch_status_t posix_timer_check(switch_timer_t *timer, switch_bool_t st * @param timer the timer * @return SWITCH_STATUS_SUCCESS if successful */ -static switch_status_t posix_timer_destroy(switch_timer_t *timer) +static switch_status_t mod_posix_timer_destroy(switch_timer_t *timer) { interval_timer_t *it = timer->private_info; switch_status_t status; switch_mutex_lock(globals.interval_timers_mutex); - status = posix_timer_stop_interval(it); + status = interval_timer_stop(it); switch_mutex_unlock(globals.interval_timers_mutex); return status; } +/** + * Load the module + */ SWITCH_MODULE_LOAD_FUNCTION(mod_posix_timer_load) { switch_timer_interface_t *timer_interface; memset(&globals, 0, sizeof(globals)); + globals.timer_tick_pipe[0] = -1; + globals.timer_tick_pipe[1] = -1; globals.pool = pool; switch_mutex_init(&globals.interval_timers_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.active_timers_mutex, SWITCH_MUTEX_NESTED, globals.pool); /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(globals.pool, modname); timer_interface = switch_loadable_module_create_interface(*module_interface, SWITCH_TIMER_INTERFACE); timer_interface->interface_name = "posix"; - timer_interface->timer_init = posix_timer_init; - timer_interface->timer_next = posix_timer_next; - timer_interface->timer_step = posix_timer_step; - timer_interface->timer_sync = posix_timer_sync; - timer_interface->timer_check = posix_timer_check; - timer_interface->timer_destroy = posix_timer_destroy; + timer_interface->timer_init = mod_posix_timer_init; + timer_interface->timer_next = mod_posix_timer_next; + timer_interface->timer_step = mod_posix_timer_step; + timer_interface->timer_sync = mod_posix_timer_sync; + timer_interface->timer_check = mod_posix_timer_check; + timer_interface->timer_destroy = mod_posix_timer_destroy; + + /* the pipe allows a signal handler to notify the runtime thread in a async-signal-safe manner */ + if (pipe(globals.timer_tick_pipe) == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create pipe\n"); + globals.shutdown = 1; + return SWITCH_STATUS_GENERR; + } + fcntl(globals.timer_tick_pipe[0], F_SETFL, O_NONBLOCK); + fcntl(globals.timer_tick_pipe[1], F_SETFL, O_NONBLOCK); + + { + struct sigaction sa; + sigset_t sigmask; + + /* Prevent SIG from annoying FS process. It will be unblocked in the runtime thread. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIG); + sigprocmask(SIG_BLOCK, &sigmask, NULL); + + /* set up signal handler */ + memset(&sa, 0, sizeof(sa)); + sa.sa_flags = SA_SIGINFO | SA_RESTART; + sa.sa_sigaction = timer_signal_handler; + sigfillset(&sa.sa_mask); + if (sigaction(SIG, &sa, NULL) == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to set up signal handler: %s\n", strerror(errno)); + globals.shutdown = 1; + return SWITCH_STATUS_GENERR; + } + } return SWITCH_STATUS_SUCCESS; } +/** + * Runtime thread watches for timer ticks sent by signal handler over pipe. Broadcasts + * ticks to session threads waiting on timer. + */ +SWITCH_MODULE_RUNTIME_FUNCTION(mod_posix_timer_runtime) +{ + uint8_t active_ids[32]; + sigset_t sigmask; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "runtime thread starting\n"); + + /* allow SIG to be delivered to this thread. */ + sigemptyset(&sigmask); + sigaddset(&sigmask, SIG); + pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL); + + /* run until module shutdown */ + while (!globals.shutdown) { + int retval, i; + fd_set read_fds; + struct timeval timeout = { 0, 200 * 1000 }; /* 200 ms */ + + /* wait for timer tick */ + FD_ZERO(&read_fds); + FD_SET(globals.timer_tick_pipe[0], &read_fds); + retval = select(globals.timer_tick_pipe[0] + 1, &read_fds, NULL, NULL, &timeout); + if (retval == -1) { + if (errno == EINTR) { + /* retry */ + continue; + } + if (errno == EBADF) { + /* done */ + break; + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error waiting on pipe: %s. Timer thread exiting\n", strerror(errno)); + break; + } else if (retval == 0) { + /* retry */ + continue; + } + if (!FD_ISSET(globals.timer_tick_pipe[0], &read_fds)) { + /* retry */ + continue; + } + + /* which timer ticked? */ + retval = read(globals.timer_tick_pipe[0], &active_ids, 32); + if (retval == -1) { + if (errno == EINTR || errno == EAGAIN) { + /* retry */ + continue; + } + if (errno == EBADF) { + /* done */ + break; + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error reading from pipe: %s. Timer thread exiting\n", strerror(errno)); + break; + } else if (retval == 0) { + /* retry */ + continue; + } + + /* notify threads of timer tick */ + for (i = 0; i < retval; i++) { + interval_timer_t *it = NULL; + + /* find interval timer */ + switch_mutex_lock(globals.active_timers_mutex); + it = globals.active_interval_timers[(int)active_ids[i]]; + switch_mutex_unlock(globals.active_timers_mutex); + if (it == NULL) { + continue; + } + + /* send notification */ + switch_mutex_lock(it->mutex); + if (it->users) { + it->tick += 1 + timer_getoverrun(it->timer); + switch_thread_cond_broadcast(it->cond); + } + switch_mutex_unlock(it->mutex); + } + } + + globals.shutdown = 1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "runtime thread finished\n"); + return SWITCH_STATUS_TERM; +} + +/** + * Module shutdown + */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_posix_timer_shutdown) { + int i; globals.shutdown = 1; + + if (globals.timer_tick_pipe[0] > 0) { + close(globals.timer_tick_pipe[0]); + } + if (globals.timer_tick_pipe[1] > 0) { + close(globals.timer_tick_pipe[1]); + } + + /* Delete all active timers */ + switch_mutex_lock(globals.interval_timers_mutex); + for (i = 0; i < MAX_ACTIVE_TIMERS; i++) { + interval_timer_t *it; + switch_mutex_lock(globals.active_timers_mutex); + it = globals.active_interval_timers[i]; + switch_mutex_unlock(globals.active_timers_mutex); + if (it) { + interval_timer_delete(it); + } + } + switch_mutex_unlock(globals.interval_timers_mutex); + return SWITCH_STATUS_SUCCESS; } diff --git a/src/mod/timers/mod_posix_timer/test/Makefile b/src/mod/timers/mod_posix_timer/test/Makefile index 856345298b..eda0799146 100644 --- a/src/mod/timers/mod_posix_timer/test/Makefile +++ b/src/mod/timers/mod_posix_timer/test/Makefile @@ -1,5 +1,5 @@ all: - gcc ../mod_posix_timer.c main.c switch.c -I. -o timer_test -lpthread -lrt -g -DLOG_LEVEL=-1 + gcc ../mod_posix_timer.c main.c switch.c -I. -o timer_test -lpthread -lrt -lm -g -DLOG_LEVEL=-1 clean: -rm timer_test diff --git a/src/mod/timers/mod_posix_timer/test/main.c b/src/mod/timers/mod_posix_timer/test/main.c index 1df054566d..44f3a34aea 100644 --- a/src/mod/timers/mod_posix_timer/test/main.c +++ b/src/mod/timers/mod_posix_timer/test/main.c @@ -1,44 +1,461 @@ #include #include +#include +#include +#include +#include +#include +#include extern SWITCH_MODULE_LOAD_FUNCTION(mod_posix_timer_load); extern SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_posix_timer_shutdown); +extern SWITCH_MODULE_RUNTIME_FUNCTION(mod_posix_timer_runtime); switch_loadable_module_interface_t *mod = NULL; switch_memory_pool_t pool = { 0 }; +switch_timer_interface_t *timer_if; +pthread_t module_runtime_thread_id; -int main (int argc, char **argv) +pthread_mutex_t session_mutex = PTHREAD_MUTEX_INITIALIZER; +int pass_count; +int warn_count; +int fail_count; +int total_sessions; +int session_count; +int last_reported_session_count; +int shutdown; + + +/** + * Return a random sample from a normal distrubtion centered at mean with + * the specified standard deviation. + * + * THIS FUNCTION IS NOT REENTRANT!!! + */ +double randnorm(double mean, double std_dev) { + static double z1 = -1.0f; + double u1, u2, z0; + + /* random numbers are generated in pairs. See if new pair needs to be calculated */ + if (z1 >= 0.0f) { + z0 = z1; + z1 = -1.0f; + } else { + /* use box-muller transform to generate random number pair over normal distribution */ + u1 = drand48(); + u2 = drand48(); + z0 = sqrt(-2.0f * log(u1)) * cos(2.0f * M_PI * u2); + z1 = sqrt(-2.0f * log(u1)) * sin(2.0f * M_PI * u2); + } + + return (z0 * std_dev) + mean; +} + +/** + * Pick a random sample according the the weights + * @param weights array of weights + * @param num_weights + */ +static int sample(int *weights, int num_weights) +{ + int total_weight = weights[num_weights - 1]; + int s = floor(drand48() * total_weight); int i; - switch_timer_interface_t *timer_if; - switch_timer_t *timer[1000]; - - mod_posix_timer_load(&mod, &pool); - timer_if = mod->timer; - - - // TODO create multi-threaded test - - // create 10 ms timers - for (i = 0; i < 1000; i++) { - timer[i] = malloc(sizeof(switch_timer_t)); - memset(timer[i], 0, sizeof(switch_timer_t)); - timer[i]->interval = 1; - timer[i]->samples = 8; - timer_if->timer_init(timer[i]); + for (i = 0; i < num_weights; i++) { + if (s < weights[i]) { + return i; + } } - - for (i = 0; i < 50000; i++) { - timer_if->timer_next(timer[0]); - } - - // destroy timers - for (i = 0; i < 1000; i++) { - timer_if->timer_destroy(timer[i]); - free(timer[i]); - } - - mod_posix_timer_shutdown(); + printf ("DOH! s = %f\n", s); return 0; } + +/* + * Calculate x - y + * @return 0 if x is before y, the difference otherwise. + */ +double timespec_subtract(struct timespec *x, struct timespec *y) +{ + struct timespec result; + + /* Perform the carry for the later subtraction by updating y. */ + if (x->tv_nsec < y->tv_nsec) { + int nsec = (y->tv_nsec - x->tv_nsec) / 1000000000 + 1; + y->tv_nsec -= 1000000000 * nsec; + y->tv_sec += nsec; + } + if (x->tv_nsec - y->tv_nsec > 1000000000) { + int nsec = (x->tv_nsec - y->tv_nsec) / 1000000000; + y->tv_nsec += 1000000000 * nsec; + y->tv_sec -= nsec; + } + + /* Return 0 if result is negative. */ + if(x->tv_sec < y->tv_sec) { + return 0.0f; + } + + /* Return the difference */ + result.tv_sec = x->tv_sec - y->tv_sec; + result.tv_nsec = x->tv_nsec - y->tv_nsec; + return (double)result.tv_sec + (double)(result.tv_nsec / 1e9); +} + +/** + * Entry point for the runtime thread + */ +static void *module_thread(void *dummy) +{ + mod_posix_timer_runtime(); + return NULL; +} + +/** + * Load mod_posix_timer and start the runtime thread + */ +static void load_module() +{ + fail_count = 0; + warn_count = 0; + pass_count = 0; + total_sessions = 0; + session_count = 0; + last_reported_session_count = 0; + shutdown = 0; + mod_posix_timer_load(&mod, &pool); + timer_if = mod->timer; + pthread_create(&module_runtime_thread_id, NULL, module_thread, NULL); +} + +/** + * Shutdown mod_posix_timer + */ +static void shutdown_module() +{ + shutdown = 1; + mod_posix_timer_shutdown(); + pthread_join(module_runtime_thread_id, NULL); +} + +/** + * Test rapidly creating and destroying timers + */ +static void test_create_destroy() +{ + switch_timer_t *timers[3000] = { 0 }; + int intervals[4] = { 10, 20, 30, 40 }; + int interval_weights[4] = { 25, 50, 75, 100 }; + int interval_counts[4] = { 0, 0, 0, 0 }; + int toggle[2] = { 75, 100 }; + int timer_count = 0; + + int i = 0; + printf("test_create_destroy()\n"); + for(i = 0; i < 100000000; i++) { + int clear = i % 100000 == 0; + int j; + for (j = 0; j < 3000; j++) { + if (sample(toggle, 2) || clear) { + if (timers[j]) { + interval_counts[timers[j]->interval / 10 - 1]--; + timer_if->timer_destroy(timers[j]); + free(timers[j]); + timers[j] = NULL; + timer_count--; + } else if (!clear) { + int interval = intervals[sample(interval_weights, 4)]; + timers[j] = malloc(sizeof(switch_timer_t)); + memset(timers[j], 0, sizeof(switch_timer_t)); + timers[j]->interval = interval; + timers[j]->samples = interval * 8; + timer_if->timer_init(timers[j]); + timer_count++; + interval_counts[interval / 10 - 1]++; + } + } + } + if (i % 1000 == 0) { + printf("timers = %d, 10ms = %d, 20ms = %d, 30ms = %d, 40ms = %d\n", timer_count, interval_counts[0], interval_counts[1], interval_counts[2], interval_counts[3]); + } + } +} + +/** + * Session thread + */ +typedef struct session_thread_data +{ + int id; + int interval; + double duration; + double actual_duration; + int failed; + int detached; +} session_thread_data_t; + +/** + * Check the result of the session thread's test + * Log a message on failure. Save the result. + */ +static void check_result(session_thread_data_t *sd) +{ + double threshold = sd->interval / 1000.0f; + double diff = sd->actual_duration - sd->duration; + if (diff < 0) { + diff = diff * -1.0f; + } + if (diff > threshold * 2.0) { + sd->failed = 2; + } else if (diff > threshold) { + sd->failed = 1; + } else { + sd->failed = 0; + } + if (sd->failed > 1) { + printf("thread #%d FAILED : expected duration = %f, actual duration = %f, diff = %f, threshold = %f\n", sd->id, sd->duration, sd->actual_duration, diff, threshold); + } else { + //printf("thread #%d PASSED : expected duration = %f, actual duration = %f, diff = %f, threshold = %f\n", sd->id, sd->duration, sd->actual_duration, diff, threshold); + + } +} + +/** + * Creates a timer and advances it until duration expires + */ +void *session_thread(void *arg) +{ + int *pass = 0; + session_thread_data_t *d = (session_thread_data_t *)arg; + switch_timer_t timer = { 0 }; + + /* start the timer */ + timer.interval = d->interval; + timer.samples = d->interval * 8; + if (timer_if->timer_init(&timer) != SWITCH_STATUS_SUCCESS) { + printf("WTF!\n"); + goto done; + } + //timer_if->timer_sync(&timer); + + /* tick for duration */ + { + int i; + struct timespec start, end; + int ticks = floor(d->duration * 1000 / d->interval); + clock_gettime(CLOCK_MONOTONIC, &start); + for (i = 0; i < ticks && !shutdown; i++) { + timer_if->timer_next(&timer); + } + clock_gettime(CLOCK_MONOTONIC, &end); + d->actual_duration = timespec_subtract(&end, &start); + } + + /* stop the timer */ + timer_if->timer_destroy(&timer); + + if (!shutdown) { + check_result(d); + } + + pthread_mutex_lock(&session_mutex); + if (d->failed > 1) { + fail_count++; + } else if (d->failed > 0) { + warn_count++; + } else { + pass_count++; + } + session_count--; + if (session_count % 100 == 0 && last_reported_session_count != session_count) { + printf("sessions = %d\n", session_count); + last_reported_session_count = session_count; + } + pthread_mutex_unlock(&session_mutex); + +done: + if (d->detached) { + free(d); + return NULL; + } + + /* return result */ + return d; +} + + +/** + * @param thread the thread + * @param id for logging + * @param interval the timer period in ms + * @param duration_mean the mean duration for this thread to execute + * @param duration_std_dev the standard deviation from the mean duration + * @param detached if true this thread is detached + */ +static void create_session_thread(pthread_t *thread, int id, int interval, double duration_mean, double duration_std_dev, int detached) +{ + session_thread_data_t *d = malloc(sizeof(session_thread_data_t)); + pthread_mutex_lock(&session_mutex); + total_sessions++; + session_count++; + if (total_sessions % 100 == 0) { + printf("total sessions = %d, sessions = %d, pass = %d, warn = %d, fail = %d\n", total_sessions, session_count, pass_count, warn_count, fail_count); + } + if (session_count % 100 == 0 && last_reported_session_count != session_count) { + printf("sessions = %d\n", session_count); + last_reported_session_count = session_count; + } + pthread_mutex_unlock(&session_mutex); + if (interval == 0) { + printf("WTF WTF WTF!!\n"); + printf("id = %d, interval = %d, duration_mean = %f, duration_std_dev = %f, detached = %d\n", id, interval, duration_mean, duration_std_dev, detached); + } + d->id = id; + d->interval = interval; + d->duration = randnorm(duration_mean, duration_std_dev); + /* truncate duration to interval tick */ + d->duration = ceil(d->duration * 1000 / interval) * interval / 1000.0f; + d->detached = detached; + d->failed = 0; + pthread_create(thread, NULL, session_thread, d); + if (detached) { + pthread_detach(*thread); + } +} + + + +/** + * Create timers at a rate of CPS for test_duration. + * + * @param interval array of timer intervals in ms + * @param interval_weights array of timer intervals weights + * @param num_intervals size of interval array + * @param test_duration how long to run this test, in seconds + * @param cps the "calls per second". This is the rate at which session threads are created + * @param duration_mean mean duration for each thread + * @param duration_std_dev standard deviation from the mean duration + * @param num_timers number of threads to create + */ +static void test_timer_session(int *interval, int *interval_weights, int num_intervals, double test_duration, int cps, int max_sessions, double duration_mean, double duration_std_dev) +{ + int i = 0; + struct timespec start, now, period; + double elapsed = 0.0f; + + printf("test_timer_session(%d, %f, %d, %d, %f, %f)\n", interval[0], test_duration, cps, max_sessions, duration_mean, duration_std_dev); + + + /* create new call threads at CPS for test_duration */ + if (cps == 1) { + period.tv_sec = 1; + period.tv_nsec = 0; + } else { + period.tv_sec = 0; + period.tv_nsec = 1000000000 / cps; + } + + clock_gettime(CLOCK_MONOTONIC, &start); + while (elapsed < test_duration) { + pthread_t thread; + int retval = clock_nanosleep(CLOCK_MONOTONIC, 0, &period, NULL); + if (retval == -1) { + if (errno == EINTR) { + /* retry */ + continue; + } + printf("clock_nanosleep() error: %s\n", strerror(errno)); + break; + } + pthread_mutex_lock(&session_mutex); + if (session_count < max_sessions) { + pthread_mutex_unlock(&session_mutex); + create_session_thread(&thread, ++i, interval[sample(interval_weights, 4)], duration_mean, duration_std_dev, 1); + } else { + pthread_mutex_unlock(&session_mutex); + } + clock_gettime(CLOCK_MONOTONIC, &now); + elapsed = timespec_subtract(&now, &start); + } + + pthread_mutex_lock(&session_mutex); + while (session_count) { + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 200 * 1000; + pthread_mutex_unlock(&session_mutex); + clock_nanosleep(CLOCK_MONOTONIC, 0, &t, NULL); + pthread_mutex_lock(&session_mutex); + } + pthread_mutex_unlock(&session_mutex); + + + printf("test_timer_session(%d, %f, %d, %d, %f, %f) done\n", interval[0], test_duration, cps, max_sessions, duration_mean, duration_std_dev); +} + +/** + * Create num_timers in threads and tick until duration_mean elapses. + * + * @param interval timer interval in ms + * @param duration_mean mean duration for each thread + * @param duration_std_dev standard deviation from the mean duration + * @param num_timers number of threads to create + */ +static void test_timer(int interval, double duration_mean, double duration_std_dev, int num_timers) +{ + int i; + int pass = 1; + pthread_t *threads = malloc(sizeof(pthread_t) * num_timers); + printf("test_timer(%d, %f, %f, %d)\n", interval, duration_mean, duration_std_dev, num_timers); + + + /* create threads */ + for (i = 0; i < num_timers; i++) { + create_session_thread(&threads[i], i, interval, duration_mean, duration_std_dev, 0); + } + + /* wait for thread results */ + for (i = 0; i < num_timers; i++) { + void *d = NULL; + pthread_join(threads[i], &d); + if (d) { + int result; + session_thread_data_t *sd = (session_thread_data_t *)d; + pass = pass & (sd->failed < 2); + free(sd); + } + } + + printf("test_timer(%d, %f, %f, %d) : %s\n", interval, duration_mean, duration_std_dev, num_timers, pass ? "PASS" : "FAIL"); + free(threads); +} + +/** + * Main program + * + */ +int main (int argc, char **argv) +{ + //int intervals[4] = { 10, 20, 30, 40 }; + //int interval_weights[4] = { 2, 95, 97, 100 }; + int intervals[1] = { 20 }; + int interval_weights[1] = { 100 }; + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + srand48(ts.tv_nsec); + load_module(); + //test_timer(20, 5.0f, .2f, 1000); + //test_timer_session(intervals, interval_weights, 4, 2 * 86400.0f, 90, 2000, 30.0, 5.0f); + while(1) { + /* stop periodically to trigger timer shutdown */ + test_timer_session(intervals, interval_weights, 1, 60, 150, 3000, 30.0, 5.0f); + } + //test_timer(1000, 5.0f, 1); + //test_timer(20, 5.0f, .2f, 1000); + //test_timer(30, 5.0f, 1000); + //test_create_destroy(); + shutdown_module(); + return 0; +} + diff --git a/src/mod/timers/mod_posix_timer/test/switch.c b/src/mod/timers/mod_posix_timer/test/switch.c index 7ac7a3a97c..e58b017d35 100644 --- a/src/mod/timers/mod_posix_timer/test/switch.c +++ b/src/mod/timers/mod_posix_timer/test/switch.c @@ -42,13 +42,17 @@ switch_status_t switch_thread_cond_create(switch_thread_cond_t **cond, switch_me return pthread_cond_init(*cond, NULL); } -switch_status_t switch_thread_cond_timedwait(switch_thread_cond_t *cond, switch_mutex_t *mutex, int wait) +switch_status_t switch_thread_cond_timedwait(switch_thread_cond_t *cond, switch_mutex_t *mutex, long wait) { - struct timespec dur = { 0, 0 }; - clock_gettime(CLOCK_REALTIME, &dur); - dur.tv_sec = wait / 1000000000; - dur.tv_nsec = wait % 1000000000; - return pthread_cond_timedwait(cond, mutex, &dur); + struct timespec abs_time = { 0, 0 }; + /* add wait duration to current time (wait is in microseconds, pthreads wants nanosecond resolution) */ + clock_gettime(CLOCK_REALTIME, &abs_time); + abs_time.tv_sec += wait / 1000000; + abs_time.tv_nsec += (wait % 1000000) * 1000; + /* handle overflow of tv_nsec */ + abs_time.tv_sec += abs_time.tv_nsec / 1000000000; + abs_time.tv_nsec = abs_time.tv_nsec % 1000000000; + return pthread_cond_timedwait(cond, mutex, &abs_time); } switch_status_t switch_thread_cond_broadcast(switch_thread_cond_t *cond) diff --git a/src/mod/timers/mod_posix_timer/test/switch.h b/src/mod/timers/mod_posix_timer/test/switch.h index 25391943fb..7305b8b237 100644 --- a/src/mod/timers/mod_posix_timer/test/switch.h +++ b/src/mod/timers/mod_posix_timer/test/switch.h @@ -9,12 +9,15 @@ #define SWITCH_STATUS_SUCCESS 0 #define SWITCH_STATUS_GENERR 1 #define SWITCH_STATUS_FALSE 2 +#define SWITCH_STATUS_TERM 3 #define SWITCH_MUTEX_NESTED 1 #define SWITCH_CHANNEL_LOG 0 +#define SWITCH_LOG_DEBUG 0 #define SWITCH_LOG_INFO 0 +#define SWITCH_LOG_ERROR 1 typedef int switch_status_t; typedef size_t switch_size_t; @@ -113,7 +116,7 @@ switch_status_t switch_mutex_init(switch_mutex_t **mutex, int flags, switch_memo switch_status_t switch_thread_cond_create(switch_thread_cond_t **cond, switch_memory_pool_t *pool); -switch_status_t switch_thread_cond_timedwait(switch_thread_cond_t *cond, switch_mutex_t *mutex, int wait); +switch_status_t switch_thread_cond_timedwait(switch_thread_cond_t *cond, switch_mutex_t *mutex, long wait); switch_status_t switch_thread_cond_broadcast(switch_thread_cond_t *cond);