Wed Jan 28 12:06:06 CST 2009 Pekka Pessi <first.last@nokia.com>

* sofia-sip/su_wait.h: added su_timer_deferrable(), su_task_wakeup(), su_root_set_max_defer(), su_root_get_max_defer() and su_task_deferrable().
   
  Added implementation to different main-loop implementations in
  libsofia-sip-ua/su.
  
  Fixed su_task_is_running(). 
    
  In libsofia-sip-ua/su, added tests for deferred timers. 
  
  In libsofia-sip-ua-glib/su-glib/su_source.c, added su_source_wakeup() and
  su_source_is_running(). Using su_base_port_send() instead of
  su_source_send(). Using su_base_port_deferable() and
  su_base_port_max_defer(), too.



git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11852 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Michael Jerris 2009-02-11 17:16:44 +00:00
parent fff2dd5b66
commit 231fc1f4eb
16 changed files with 452 additions and 64 deletions

View File

@ -1 +1 @@
Wed Feb 11 11:15:57 CST 2009 Wed Feb 11 11:16:35 CST 2009

View File

@ -484,6 +484,9 @@ SOFIAPUBFUN int su_root_release(su_root_t *root);
SOFIAPUBFUN int su_root_obtain(su_root_t *root); SOFIAPUBFUN int su_root_obtain(su_root_t *root);
SOFIAPUBFUN int su_root_has_thread(su_root_t *root); SOFIAPUBFUN int su_root_has_thread(su_root_t *root);
SOFIAPUBFUN int su_root_set_max_defer(su_root_t *, su_duration_t max_defer);
SOFIAPUBFUN su_duration_t su_root_get_max_defer(su_root_t const *self);
/* Timers */ /* Timers */
SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec) SOFIAPUBFUN su_timer_t *su_timer_create(su_task_r const, su_duration_t msec)
__attribute__((__malloc__)); __attribute__((__malloc__));
@ -503,10 +506,10 @@ SOFIAPUBFUN su_root_t *su_timer_root(su_timer_t const *);
SOFIAPUBFUN int su_timer_expire(su_timer_queue_t * const, SOFIAPUBFUN int su_timer_expire(su_timer_queue_t * const,
su_duration_t *tout, su_duration_t *tout,
su_time_t now); su_time_t now);
SOFIAPUBFUN int su_timer_deferrable(su_timer_t *t, int value);
/* Tasks */ /* Tasks */
/** NULL task. */
SOFIAPUBVAR su_task_r const su_task_null; SOFIAPUBVAR su_task_r const su_task_null;
SOFIAPUBFUN _su_task_r su_task_init(su_task_r task); SOFIAPUBFUN _su_task_r su_task_init(su_task_r task);
@ -519,6 +522,9 @@ SOFIAPUBFUN int su_task_is_running(su_task_r const);
SOFIAPUBFUN su_root_t *su_task_root(su_task_r const self); SOFIAPUBFUN su_root_t *su_task_root(su_task_r const self);
SOFIAPUBFUN su_timer_queue_t *su_task_timers(su_task_r const self); SOFIAPUBFUN su_timer_queue_t *su_task_timers(su_task_r const self);
SOFIAPUBFUN su_timer_queue_t *su_task_deferrable(su_task_r const task);
SOFIAPUBFUN int su_task_wakeup(su_task_r const task);
SOFIAPUBFUN int su_task_execute(su_task_r const task, SOFIAPUBFUN int su_task_execute(su_task_r const task,
int (*function)(void *), void *arg, int (*function)(void *), void *arg,

View File

@ -73,6 +73,8 @@ int su_base_port_init(su_port_t *self, su_port_vtable_t const *vtable)
if (self) { if (self) {
self->sup_vtable = vtable; self->sup_vtable = vtable;
self->sup_tail = &self->sup_head; self->sup_tail = &self->sup_head;
self->sup_max_defer = 15 * 1000;
return su_port_obtain(self); return su_port_obtain(self);
} }
@ -149,7 +151,6 @@ struct _GSource *su_base_port_gsource(su_port_t *self)
/** @internal Send a message to the port. /** @internal Send a message to the port.
* *
* @retval 1 if port thread needs to be woken
* @retval 0 if there are other messages in queue, too * @retval 0 if there are other messages in queue, too
* @retval -1 upon an error * @retval -1 upon an error
*/ */
@ -167,7 +168,10 @@ int su_base_port_send(su_port_t *self, su_msg_r rmsg)
su_port_unlock(self, "su_port_send"); su_port_unlock(self, "su_port_send");
return wakeup; if (wakeup > 0)
su_port_wakeup(self);
return 0;
} }
else { else {
su_msg_destroy(rmsg); su_msg_destroy(rmsg);
@ -317,12 +321,12 @@ int su_base_port_multishot(su_port_t *self, int multishot)
*/ */
void su_base_port_run(su_port_t *self) void su_base_port_run(su_port_t *self)
{ {
su_duration_t tout = 0; su_duration_t tout = 0, tout2 = 0;
assert(su_port_own_thread(self)); assert(su_port_own_thread(self));
for (self->sup_running = 1; self->sup_running;) { for (self->sup_running = 1; self->sup_running;) {
tout = 2000; tout = self->sup_max_defer;
if (self->sup_prepoll) if (self->sup_prepoll)
self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root);
@ -330,8 +334,11 @@ void su_base_port_run(su_port_t *self)
if (self->sup_head) if (self->sup_head)
self->sup_vtable->su_port_getmsgs(self); self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers) if (self->sup_timers || self->sup_deferrable) {
su_timer_expire(&self->sup_timers, &tout, su_now()); su_time_t now = su_now();
su_timer_expire(&self->sup_timers, &tout, now);
su_timer_expire(&self->sup_deferrable, &tout2, now);
}
if (!self->sup_running) if (!self->sup_running)
break; break;
@ -349,13 +356,13 @@ void su_base_port_run_tune(su_port_t *self)
{ {
int i; int i;
int timers = 0, messages = 0, events = 0; int timers = 0, messages = 0, events = 0;
su_duration_t tout = 0, tout0; su_duration_t tout = 0, tout2 = 0;
su_time_t started = su_now(), woken = started, bedtime = woken; su_time_t started = su_now(), woken = started, bedtime = woken;
assert(su_port_own_thread(self)); assert(su_port_own_thread(self));
for (self->sup_running = 1; self->sup_running;) { for (self->sup_running = 1; self->sup_running;) {
tout = 2000; tout = self->sup_max_defer;
timers = 0, messages = 0; timers = 0, messages = 0;
@ -365,8 +372,12 @@ void su_base_port_run_tune(su_port_t *self)
if (self->sup_head) if (self->sup_head)
messages = self->sup_vtable->su_port_getmsgs(self); messages = self->sup_vtable->su_port_getmsgs(self);
if (self->sup_timers) if (self->sup_timers || self->sup_deferrable) {
timers = su_timer_expire(&self->sup_timers, &tout, su_now()); su_time_t now = su_now();
timers =
su_timer_expire(&self->sup_timers, &tout, now) +
su_timer_expire(&self->sup_deferrable, &tout2, now);
}
if (!self->sup_running) if (!self->sup_running)
break; break;
@ -404,6 +415,16 @@ void su_base_port_break(su_port_t *self)
self->sup_running = 0; self->sup_running = 0;
} }
/** @internal
* Check if port is running.
*
* @param self pointer to port
*/
int su_base_port_is_running(su_port_t const *self)
{
return self->sup_running != 0;
}
/** @internal Block until wait object is signaled or timeout. /** @internal Block until wait object is signaled or timeout.
* *
* This function waits for wait objects and the timers associated with * This function waits for wait objects and the timers associated with
@ -435,6 +456,9 @@ su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout)
if (self->sup_timers) if (self->sup_timers)
su_timer_expire(&self->sup_timers, &tout, now); su_timer_expire(&self->sup_timers, &tout, now);
if (self->sup_deferrable)
su_timer_expire(&self->sup_deferrable, &tout, now);
/* if there are messages do a quick wait */ /* if there are messages do a quick wait */
if (self->sup_head) if (self->sup_head)
tout = 0; tout = 0;
@ -444,15 +468,27 @@ su_duration_t su_base_port_step(su_port_t *self, su_duration_t tout)
else else
tout = SU_WAIT_FOREVER; tout = SU_WAIT_FOREVER;
if (self->sup_head) if (self->sup_head) {
if (self->sup_vtable->su_port_getmsgs(self)) { if (self->sup_vtable->su_port_getmsgs(self)) {
/* Check for wait events that may have been generated by messages */ /* Check for wait events that may have been generated by messages */
if (self->sup_vtable->su_port_wait_events(self, 0)) if (self->sup_vtable->su_port_wait_events(self, 0))
tout = 0; tout = 0;
} }
}
if (self->sup_timers) if (self->sup_timers || self->sup_deferrable) {
su_timer_expire(&self->sup_timers, &tout, su_now()); su_duration_t tout2 = SU_WAIT_FOREVER;
now = su_now();
su_timer_expire(&self->sup_timers, &tout, now);
su_timer_expire(&self->sup_deferrable, &tout2, now);
if (tout == SU_WAIT_FOREVER && tout2 != SU_WAIT_FOREVER) {
if (tout2 < self->sup_max_defer)
tout2 = self->sup_max_defer;
tout = tout2;
}
}
if (self->sup_head) if (self->sup_head)
tout = 0; tout = 0;
@ -501,6 +537,22 @@ su_timer_queue_t *su_base_port_timers(su_port_t *self)
return &self->sup_timers; return &self->sup_timers;
} }
su_timer_queue_t *su_base_port_deferrable(su_port_t *self)
{
return &self->sup_deferrable;
}
int su_base_port_max_defer(su_port_t *self,
su_duration_t *return_duration,
su_duration_t *set_duration)
{
if (set_duration && *set_duration > 0)
self->sup_max_defer = *set_duration;
if (return_duration)
*return_duration = self->sup_max_defer;
return 0;
}
/* ====================================================================== /* ======================================================================
* Clones * Clones
*/ */

View File

@ -127,7 +127,7 @@ su_port_vtable_t const su_devpoll_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_devpoll_port_decref, su_devpoll_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_devpoll_port_register, su_devpoll_port_register,
su_devpoll_port_unregister, su_devpoll_port_unregister,
su_devpoll_port_deregister, su_devpoll_port_deregister,
@ -148,6 +148,10 @@ su_port_vtable_t const su_devpoll_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_devpoll_port_name(su_port_t const *self) static char const *su_devpoll_port_name(su_port_t const *self)

View File

@ -122,7 +122,7 @@ su_port_vtable_t const su_epoll_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_epoll_port_decref, su_epoll_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_epoll_port_register, su_epoll_port_register,
su_epoll_port_unregister, su_epoll_port_unregister,
su_epoll_port_deregister, su_epoll_port_deregister,
@ -143,6 +143,10 @@ su_port_vtable_t const su_epoll_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_epoll_port_name(su_port_t const *self) static char const *su_epoll_port_name(su_port_t const *self)

View File

@ -119,7 +119,7 @@ su_port_vtable_t const su_kqueue_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_kqueue_port_decref, su_kqueue_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_kqueue_port_register, su_kqueue_port_register,
su_kqueue_port_unregister, su_kqueue_port_unregister,
su_kqueue_port_deregister, su_kqueue_port_deregister,
@ -140,6 +140,10 @@ su_port_vtable_t const su_kqueue_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_kqueue_port_name(su_port_t const *self) static char const *su_kqueue_port_name(su_port_t const *self)

View File

@ -204,6 +204,10 @@ su_port_vtable_t const su_osx_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
/* XXX - mela static void su_osx_port_destroy(su_port_t *self); */ /* XXX - mela static void su_osx_port_destroy(su_port_t *self); */

View File

@ -120,7 +120,7 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_poll_port_decref, su_poll_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_poll_port_register, su_poll_port_register,
su_poll_port_unregister, su_poll_port_unregister,
su_poll_port_deregister, su_poll_port_deregister,
@ -141,6 +141,10 @@ su_port_vtable_t const su_poll_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_poll_port_name(su_port_t const *self) static char const *su_poll_port_name(su_port_t const *self)

View File

@ -150,6 +150,14 @@ typedef struct su_port_vtable {
int (*su_port_execute)(su_task_r const task, int (*su_port_execute)(su_task_r const task,
int (*function)(void *), void *arg, int (*function)(void *), void *arg,
int *return_value); int *return_value);
/* >= 1.12.11 */
su_timer_queue_t *(*su_port_deferrable)(su_port_t *port);
int (*su_port_max_defer)(su_port_t *port,
su_duration_t *return_duration,
su_duration_t *set_duration);
int (*su_port_wakeup)(su_port_t *port);
int (*su_port_is_running)(su_port_t const *port);
} su_port_vtable_t; } su_port_vtable_t;
SOFIAPUBFUN su_port_t *su_port_create(void) SOFIAPUBFUN su_port_t *su_port_create(void)
@ -244,6 +252,12 @@ int su_port_send(su_port_t *self, su_msg_r rmsg)
return base->sup_vtable->su_port_send(self, rmsg); return base->sup_vtable->su_port_send(self, rmsg);
} }
su_inline
int su_port_wakeup(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_wakeup(self);
}
su_inline su_inline
int su_port_register(su_port_t *self, int su_port_register(su_port_t *self,
@ -372,7 +386,7 @@ int su_port_remove_prepoll(su_port_t *self,
} }
su_inline su_inline
su_timer_t **su_port_timers(su_port_t *self) su_timer_queue_t *su_port_timers(su_port_t *self)
{ {
su_virtual_port_t *base = (su_virtual_port_t *)self; su_virtual_port_t *base = (su_virtual_port_t *)self;
return base->sup_vtable->su_port_timers(self); return base->sup_vtable->su_port_timers(self);
@ -399,6 +413,41 @@ int su_port_getmsgs_from(su_port_t *self, su_port_t *cloneport)
return base->sup_vtable->su_port_getmsgs_from(self, cloneport); return base->sup_vtable->su_port_getmsgs_from(self, cloneport);
} }
/** Extension from >= 1.12.11 */
su_inline
su_timer_queue_t *su_port_deferrable(su_port_t *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
if (base == NULL)
return (void *)(errno = EFAULT), NULL;
return base->sup_vtable->su_port_deferrable(self);
}
su_inline
int su_port_max_defer(su_port_t *self,
su_duration_t *return_duration,
su_duration_t *set_duration)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
if (base == NULL)
return (errno = EFAULT), -1;
return base->sup_vtable->su_port_max_defer(self,
return_duration,
set_duration);
}
su_inline
int su_port_is_running(su_port_t const *self)
{
su_virtual_port_t *base = (su_virtual_port_t *)self;
return base && base->sup_vtable->su_port_is_running(self);
}
SOFIAPUBFUN void su_port_wait(su_clone_r rclone); SOFIAPUBFUN void su_port_wait(su_clone_r rclone);
SOFIAPUBFUN int su_port_execute(su_task_r const task, SOFIAPUBFUN int su_port_execute(su_task_r const task,
@ -427,7 +476,9 @@ typedef struct su_base_port_s {
su_msg_t *sup_head, **sup_tail; su_msg_t *sup_head, **sup_tail;
/* Timer list */ /* Timer list */
su_timer_queue_t sup_timers; su_timer_queue_t sup_timers, sup_deferrable;
su_duration_t sup_max_defer; /**< Maximum time to defer */
unsigned sup_running; /**< In su_root_run() loop? */ unsigned sup_running; /**< In su_root_run() loop? */
} su_base_port_t; } su_base_port_t;
@ -468,7 +519,7 @@ SOFIAPUBFUN int su_base_port_add_prepoll(su_port_t *self,
SOFIAPUBFUN int su_base_port_remove_prepoll(su_port_t *self, su_root_t *root); SOFIAPUBFUN int su_base_port_remove_prepoll(su_port_t *self, su_root_t *root);
SOFIAPUBFUN su_timer_t **su_base_port_timers(su_port_t *self); SOFIAPUBFUN su_timer_queue_t *su_base_port_timers(su_port_t *self);
SOFIAPUBFUN int su_base_port_multishot(su_port_t *self, int multishot); SOFIAPUBFUN int su_base_port_multishot(su_port_t *self, int multishot);
@ -479,6 +530,14 @@ SOFIAPUBFUN int su_base_port_start_shared(su_root_t *parent,
su_root_deinit_f deinit); su_root_deinit_f deinit);
SOFIAPUBFUN void su_base_port_wait(su_clone_r rclone); SOFIAPUBFUN void su_base_port_wait(su_clone_r rclone);
SOFIAPUBFUN su_timer_queue_t *su_base_port_deferrable(su_port_t *self);
SOFIAPUBFUN int su_base_port_max_defer(su_port_t *self,
su_duration_t *return_duration,
su_duration_t *set_duration);
SOFIAPUBFUN int su_base_port_is_running(su_port_t const *self);
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
#if SU_HAVE_PTHREADS #if SU_HAVE_PTHREADS
@ -493,7 +552,7 @@ typedef struct su_pthread_port_s {
pthread_t sup_tid; pthread_t sup_tid;
pthread_mutex_t sup_obtained[1]; pthread_mutex_t sup_obtained[1];
#if 0 #if 0 /* Pausing and resuming are not used */
pthread_mutex_t sup_runlock[1]; pthread_mutex_t sup_runlock[1];
pthread_cond_t sup_resume[1]; pthread_cond_t sup_resume[1];
short sup_paused; /**< True if thread is paused */ short sup_paused; /**< True if thread is paused */
@ -535,7 +594,6 @@ SOFIAPUBFUN int su_pthread_port_execute(su_task_r const task,
int (*function)(void *), void *arg, int (*function)(void *), void *arg,
int *return_value); int *return_value);
#if 0 #if 0
SOFIAPUBFUN int su_pthread_port_pause(su_port_t *self); SOFIAPUBFUN int su_pthread_port_pause(su_port_t *self);
SOFIAPUBFUN int su_pthread_port_resume(su_port_t *self); SOFIAPUBFUN int su_pthread_port_resume(su_port_t *self);
@ -570,6 +628,7 @@ SOFIAPUBFUN int su_socket_port_init(su_socket_port_t *,
su_port_vtable_t const *); su_port_vtable_t const *);
SOFIAPUBFUN void su_socket_port_deinit(su_socket_port_t *self); SOFIAPUBFUN void su_socket_port_deinit(su_socket_port_t *self);
SOFIAPUBFUN int su_socket_port_send(su_port_t *self, su_msg_r rmsg); SOFIAPUBFUN int su_socket_port_send(su_port_t *self, su_msg_r rmsg);
SOFIAPUBFUN int su_socket_port_wakeup(su_port_t *self);
SOFIA_END_DECLS SOFIA_END_DECLS

View File

@ -315,6 +315,9 @@ static void *su_pthread_port_clone_main(void *varg)
task->sut_root->sur_magic = arg->magic; task->sut_root->sur_magic = arg->magic;
task->sut_root->sur_deinit = arg->deinit; task->sut_root->sur_deinit = arg->deinit;
su_root_set_max_defer(task->sut_root,
su_root_get_max_defer(arg->parent));
if (arg->init(task->sut_root, arg->magic) == 0) { if (arg->init(task->sut_root, arg->magic) == 0) {
su_pthread_port_return_to_parent(arg, 0), arg = NULL; su_pthread_port_return_to_parent(arg, 0), arg = NULL;

View File

@ -86,6 +86,8 @@ struct su_root_s;
* - su_root_run() [Do not call from cloned task] * - su_root_run() [Do not call from cloned task]
* - su_root_break() [Do not call from cloned task] * - su_root_break() [Do not call from cloned task]
* - su_root_step() [Do not call from cloned task] * - su_root_step() [Do not call from cloned task]
* - su_root_get_max_defer()
* - su_root_set_max_defer()
* - su_root_task() * - su_root_task()
* *
* New tasks can be created via su_clone_start() function. * New tasks can be created via su_clone_start() function.
@ -127,6 +129,7 @@ int su_timer_reset_all(su_timer_t **t0, su_task_r);
* Tasks * Tasks
*/ */
/** NULL task. */
su_task_r const su_task_null = SU_TASK_R_INIT; su_task_r const su_task_null = SU_TASK_R_INIT;
#define SU_TASK_ZAP(t, f) \ #define SU_TASK_ZAP(t, f) \
@ -259,13 +262,13 @@ int su_task_cmp(su_task_r const a, su_task_r const b)
* *
* @retval true (nonzero) if task is not stopped, * @retval true (nonzero) if task is not stopped,
* @retval zero if it is null or stopped. * @retval zero if it is null or stopped.
*
* @note A task sharing thread with another task is considered stopped when
* ever the the main task is stopped.
*/ */
int su_task_is_running(su_task_r const task) int su_task_is_running(su_task_r const task)
{ {
return return task && task->sut_root && su_port_is_running(task->sut_port);
task &&
task->sut_port &&
task->sut_root;
} }
/** @internal /** @internal
@ -318,14 +321,43 @@ int su_task_detach(su_task_r self)
* *
* @param task task handle * @param task task handle
* *
* @return A timer list of the task. If there are no timers, it returns * @return A timer list of the task.
* NULL.
*/ */
su_timer_queue_t *su_task_timers(su_task_r const task) su_timer_queue_t *su_task_timers(su_task_r const task)
{ {
return task->sut_port ? su_port_timers(task->sut_port) : NULL; return task->sut_port ? su_port_timers(task->sut_port) : NULL;
} }
/**Return the queue for deferrable timers associated with given task.
*
* @param task task handle
*
* @return A timer list of the task.
*
* @NEW_1_12_11
*/
su_timer_queue_t *su_task_deferrable(su_task_r const task)
{
return task ? su_port_deferrable(task->sut_port) : NULL;
}
/** Wakeup a task.
*
* Wake up a task. This function is mainly useful when using deferrable
* timers executed upon wakeup.
*
* @param task task handle
*
* @retval 0 if succesful
* @retval -1 upon an error
*
* @NEW_1_12_11
*/
int su_task_wakeup(su_task_r const task)
{
return task ? su_port_wakeup(task->sut_port) : -1;
}
/** Execute the @a function by @a task thread. /** Execute the @a function by @a task thread.
* *
* @retval 0 if successful * @retval 0 if successful
@ -447,6 +479,10 @@ void su_root_destroy(su_root_t *self)
unregistered = su_port_unregister_all(port, self); unregistered = su_port_unregister_all(port, self);
reset = su_timer_reset_all(su_task_timers(self->sur_task), self->sur_task); reset = su_timer_reset_all(su_task_timers(self->sur_task), self->sur_task);
if (su_task_deferrable(self->sur_task))
reset += su_timer_reset_all(su_task_deferrable(self->sur_task),
self->sur_task);
if (unregistered || reset) if (unregistered || reset)
SU_DEBUG_1(("su_root_destroy: " SU_DEBUG_1(("su_root_destroy: "
"%u registered waits, %u timers\n", "%u registered waits, %u timers\n",
@ -601,6 +637,56 @@ int su_root_unregister(su_root_t *self,
return su_port_unregister(self->sur_port, self, wait, callback, arg); return su_port_unregister(self->sur_port, self, wait, callback, arg);
} }
/** Set maximum defer time.
*
* The deferrable timers can be deferred until the task is otherwise
* activated, however, they are deferred no longer than the maximum defer
* time. The maximum defer time determines also the maximum time during
* which task waits for events while running. The maximum defer time is 15
* seconds by default.
*
* Cloned tasks inherit the maximum defer time.
*
* @param self pointer to root object
* @param max_defer maximum defer time in milliseconds
*
* @retval 0 when successful
* @retval -1 upon an error
*
* @sa su_timer_deferrable()
*
* @NEW_1_12_11
*/
int su_root_set_max_defer(su_root_t *self, su_duration_t max_defer)
{
if (!self)
return -1;
return su_port_max_defer(self->sur_port, &max_defer, &max_defer);
}
/** Get maximum defer time.
*
* The deferrable timers can be deferred until the task is otherwise
* activated, however, they are deferred no longer than the maximum defer
* time. The maximum defer time is 15 seconds by default.
*
* @param root pointer to root object
*
* @return Maximum defer time
*
* @NEW_1_12_7
*/
su_duration_t su_root_get_max_defer(su_root_t const *self)
{
su_duration_t max_defer = SU_WAIT_MAX;
if (self != NULL)
su_port_max_defer(self->sur_port, &max_defer, NULL);
return max_defer;
}
/** Remove a su_wait_t registration. /** Remove a su_wait_t registration.
* *
* The function su_root_deregister() deregisters a su_wait_t object. The * The function su_root_deregister() deregisters a su_wait_t object. The

View File

@ -139,7 +139,7 @@ su_port_vtable_t const su_select_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_select_port_decref, su_select_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_select_port_register, su_select_port_register,
su_select_port_unregister, su_select_port_unregister,
su_select_port_deregister, su_select_port_deregister,
@ -160,6 +160,10 @@ su_port_vtable_t const su_select_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_select_port_name(su_port_t const *self) static char const *su_select_port_name(su_port_t const *self)

View File

@ -179,23 +179,17 @@ void su_socket_port_deinit(su_port_t *self)
su_pthread_port_deinit(self); su_pthread_port_deinit(self);
} }
/** @internal Send a message to the port. */ /** @internal Wake up the port. */
int su_socket_port_send(su_port_t *self, su_msg_r rmsg) int su_socket_port_wakeup(su_port_t *self)
{ {
int wakeup = su_base_port_send(self, rmsg); assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET);
if (wakeup < 0) if (!su_pthread_port_own_thread(self) &&
return -1; send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) {
if (wakeup) {
assert(self->sup_mbox[SU_MBOX_SEND] != INVALID_SOCKET);
if (send(self->sup_mbox[SU_MBOX_SEND], "X", 1, 0) == -1) {
#if HAVE_SOCKETPAIR #if HAVE_SOCKETPAIR
if (su_errno() != EWOULDBLOCK) if (su_errno() != EWOULDBLOCK)
#endif #endif
su_perror("su_msg_send: send()"); su_perror("su_msg_send: send()");
}
} }
return 0; return 0;

View File

@ -164,7 +164,9 @@ struct su_timer_s {
su_timer_arg_t *sut_arg; /**< Pointer to argument data */ su_timer_arg_t *sut_arg; /**< Pointer to argument data */
su_time_t sut_run; /**< When this timer was last waken up */ su_time_t sut_run; /**< When this timer was last waken up */
unsigned sut_woken; /**< Timer has waken up this many times */ unsigned sut_woken; /**< Timer has waken up this many times */
unsigned short sut_running; /**< Timer is running */
unsigned sut_running:2;/**< Timer is running */
unsigned sut_deferrable:1;/**< Timer can be deferrable */
}; };
/** Timer running status */ /** Timer running status */
@ -246,9 +248,9 @@ su_timer_set0(su_timer_queue_t *timers,
* @retval NULL upon an error * @retval NULL upon an error
*/ */
static static
su_timer_queue_t *su_timer_tree(su_timer_t const *t, su_timer_queue_t *su_timer_queue(su_timer_t const *t,
int use_sut_duration, int use_sut_duration,
char const *caller) char const *caller)
{ {
su_timer_queue_t *timers; su_timer_queue_t *timers;
@ -265,7 +267,10 @@ su_timer_queue_t *su_timer_tree(su_timer_t const *t,
return NULL; return NULL;
} }
timers = su_task_timers(t->sut_task); if (t->sut_deferrable)
timers = su_task_deferrable(t->sut_task);
else
timers = su_task_timers(t->sut_task);
if (timers == NULL) { if (timers == NULL) {
SU_DEBUG_1(("%s(%p): %s\n", caller, (void *)t, "invalid timer")); SU_DEBUG_1(("%s(%p): %s\n", caller, (void *)t, "invalid timer"));
@ -285,7 +290,7 @@ su_timer_queue_t *su_timer_tree(su_timer_t const *t,
* Allocate and initialize an instance of su_timer_t. * Allocate and initialize an instance of su_timer_t.
* *
* @param task a task for root object with which the timer will be associated * @param task a task for root object with which the timer will be associated
* @param msec the default duration of the timer * @param msec the default duration of the timer in milliseconds
* *
* @return A pointer to allocated timer instance, NULL on error. * @return A pointer to allocated timer instance, NULL on error.
*/ */
@ -307,6 +312,7 @@ su_timer_t *su_timer_create(su_task_r const task, su_duration_t msec)
return retval; return retval;
} }
/** Destroy a timer. /** Destroy a timer.
* *
* Deinitialize and free an instance of su_timer_t. * Deinitialize and free an instance of su_timer_t.
@ -322,6 +328,7 @@ void su_timer_destroy(su_timer_t *t)
} }
} }
/** Set the timer for the given @a interval. /** Set the timer for the given @a interval.
* *
* Sets (starts) the given timer to expire after the specified duration. * Sets (starts) the given timer to expire after the specified duration.
@ -338,7 +345,7 @@ int su_timer_set_interval(su_timer_t *t,
su_timer_arg_t *arg, su_timer_arg_t *arg,
su_duration_t interval) su_duration_t interval)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_set_interval"); su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_set_interval");
return su_timer_set0(timers, t, wakeup, arg, su_now(), interval); return su_timer_set0(timers, t, wakeup, arg, su_now(), interval);
} }
@ -359,7 +366,7 @@ int su_timer_set(su_timer_t *t,
su_timer_f wakeup, su_timer_f wakeup,
su_timer_arg_t *arg) su_timer_arg_t *arg)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_set"); su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_set");
return su_timer_set0(timers, t, wakeup, arg, su_now(), t->sut_duration); return su_timer_set0(timers, t, wakeup, arg, su_now(), t->sut_duration);
} }
@ -380,7 +387,7 @@ int su_timer_set_at(su_timer_t *t,
su_wakeup_arg_t *arg, su_wakeup_arg_t *arg,
su_time_t when) su_time_t when)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_set_at"); su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_set_at");
return su_timer_set0(timers, t, wakeup, arg, when, 0); return su_timer_set0(timers, t, wakeup, arg, when, 0);
} }
@ -407,7 +414,7 @@ int su_timer_run(su_timer_t *t,
su_timer_f wakeup, su_timer_f wakeup,
su_timer_arg_t *arg) su_timer_arg_t *arg)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_run"); su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_run");
su_time_t now; su_time_t now;
if (timers == NULL) if (timers == NULL)
@ -440,7 +447,7 @@ int su_timer_set_for_ever(su_timer_t *t,
su_timer_f wakeup, su_timer_f wakeup,
su_timer_arg_t *arg) su_timer_arg_t *arg)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 1, "su_timer_set_for_ever"); su_timer_queue_t *timers = su_timer_queue(t, 1, "su_timer_set_for_ever");
su_time_t now; su_time_t now;
if (timers == NULL) if (timers == NULL)
@ -463,7 +470,7 @@ int su_timer_set_for_ever(su_timer_t *t,
*/ */
int su_timer_reset(su_timer_t *t) int su_timer_reset(su_timer_t *t)
{ {
su_timer_queue_t *timers = su_timer_tree(t, 0, "su_timer_reset"); su_timer_queue_t *timers = su_timer_queue(t, 0, "su_timer_reset");
if (timers == NULL) if (timers == NULL)
return -1; return -1;
@ -508,7 +515,7 @@ int su_timer_expire(su_timer_queue_t * const timers,
if (SU_TIME_CMP(t->sut_when, now) > 0) { if (SU_TIME_CMP(t->sut_when, now) > 0) {
su_duration_t at = su_duration(t->sut_when, now); su_duration_t at = su_duration(t->sut_when, now);
if (at < *timeout) if (at < *timeout || *timeout < 0)
*timeout = at; *timeout = at;
break; break;
@ -622,3 +629,31 @@ su_root_t *su_timer_root(su_timer_t const *t)
return t ? su_task_root(t->sut_task) : NULL; return t ? su_task_root(t->sut_task) : NULL;
} }
/** Change timer as deferrable (or as undeferrable).
*
* A deferrable timer is executed after the given timeout, however, the task
* tries to avoid being woken up only because the timeout. Deferable timers
* have their own queue and timers there are ignored when calculating the
* timeout for epoll()/select()/whatever unless the timeout would exceed the
* maximum defer time. The maximum defer time is 15 seconds by default, but
* it can be modified by su_root_set_max_defer().
*
* @param t pointer to the timer
* @param value make timer deferrable if true, undeferrable if false
*
* @return 0 if succesful, -1 upon an error
*
* @sa su_root_set_max_defer()
*
* @NEW_1_12_7
*/
int su_timer_deferrable(su_timer_t *t, int value)
{
if (t == NULL || su_task_deferrable(t->sut_task) == NULL)
return errno = EINVAL, -1;
t->sut_deferrable = value != 0;
return 0;
}

View File

@ -118,7 +118,7 @@ su_port_vtable_t const su_wsevent_port_vtable[1] =
su_base_port_incref, su_base_port_incref,
su_wsevent_port_decref, su_wsevent_port_decref,
su_base_port_gsource, su_base_port_gsource,
su_socket_port_send, su_base_port_send,
su_wsevent_port_register, su_wsevent_port_register,
su_wsevent_port_unregister, su_wsevent_port_unregister,
su_wsevent_port_deregister, su_wsevent_port_deregister,
@ -139,6 +139,10 @@ su_port_vtable_t const su_wsevent_port_vtable[1] =
su_base_port_start_shared, su_base_port_start_shared,
su_pthread_port_wait, su_pthread_port_wait,
su_pthread_port_execute, su_pthread_port_execute,
su_base_port_deferrable,
su_base_port_max_defer,
su_socket_port_wakeup,
su_base_port_is_running,
}}; }};
static char const *su_wsevent_port_name(su_port_t const *self) static char const *su_wsevent_port_name(su_port_t const *self)

View File

@ -61,6 +61,8 @@ typedef struct test_ep_s test_ep_t;
#include <pthread.h> #include <pthread.h>
#endif #endif
#define ALARM_IN_SECONDS 120
struct test_ep_s { struct test_ep_s {
test_ep_t *next, **prev, **list; test_ep_t *next, **prev, **list;
int i; int i;
@ -99,6 +101,8 @@ struct root_test_s {
unsigned rt_executed:1; unsigned rt_executed:1;
unsigned rt_t1:1, rt_t2:1;
unsigned :0; unsigned :0;
test_ep_at rt_ep[5]; test_ep_at rt_ep[5];
@ -161,7 +165,6 @@ int test_api(root_test_t *rt)
END(); END();
} }
#if SU_HAVE_PTHREADS #if SU_HAVE_PTHREADS
#include <pthread.h> #include <pthread.h>
@ -589,6 +592,61 @@ void send_a_reporter_msg(root_test_t *rt,
rt->rt_sent_reporter = 1; rt->rt_sent_reporter = 1;
} }
static void expire1(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg)
{
(void)arg;
rt->rt_t1 = 1;
}
static void expire2(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg)
{
(void)arg;
rt->rt_t2 = 1;
}
int timer_test(root_test_t rt[1])
{
BEGIN();
su_timer_t *t1, *t2;
su_duration_t defer;
TEST_1(t1 = su_timer_create(su_root_task(rt->rt_root), 100));
TEST_1(t2 = su_timer_create(su_root_task(rt->rt_root), 110));
rt->rt_t1 = rt->rt_t2 = 0;
TEST_1(su_root_step(rt->rt_root, 0) == SU_WAIT_FOREVER);
TEST_1(su_root_set_max_defer(rt->rt_root, 30000) != -1);
TEST(su_root_get_max_defer(rt->rt_root), 30000);
if (su_timer_deferrable(t1, 1) == 0) {
/*
* If only a deferrable timer is set, su_root_step() should return
* about the maximum defer time, which now defaults to 15 seconds
*/
TEST(su_timer_set(t1, expire1, NULL), 0);
defer = su_root_step(rt->rt_root, 0);
TEST_1(defer > 100);
}
else {
TEST(su_timer_set(t1, expire1, NULL), 0);
}
TEST(su_timer_set(t2, expire2, NULL), 0);
while (su_root_step(rt->rt_root, 100) != SU_WAIT_FOREVER)
;
TEST_1(rt->rt_t1 && rt->rt_t2);
su_timer_destroy(t1);
su_timer_destroy(t2);
END();
}
static int set_execute_bit_and_return_3(void *void_rt) static int set_execute_bit_and_return_3(void *void_rt)
{ {
root_test_t *rt = void_rt; root_test_t *rt = void_rt;
@ -612,13 +670,34 @@ static void receive_simple_msg(root_test_t *rt,
su_task_cmp(su_msg_to(msg), su_task_null) == 0; su_task_cmp(su_msg_to(msg), su_task_null) == 0;
} }
static int clone_test(root_test_t rt[1]) static void expire1destroy(root_test_t *rt, su_timer_t *t, su_timer_arg_t *arg)
{
(void)arg;
rt->rt_t1 = 1;
su_timer_destroy(t);
}
static int set_deferrable_timer(void *void_rt)
{
root_test_t *rt = void_rt;
su_timer_t *t1;
TEST_1(t1 = su_timer_create(su_clone_task(rt->rt_clone), 100));
TEST_1(su_timer_deferrable(t1, 1) == 0);
TEST(su_timer_set(t1, expire1destroy, NULL), 0);
return 0;
}
static int clone_test(root_test_t rt[1], int multithread)
{ {
BEGIN(); BEGIN();
su_msg_r m = SU_MSG_R_INIT; su_msg_r m = SU_MSG_R_INIT;
int retval; int retval;
su_root_threading(rt->rt_root, multithread);
rt->rt_fail_init = 0; rt->rt_fail_init = 0;
rt->rt_fail_deinit = 0; rt->rt_fail_deinit = 0;
rt->rt_success_init = 0; rt->rt_success_init = 0;
@ -635,6 +714,10 @@ static int clone_test(root_test_t rt[1])
TEST_1(rt->rt_fail_init); TEST_1(rt->rt_fail_init);
TEST_1(rt->rt_fail_deinit); TEST_1(rt->rt_fail_deinit);
/* Defer longer than maximum allowed run time */
TEST_1(su_root_set_max_defer(rt->rt_root, ALARM_IN_SECONDS * 1000) != -1);
TEST(su_root_get_max_defer(rt->rt_root), ALARM_IN_SECONDS * 1000);
TEST(su_clone_start(rt->rt_root, TEST(su_clone_start(rt->rt_root,
rt->rt_clone, rt->rt_clone,
rt, rt,
@ -643,6 +726,7 @@ static int clone_test(root_test_t rt[1])
TEST_1(rt->rt_success_init); TEST_1(rt->rt_success_init);
TEST_1(!rt->rt_success_deinit); TEST_1(!rt->rt_success_deinit);
/* Test su_task_execute() */
retval = -1; retval = -1;
rt->rt_executed = 0; rt->rt_executed = 0;
TEST(su_task_execute(su_clone_task(rt->rt_clone), TEST(su_task_execute(su_clone_task(rt->rt_clone),
@ -666,6 +750,28 @@ static int clone_test(root_test_t rt[1])
TEST(rt->rt_msg_received, 1); TEST(rt->rt_msg_received, 1);
if (multithread) {
TEST_1(su_task_is_running(su_clone_task(rt->rt_clone)));
}
else {
TEST_1(!su_task_is_running(su_clone_task(rt->rt_clone)));
}
/* Test su_wakeup() */
if (multithread) {
retval = -1;
rt->rt_t1 = 0;
TEST(su_task_execute(su_clone_task(rt->rt_clone),
set_deferrable_timer, rt,
&retval), 0);
TEST(retval, 0);
while (rt->rt_t1 == 0) {
TEST(su_root_step(rt->rt_root, 100), SU_WAIT_FOREVER);
su_task_wakeup(su_clone_task(rt->rt_clone));
}
}
/* Make sure 3-way handshake is done as expected */ /* Make sure 3-way handshake is done as expected */
TEST(su_msg_create(m, TEST(su_msg_create(m,
su_clone_task(rt->rt_clone), su_clone_task(rt->rt_clone),
@ -707,10 +813,21 @@ void usage(int exitcode)
exit(exitcode); exit(exitcode);
} }
#if HAVE_ALARM
#include <signal.h>
static RETSIGTYPE sig_alarm(int s)
{
fprintf(stderr, "%s: FAIL! test timeout!\n", name);
exit(1);
}
#endif
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
root_test_t *rt, rt0[1] = {{{ SU_HOME_INIT(rt0) }}}, rt1[1]; root_test_t *rt, rt0[1] = {{{ SU_HOME_INIT(rt0) }}}, rt1[1];
int retval = 0; int retval = 0;
int no_alarm = 0;
int i; int i;
struct { struct {
@ -746,6 +863,8 @@ int main(int argc, char *argv[])
rt->rt_flags |= tst_verbatim; rt->rt_flags |= tst_verbatim;
else if (strcmp(argv[i], "-a") == 0) else if (strcmp(argv[i], "-a") == 0)
rt->rt_flags |= tst_abort; rt->rt_flags |= tst_abort;
else if (strcmp(argv[i], "--no-alarm") == 0)
no_alarm = 1;
#if SU_HAVE_IN6 #if SU_HAVE_IN6
else if (strcmp(argv[i], "-6") == 0) else if (strcmp(argv[i], "-6") == 0)
rt->rt_family = AF_INET6; rt->rt_family = AF_INET6;
@ -754,6 +873,13 @@ int main(int argc, char *argv[])
usage(1); usage(1);
} }
#if HAVE_ALARM
if (!no_alarm) {
signal(SIGALRM, sig_alarm);
alarm(ALARM_IN_SECONDS);
}
#endif
#if HAVE_OPEN_C #if HAVE_OPEN_C
rt->rt_flags |= tst_verbatim; rt->rt_flags |= tst_verbatim;
#endif #endif
@ -770,10 +896,9 @@ int main(int argc, char *argv[])
retval |= init_test(rt, prefer[i].name, prefer[i].create, prefer[i].start); retval |= init_test(rt, prefer[i].name, prefer[i].create, prefer[i].start);
retval |= register_test(rt); retval |= register_test(rt);
retval |= event_test(rt); retval |= event_test(rt);
su_root_threading(rt->rt_root, 1); retval |= timer_test(rt);
retval |= clone_test(rt); retval |= clone_test(rt, 1);
su_root_threading(rt->rt_root, 0); retval |= clone_test(rt, 0);
retval |= clone_test(rt);
retval |= deinit_test(rt); retval |= deinit_test(rt);
} while (prefer[++i].create); } while (prefer[++i].create);