From 5305611be392b20c02c54f1f9868fe15c974e5fc Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Fri, 12 Mar 2010 18:27:24 +0000 Subject: [PATCH] added generic interrupt support git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@1057 a93c3328-9c30-0410-af19-c9cd2b2d52af --- libs/freetdm/TODO | 2 + libs/freetdm/src/ftdm_io.c | 138 ++++++++---- libs/freetdm/src/ftdm_queue.c | 29 ++- libs/freetdm/src/ftdm_threadmutex.c | 209 +++++++++++++----- .../ftmod_sangoma_boost/ftdm_sangoma_boost.h | 2 - .../ftmod_sangoma_boost/ftmod_sangoma_boost.c | 136 +++++------- .../sangoma_boost_client.c | 6 +- .../sangoma_boost_client.h | 2 +- libs/freetdm/src/include/freetdm.h | 69 +++--- libs/freetdm/src/include/ftdm_threadmutex.h | 11 +- libs/freetdm/src/include/ftdm_types.h | 1 + 11 files changed, 372 insertions(+), 233 deletions(-) diff --git a/libs/freetdm/TODO b/libs/freetdm/TODO index fb297b2eef..45b134bad3 100644 --- a/libs/freetdm/TODO +++ b/libs/freetdm/TODO @@ -5,3 +5,5 @@ then ftdm_event_t would be renamed to ftdm_oob_event_t and the enum_id renamed to type, then ftdm_span_next_event() will only return OOB events +- query span hw status (connected/disconnected) on startup + diff --git a/libs/freetdm/src/ftdm_io.c b/libs/freetdm/src/ftdm_io.c index 3e6cdef0e9..221bf34117 100644 --- a/libs/freetdm/src/ftdm_io.c +++ b/libs/freetdm/src/ftdm_io.c @@ -49,6 +49,8 @@ #include "ftdm_pika.h" #endif +#define SPAN_PENDING_CHANS_QUEUE_SIZE 1000 + static int time_is_init = 0; static void time_init(void) @@ -410,6 +412,7 @@ static ftdm_status_t ftdm_span_destroy(ftdm_span_t *span) } /* destroy final basic resources of the span data structure */ + ftdm_queue_destroy(&span->pendingchans); ftdm_mutex_unlock(span->mutex); ftdm_mutex_destroy(&span->mutex); ftdm_safe_free(span->signal_data); @@ -504,16 +507,19 @@ FT_DECLARE(ftdm_status_t) ftdm_span_create(ftdm_io_interface_t *fio, ftdm_span_t ftdm_span_t *new_span = NULL; ftdm_status_t status = FTDM_FAIL; - assert(fio != NULL); + ftdm_assert(fio != NULL, "No IO provided\n"); ftdm_mutex_lock(globals.mutex); if (globals.span_index < FTDM_MAX_SPANS_INTERFACE) { - new_span = ftdm_malloc(sizeof(*new_span)); - assert(new_span); - memset(new_span, 0, sizeof(*new_span)); + new_span = ftdm_calloc(sizeof(*new_span), 1); + ftdm_assert(new_span, "allocating span failed\n"); + status = ftdm_mutex_create(&new_span->mutex); - assert(status == FTDM_SUCCESS); + ftdm_assert(status == FTDM_SUCCESS, "mutex creation failed\n"); + + status = ftdm_queue_create(&new_span->pendingchans, SPAN_PENDING_CHANS_QUEUE_SIZE); + ftdm_assert(status == FTDM_SUCCESS, "span chans queue creation failed\n"); ftdm_set_flag(new_span, FTDM_SPAN_CONFIGURED); new_span->span_id = ++globals.span_index; @@ -1154,7 +1160,12 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_set_state(ftdm_channel_t *ftdmchan, ftdm_ if (ok) { ftdm_set_flag(ftdmchan, FTDM_CHANNEL_STATE_CHANGE); - ftdm_set_flag_locked(ftdmchan->span, FTDM_SPAN_STATE_CHANGE); + + ftdm_mutex_lock(ftdmchan->span->mutex); + ftdm_set_flag(ftdmchan->span, FTDM_SPAN_STATE_CHANGE); + ftdm_queue_enqueue(ftdmchan->span->pendingchans, ftdmchan); + ftdm_mutex_unlock(ftdmchan->span->mutex); + ftdmchan->last_state = ftdmchan->state; ftdmchan->state = state; } @@ -3155,44 +3166,85 @@ FT_DECLARE(int) ftdm_load_modules(void) FT_DECLARE(ftdm_status_t) ftdm_unload_modules(void) { - ftdm_hash_iterator_t *i; - ftdm_dso_lib_t lib; + ftdm_hash_iterator_t *i = NULL; + ftdm_dso_lib_t lib = NULL; + char modpath[255] = { 0 }; + /* stop signaling interfaces first as signaling depends on I/O and not the other way around */ for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) { - const void *key; - void *val; + const void *key = NULL; + void *val = NULL; + ftdm_module_t *mod = NULL; hashtable_this(i, &key, NULL, &val); - if (key && val) { - ftdm_module_t *mod = (ftdm_module_t *) val; - - if (!mod) { - continue; - } - - if (mod->io_unload) { - if (mod->io_unload() == FTDM_SUCCESS) { - ftdm_log(FTDM_LOG_INFO, "Unloading IO %s\n", mod->name); - } else { - ftdm_log(FTDM_LOG_ERROR, "Error unloading IO %s\n", mod->name); - } - } - - if (mod->sig_unload) { - if (mod->sig_unload() == FTDM_SUCCESS) { - ftdm_log(FTDM_LOG_INFO, "Unloading SIG %s\n", mod->name); - } else { - ftdm_log(FTDM_LOG_ERROR, "Error unloading SIG %s\n", mod->name); - } - } - - - ftdm_log(FTDM_LOG_INFO, "Unloading %s\n", mod->path); - lib = mod->lib; - ftdm_dso_destroy(&lib); - + if (!key || !val) { + continue; } + + mod = (ftdm_module_t *) val; + + if (!mod->sig_unload) { + continue; + } + + ftdm_log(FTDM_LOG_INFO, "Unloading signaling interface %s\n", mod->name); + + if (mod->sig_unload() != FTDM_SUCCESS) { + ftdm_log(FTDM_LOG_ERROR, "Error unloading signaling interface %s\n", mod->name); + continue; + } + + ftdm_log(FTDM_LOG_INFO, "Unloaded signaling interface %s\n", mod->name); + } + + /* Now go ahead with I/O interfaces */ + for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) { + const void *key = NULL; + void *val = NULL; + ftdm_module_t *mod = NULL; + + hashtable_this(i, &key, NULL, &val); + + if (!key || !val) { + continue; + } + + mod = (ftdm_module_t *) val; + + if (!mod->io_unload) { + continue; + } + + ftdm_log(FTDM_LOG_INFO, "Unloading I/O interface %s\n", mod->name); + + if (mod->io_unload() != FTDM_SUCCESS) { + ftdm_log(FTDM_LOG_ERROR, "Error unloading I/O interface %s\n", mod->name); + continue; + } + + ftdm_log(FTDM_LOG_INFO, "Unloaded I/O interface %s\n", mod->name); + } + + /* Now unload the actual shared object/dll */ + for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) { + ftdm_module_t *mod = NULL; + const void *key = NULL; + void *val = NULL; + + hashtable_this(i, &key, NULL, &val); + + if (!key || !val) { + continue; + } + + mod = (ftdm_module_t *) val; + + lib = mod->lib; + snprintf(modpath, sizeof(modpath), "%s", mod->path); + ftdm_log(FTDM_LOG_INFO, "Unloading module %s\n", modpath); + ftdm_dso_destroy(&lib); + ftdm_log(FTDM_LOG_INFO, "Unloaded module %s\n", modpath); } return FTDM_SUCCESS; @@ -3506,8 +3558,12 @@ FT_DECLARE(ftdm_status_t) ftdm_global_destroy(void) time_end(); globals.running = 0; + + globals.span_index = 0; + ftdm_span_close_all(); - ftdm_sleep(1000); + + ftdm_unload_modules(); ftdm_mutex_lock(globals.span_mutex); for (sp = globals.spans; sp;) { @@ -3529,10 +3585,6 @@ FT_DECLARE(ftdm_status_t) ftdm_global_destroy(void) globals.spans = NULL; ftdm_mutex_unlock(globals.span_mutex); - globals.span_index = 0; - - ftdm_unload_modules(); - ftdm_mutex_lock(globals.mutex); hashtable_destroy(globals.interface_hash); hashtable_destroy(globals.module_hash); diff --git a/libs/freetdm/src/ftdm_queue.c b/libs/freetdm/src/ftdm_queue.c index 92ea75c57a..60ee74737d 100644 --- a/libs/freetdm/src/ftdm_queue.c +++ b/libs/freetdm/src/ftdm_queue.c @@ -38,11 +38,12 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj); static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue); static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms); +static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt); static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue); struct ftdm_queue { ftdm_mutex_t *mutex; - ftdm_condition_t *condition; + ftdm_interrupt_t *interrupt; ftdm_size_t capacity; ftdm_size_t size; unsigned rindex; @@ -56,6 +57,7 @@ FT_DECLARE_DATA ftdm_queue_handler_t g_ftdm_queue_handler = /*.enqueue = */ ftdm_std_queue_enqueue, /*.dequeue = */ ftdm_std_queue_dequeue, /*.wait = */ ftdm_std_queue_wait, + /*.get_interrupt = */ ftdm_std_queue_get_interrupt, /*.destroy = */ ftdm_std_queue_destroy }; @@ -66,6 +68,7 @@ FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *ha !handler->enqueue || !handler->dequeue || !handler->wait || + !handler->get_interrupt || !handler->destroy) { return FTDM_FAIL; } @@ -95,7 +98,7 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t goto failed; } - if (ftdm_condition_create(&queue->condition, queue->mutex) != FTDM_SUCCESS) { + if (ftdm_interrupt_create(&queue->interrupt, FTDM_INVALID_SOCKET) != FTDM_SUCCESS) { goto failed; } @@ -104,8 +107,8 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t failed: if (queue) { - if (queue->condition) { - ftdm_condition_destroy(&queue->condition); + if (queue->interrupt) { + ftdm_interrupt_destroy(&queue->interrupt); } if (queue->mutex) { ftdm_mutex_destroy(&queue->mutex); @@ -139,7 +142,7 @@ static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj) status = FTDM_SUCCESS; /* wake up queue reader */ - ftdm_condition_signal(queue->condition); + ftdm_interrupt_signal(queue->interrupt); done: @@ -188,7 +191,7 @@ static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms) } /* no elements on the queue, wait for someone to write an element */ - ret = ftdm_condition_wait(queue->condition, ms); + ret = ftdm_interrupt_wait(queue->interrupt, ms); /* got an element or timeout, bail out */ ftdm_mutex_unlock(queue->mutex); @@ -196,14 +199,22 @@ static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms) return ret; } +static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt) +{ + ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!\n"); + ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Queue is null!\n"); + *interrupt = queue->interrupt; + return FTDM_SUCCESS; +} + static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue) { ftdm_queue_t *queue = NULL; - ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!"); - ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!"); + ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!\n"); + ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!\n"); queue = *inqueue; - ftdm_condition_destroy(&queue->condition); + ftdm_interrupt_destroy(&queue->interrupt); ftdm_mutex_destroy(&queue->mutex); ftdm_safe_free(queue->elements); ftdm_safe_free(queue); diff --git a/libs/freetdm/src/ftdm_threadmutex.c b/libs/freetdm/src/ftdm_threadmutex.c index 7446896dcd..49cdb59cc1 100644 --- a/libs/freetdm/src/ftdm_threadmutex.c +++ b/libs/freetdm/src/ftdm_threadmutex.c @@ -40,6 +40,7 @@ struct ftdm_mutex { #else #include +#include #define FTDM_THREAD_CALLING_CONVENTION @@ -49,16 +50,18 @@ struct ftdm_mutex { #endif -struct ftdm_condition { +struct ftdm_interrupt { + ftdm_socket_t device; #ifdef WIN32 - HANDLE condition; + /* for generic interruption */ + HANDLE event; #else - pthread_cond_t condition; + /* for generic interruption */ + int readfd; + int writefd; #endif - ftdm_mutex_t *mutex; }; - struct ftdm_thread { #ifdef WIN32 void *handle; @@ -238,123 +241,207 @@ FT_DECLARE(ftdm_status_t) _ftdm_mutex_unlock(ftdm_mutex_t *mutex) } -FT_DECLARE(ftdm_status_t) ftdm_condition_create(ftdm_condition_t **incondition, ftdm_mutex_t *mutex) +FT_DECLARE(ftdm_status_t) ftdm_interrupt_create(ftdm_interrupt_t **ininterrupt, ftdm_socket_t device) { - ftdm_condition_t *condition = NULL; + ftdm_interrupt_t *interrupt = NULL; +#ifndef WIN32 + int fds[2]; +#endif - ftdm_assert_return(incondition != NULL, FTDM_FAIL, "Condition double pointer is null!\n"); - ftdm_assert_return(mutex != NULL, FTDM_FAIL, "Mutex for condition must not be null!\n"); + ftdm_assert_return(ininterrupt != NULL, FTDM_FAIL, "interrupt double pointer is null!\n"); - condition = ftdm_calloc(1, sizeof(*condition)); - if (!condition) { + interrupt = ftdm_calloc(1, sizeof(*interrupt)); + if (!interrupt) { + ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt memory\n"); return FTDM_FAIL; } - condition->mutex = mutex; + interrupt->device = device; #ifdef WIN32 - condition->condition = CreateEvent(NULL, FALSE, FALSE, NULL); - if (!condition->condition) { + interrupt->interrupt = CreateEvent(NULL, FALSE, FALSE, NULL); + if (!interrupt->interrupt) { + ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt event\n"); goto failed; } #else - if (pthread_cond_init(&condition->condition, NULL)) { + if (pipe(fds)) { + ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt pipe: %s\n", strerror(errno)); goto failed; } + interrupt->readfd = fds[0]; + interrupt->writefd = fds[1]; #endif - *incondition = condition; + *ininterrupt = interrupt; return FTDM_SUCCESS; failed: - if (condition) { - ftdm_safe_free(condition); + if (interrupt) { +#ifndef WIN32 + if (interrupt->readfd) { + close(interrupt->readfd); + close(interrupt->writefd); + interrupt->readfd = -1; + interrupt->writefd = -1; + } +#endif + ftdm_safe_free(interrupt); } return FTDM_FAIL; } #define ONE_BILLION 1000000000 -FT_DECLARE(ftdm_status_t) ftdm_condition_wait(ftdm_condition_t *condition, int ms) +FT_DECLARE(ftdm_status_t) ftdm_interrupt_wait(ftdm_interrupt_t *interrupt, int ms) { + int num = 1; #ifdef WIN32 DWORD res = 0; + HANDLE ints[2]; +#else + int res = 0; + struct pollfd ints[2]; + char pipebuf[255]; #endif - ftdm_assert_return(condition != NULL, FTDM_FAIL, "Condition is null!\n"); + + ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Condition is null!\n"); + + + /* start implementation */ #ifdef WIN32 - ftdm_mutex_unlock(condition->mutex); - res = WaitForSingleObject(condition->condition, ms > 0 ? ms : INFINITE); - ftdm_mutex_lock(condition->mutex); + ints[0] = interrupt->event; + if (interrupt->device != FTDM_INVALID_SOCKET) { + num++; + ints[1] = interrupt->device; + } + res = WaitForMultipleObjects(num, &ints, FALSE, ms >= 0 ? ms : INFINITE); switch (res) { - case WAIT_ABANDONED: case WAIT_TIMEOUT: return FTDM_TIMEOUT; case WAIT_FAILED: + case WAIT_ABANDONED: /* is it right to fail with abandoned? */ return FTDM_FAIL; - case WAIT_OBJECT_0: - return FTDM_SUCCESS; default: - ftdm_log(FTDM_LOG_ERROR, "Error waiting for freetdm condition event (WaitForSingleObject returned %d)\n", res); - return FTDM_FAIL; + if (res >= (sizeof(ints)/sizeof(ints[0]))) { + ftdm_log(FTDM_LOG_ERROR, "Error waiting for freetdm interrupt event (WaitForSingleObject returned %d)\n", res); + return FTDM_FAIL; + } + return FTDM_SUCCESS; } #else - int res = 0; - if (ms > 0) { - struct timeval t; - struct timespec waitms; - gettimeofday(&t, NULL); - waitms.tv_sec = t.tv_sec + ( ms / 1000 ); - waitms.tv_nsec = 1000*(t.tv_usec + (1000 * ( ms % 1000 ))); - if (waitms.tv_nsec >= ONE_BILLION) { - waitms.tv_sec++; - waitms.tv_nsec -= ONE_BILLION; - } - res = pthread_cond_timedwait(&condition->condition, &condition->mutex->mutex, &waitms); - } else { - res = pthread_cond_wait(&condition->condition, &condition->mutex->mutex); - } - if (res != 0) { - if (res == ETIMEDOUT) { - return FTDM_TIMEOUT; - } + ints[0].fd = interrupt->readfd; + ints[0].events = POLLIN; + ints[0].revents = 0; - ftdm_log(FTDM_LOG_CRIT,"pthread_cond_timedwait failed (%d)\n", res); + if (interrupt->device != FTDM_INVALID_SOCKET) { + num++; + ints[1].fd = interrupt->device; + ints[1].events = POLLIN; + ints[1].revents = 0; + } + + res = poll(ints, num, ms); + + if (res == -1) { + ftdm_log(FTDM_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno)); return FTDM_FAIL; } + + if (res == 0) { + return FTDM_TIMEOUT; + } + + if (ints[0].revents & POLLIN) { + res = read(ints[0].fd, pipebuf, sizeof(pipebuf)); + if (res == -1) { + ftdm_log(FTDM_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno)); + } + } + return FTDM_SUCCESS; #endif } -FT_DECLARE(ftdm_status_t) ftdm_condition_signal(ftdm_condition_t *condition) +FT_DECLARE(ftdm_status_t) ftdm_interrupt_signal(ftdm_interrupt_t *interrupt) { - ftdm_assert_return(condition != NULL, FTDM_FAIL, "Condition is null!\n"); + ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Interrupt is null!\n"); #ifdef WIN32 - if (!SetEvent(condition->condition)) { + if (!SetEvent(interrupt->interrupt)) { + ftdm_log(FTDM_LOG_ERROR, "Failed to signal interrupt\n"); return FTDM_FAIL; } #else int err; - if ((err = pthread_cond_signal(&condition->condition))) { - ftdm_log(FTDM_LOG_ERROR, "Failed to signal condition %d:%s\n", err, strerror(err)); + if ((err = write(interrupt->writefd, "w", 1)) != 1) { + ftdm_log(FTDM_LOG_ERROR, "Failed to signal interrupt: %s\n", errno, strerror(errno)); return FTDM_FAIL; } #endif return FTDM_SUCCESS; } -FT_DECLARE(ftdm_status_t) ftdm_condition_destroy(ftdm_condition_t **incondition) +FT_DECLARE(ftdm_status_t) ftdm_interrupt_destroy(ftdm_interrupt_t **ininterrupt) { - ftdm_condition_t *condition = NULL; - ftdm_assert_return(incondition != NULL, FTDM_FAIL, "Condition null when destroying!\n"); - condition = *incondition; + ftdm_interrupt_t *interrupt = NULL; + ftdm_assert_return(ininterrupt != NULL, FTDM_FAIL, "Interrupt null when destroying!\n"); + interrupt = *ininterrupt; #ifdef WIN32 - CloseHandle(condition->condition); + CloseHandle(interrupt->interrupt); #else - if (pthread_cond_destroy(&condition->condition)) { + close(interrupt->readfd); + close(interrupt->writefd); + interrupt->readfd = -1; + interrupt->writefd = -1; +#endif + ftdm_safe_free(interrupt); + *ininterrupt = NULL; + return FTDM_SUCCESS; +} + +FT_DECLARE(ftdm_status_t) ftdm_interrupt_multiple_wait(ftdm_interrupt_t *interrupts[], ftdm_size_t size, int ms) +{ +#ifndef WIN32 + int i; + int res = 0; + int numdevices = 0; + char pipebuf[255]; + struct pollfd ints[size*2]; + + memset(&ints, 0, sizeof(ints)); + + for (i = 0; i < size; i++) { + ints[i].events = POLLIN; + ints[i].revents = 0; + ints[i].fd = interrupts[i]->readfd; + if (interrupts[i]->device != FTDM_INVALID_SOCKET) { + ints[i+numdevices].events = POLLIN; + ints[i+numdevices].revents = 0; + ints[i+numdevices].fd = interrupts[i]->device; + numdevices++; + } + } + + res = poll(ints, size + numdevices, ms); + + if (res == -1) { + ftdm_log(FTDM_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno)); return FTDM_FAIL; } + + if (res == 0) { + return FTDM_TIMEOUT; + } + + for (i = size; i < ftdm_array_len(ints); i++) { + if (ints[i].revents & POLLIN) { + res = read(ints[0].fd, pipebuf, sizeof(pipebuf)); + if (res == -1) { + ftdm_log(FTDM_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno)); + } + } + } + #endif - ftdm_safe_free(condition); - *incondition = NULL; return FTDM_SUCCESS; } diff --git a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftdm_sangoma_boost.h b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftdm_sangoma_boost.h index d4659b1141..31a231f309 100644 --- a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftdm_sangoma_boost.h +++ b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftdm_sangoma_boost.h @@ -46,8 +46,6 @@ typedef enum { typedef struct ftdm_sangoma_boost_data { sangomabc_connection_t mcon; sangomabc_connection_t pcon; - fd_set rfds; - fd_set efds; int iteration; uint32_t flags; boost_sigmod_interface_t *sigmod; diff --git a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftmod_sangoma_boost.c b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftmod_sangoma_boost.c index 339a5a4203..1f9e607490 100644 --- a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftmod_sangoma_boost.c +++ b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/ftmod_sangoma_boost.c @@ -39,7 +39,7 @@ */ /* NOTE: -On WIN32 platform this code works with sigmod ONLY, don't try to make sense of any socket code for win32 +On __WINDOWS__ platform this code works with sigmod ONLY, don't try to make sense of any socket code for win I basically ifdef out everything that the compiler complained about */ @@ -1016,11 +1016,8 @@ static void handle_heartbeat(sangomabc_connection_t *mcon, sangomabc_short_event err = sangomabc_connection_writep(mcon, (sangomabc_event_t*)event); if (err <= 0) { - ftdm_log(FTDM_LOG_CRIT, "Failed to tx on ISUP socket [%s]: %s\n", strerror(errno)); + ftdm_log(FTDM_LOG_CRIT, "Failed to tx on boost connection [%s]: %s\n", strerror(errno)); } - - mcon->hb_elapsed = 0; - return; } @@ -1050,7 +1047,6 @@ static void handle_restart(sangomabc_connection_t *mcon, ftdm_span_t *span, sang ftdm_set_flag_locked(span, FTDM_SPAN_SUSPENDED); ftdm_set_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RESTARTING); - mcon->hb_elapsed = 0; } /** @@ -1452,6 +1448,7 @@ static __inline__ void init_outgoing_array(void) */ static __inline__ void check_state(ftdm_span_t *span) { + ftdm_channel_t *ftdmchan = NULL; ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; int susp = ftdm_test_flag(span, FTDM_SPAN_SUSPENDED); @@ -1462,17 +1459,23 @@ static __inline__ void check_state(ftdm_span_t *span) if (ftdm_test_flag(span, FTDM_SPAN_STATE_CHANGE) || susp) { uint32_t j; ftdm_clear_flag_locked(span, FTDM_SPAN_STATE_CHANGE); - for(j = 1; j <= span->chan_count; j++) { - if (ftdm_test_flag((span->channels[j]), FTDM_CHANNEL_STATE_CHANGE) || susp) { + if (susp) { + for (j = 0; j <= span->chan_count; j++) { ftdm_mutex_lock(span->channels[j]->mutex); ftdm_clear_flag((span->channels[j]), FTDM_CHANNEL_STATE_CHANGE); - if (susp && span->channels[j]->state != FTDM_CHANNEL_STATE_DOWN) { - ftdm_channel_set_state(span->channels[j], FTDM_CHANNEL_STATE_RESTART, 0); - } + ftdm_channel_set_state(span->channels[j], FTDM_CHANNEL_STATE_RESTART, 0); state_advance(span->channels[j]); ftdm_channel_complete_state(span->channels[j]); ftdm_mutex_unlock(span->channels[j]->mutex); } + } else { + while ((ftdmchan = ftdm_queue_dequeue(span->pendingchans))) { + ftdm_mutex_lock(ftdmchan->mutex); + ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_STATE_CHANGE); + state_advance(ftdmchan); + ftdm_channel_complete_state(ftdmchan); + ftdm_mutex_unlock(ftdmchan->mutex); + } } } @@ -1487,7 +1490,6 @@ static __inline__ void check_state(ftdm_span_t *span) ftdm_clear_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RESTARTING); ftdm_clear_flag_locked(span, FTDM_SPAN_SUSPENDED); ftdm_clear_flag((&sangoma_boost_data->mcon), MSU_FLAG_DOWN); - sangoma_boost_data->mcon.hb_elapsed = 0; init_outgoing_array(); } } @@ -1596,6 +1598,18 @@ static ftdm_status_t ftdm_boost_connection_open(ftdm_span_t *span) ftdm_log(FTDM_LOG_ERROR, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno)); return FTDM_FAIL; } + + /* try to create the boost sockets interrupt objects */ + if (ftdm_interrupt_create(&sangoma_boost_data->pcon.sock_interrupt, sangoma_boost_data->pcon.socket) != FTDM_SUCCESS) { + ftdm_log(FTDM_LOG_ERROR, "Span %s could not create its boost msock interrupt!\n", span->name); + return FTDM_FAIL; + } + + if (ftdm_interrupt_create(&sangoma_boost_data->mcon.sock_interrupt, sangoma_boost_data->mcon.socket) != FTDM_SUCCESS) { + ftdm_log(FTDM_LOG_ERROR, "Span %s could not create its boost psock interrupt!\n", span->name); + return FTDM_FAIL; + } + return FTDM_SUCCESS; } @@ -1603,49 +1617,34 @@ static ftdm_status_t ftdm_boost_connection_open(ftdm_span_t *span) \brief wait for a boost event \return -1 on error, 0 on timeout, 1 when there are events */ -static int ftdm_boost_wait_event(ftdm_span_t *span, int ms) +static int ftdm_boost_wait_event(ftdm_span_t *span) { -#ifndef WIN32 - struct timeval tv = { 0, ms * 1000 }; - sangomabc_connection_t *mcon, *pcon; - int max, activity; -#endif + ftdm_status_t res; + ftdm_interrupt_t *ints[3]; + int numints; ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; + ftdm_queue_get_interrupt(span->pendingchans, &ints[0]); + numints = 1; + /* if in queue mode wait for both the pendingchans queue and the boost msg queue */ if (sangoma_boost_data->sigmod) { - ftdm_status_t res; - res = ftdm_queue_wait(sangoma_boost_data->boost_queue, ms); - if (FTDM_TIMEOUT == res) { - return 0; - } - if (FTDM_SUCCESS != res) { - return -1; - } - return 1; + ftdm_queue_get_interrupt(sangoma_boost_data->boost_queue, &ints[1]); + numints = 2; + } +#ifndef __WINDOWS__ + else { + /* socket mode ... */ + ints[1] = sangoma_boost_data->mcon.sock_interrupt; + ints[2] = sangoma_boost_data->pcon.sock_interrupt; + numints = 3; + sangoma_boost_data->iteration = 0; } -#ifndef WIN32 - mcon = &sangoma_boost_data->mcon; - pcon = &sangoma_boost_data->pcon; - - FD_ZERO(&sangoma_boost_data->rfds); - FD_ZERO(&sangoma_boost_data->efds); - FD_SET(mcon->socket, &sangoma_boost_data->rfds); - FD_SET(mcon->socket, &sangoma_boost_data->efds); - FD_SET(pcon->socket, &sangoma_boost_data->rfds); - FD_SET(pcon->socket, &sangoma_boost_data->efds); - sangoma_boost_data->iteration = 0; - - max = ((pcon->socket > mcon->socket) ? pcon->socket : mcon->socket) + 1; - if ((activity = select(max, &sangoma_boost_data->rfds, NULL, &sangoma_boost_data->efds, &tv)) < 0) { - return -1; - } - - if (FD_ISSET(pcon->socket, &sangoma_boost_data->efds) || FD_ISSET(mcon->socket, &sangoma_boost_data->efds)) { - return -1; - } - - return 1; #endif + res = ftdm_interrupt_multiple_wait(ints, numints, -1); + if (FTDM_SUCCESS != res) { + ftdm_log(FTDM_LOG_CRIT, "Unexpected return value from interrupt waiting: %d\n", res); + return -1; + } return 0; } @@ -1659,19 +1658,13 @@ static sangomabc_event_t *ftdm_boost_read_event(ftdm_span_t *span) mcon = &sangoma_boost_data->mcon; pcon = &sangoma_boost_data->pcon; - if (sangoma_boost_data->sigmod -#ifndef WIN32 - || FD_ISSET(pcon->socket, &sangoma_boost_data->rfds) -#endif - ) { - event = sangomabc_connection_readp(pcon, sangoma_boost_data->iteration); - } -#ifndef WIN32 + event = sangomabc_connection_readp(pcon, sangoma_boost_data->iteration); + /* if there is no event and this is not a sigmod-driven span it's time to try the other connection for events */ - if (!event && !sangoma_boost_data->sigmod && FD_ISSET(mcon->socket, &sangoma_boost_data->rfds)) { - event = sangomabc_connection_readp(mcon, sangoma_boost_data->iteration); + if (!event && !sangoma_boost_data->sigmod) { + event = sangomabc_connection_read(mcon, sangoma_boost_data->iteration); } -#endif + return event; } @@ -1684,7 +1677,6 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj) { ftdm_span_t *span = (ftdm_span_t *) obj; sangomabc_connection_t *mcon, *pcon; - uint32_t ms = 10; ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data; mcon = &sangoma_boost_data->mcon; @@ -1719,7 +1711,6 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj) while (ftdm_test_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RUNNING)) { sangomabc_event_t *event = NULL; - int activity = 0; if (!ftdm_running()) { if (!sangoma_boost_data->sigmod) { @@ -1735,27 +1726,17 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj) break; } - if ((activity = ftdm_boost_wait_event(span, ms)) < 0) { + if (ftdm_boost_wait_event(span) < 0) { ftdm_log(FTDM_LOG_ERROR, "ftdm_boost_wait_event failed\n"); goto error; } - if (activity) { - while ((event = ftdm_boost_read_event(span))) { - parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event); - sangoma_boost_data->iteration++; - } + while ((event = ftdm_boost_read_event(span))) { + parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event); + sangoma_boost_data->iteration++; } - pcon->hb_elapsed += ms; - - if (ftdm_test_flag(span, FTDM_SPAN_SUSPENDED) || ftdm_test_flag(mcon, MSU_FLAG_DOWN)) { - pcon->hb_elapsed = 0; - } - - if (ftdm_running()) { - check_state(span); - } + check_state(span); } goto end; @@ -2235,6 +2216,7 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_sangoma_boost_configure_span) sangoma_boost_data->sigmod = sigmod_iface; sigmod_iface->configure_span(span, ftdm_parameters); } else { + ftdm_log(FTDM_LOG_NOTICE, "Span %s will use boost socket mode\n", span->name); ftdm_set_string(sangoma_boost_data->mcon.cfg.local_ip, local_ip); sangoma_boost_data->mcon.cfg.local_port = local_port; ftdm_set_string(sangoma_boost_data->mcon.cfg.remote_ip, remote_ip); diff --git a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.c b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.c index 626074dc84..c5cb97119f 100644 --- a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.c +++ b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.c @@ -311,11 +311,7 @@ sangomabc_event_t *__sangomabc_connection_read(sangomabc_connection_t *mcon, int ftdm_log(FTDM_LOG_CRIT, "Invalid Boost Version %i Expecting %i\n",mcon->event.version, SIGBOOST_VERSION); } - /* Must check for < 0 cannot rely on bytes > MIN_SIZE_... compiler issue */ - if (bytes < 0) { - msg_ok=0; - - } else if ((bytes >= MIN_SIZE_CALLSTART_MSG) && boost_full_event(mcon->event.event_id)) { + if ((bytes >= MIN_SIZE_CALLSTART_MSG) && boost_full_event(mcon->event.event_id)) { msg_ok=1; } else if (bytes == sizeof(sangomabc_short_event_t)) { diff --git a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.h b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.h index 2e18d406e5..12c3f12b8c 100644 --- a/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.h +++ b/libs/freetdm/src/ftmod/ftmod_sangoma_boost/sangoma_boost_client.h @@ -105,10 +105,10 @@ struct sangomabc_connection { unsigned int txwindow; unsigned int rxseq_reset; sangomabc_ip_cfg_t cfg; - uint32_t hb_elapsed; /* boost signaling mod interface pointer (if not working in TCP mode) */ boost_sigmod_interface_t *sigmod; ftdm_queue_t *boost_queue; + ftdm_interrupt_t *sock_interrupt; ftdm_span_t *span; }; diff --git a/libs/freetdm/src/include/freetdm.h b/libs/freetdm/src/include/freetdm.h index aa1da3af56..966cca210b 100644 --- a/libs/freetdm/src/include/freetdm.h +++ b/libs/freetdm/src/include/freetdm.h @@ -381,6 +381,42 @@ struct ftdm_event { void *data; }; +typedef struct ftdm_queue ftdm_queue_t; +typedef ftdm_status_t (*ftdm_queue_create_func_t)(ftdm_queue_t **queue, ftdm_size_t capacity); +typedef ftdm_status_t (*ftdm_queue_enqueue_func_t)(ftdm_queue_t *queue, void *obj); +typedef void *(*ftdm_queue_dequeue_func_t)(ftdm_queue_t *queue); +typedef ftdm_status_t (*ftdm_queue_wait_func_t)(ftdm_queue_t *queue, int ms); +typedef ftdm_status_t (*ftdm_queue_get_interrupt_func_t)(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt); +typedef ftdm_status_t (*ftdm_queue_destroy_func_t)(ftdm_queue_t **queue); +typedef struct ftdm_queue_handler { + ftdm_queue_create_func_t create; + ftdm_queue_enqueue_func_t enqueue; + ftdm_queue_dequeue_func_t dequeue; + ftdm_queue_wait_func_t wait; + ftdm_queue_get_interrupt_func_t get_interrupt; + ftdm_queue_destroy_func_t destroy; +} ftdm_queue_handler_t; +FT_DECLARE_DATA extern ftdm_queue_handler_t g_ftdm_queue_handler; + +/*! brief create a new queue */ +#define ftdm_queue_create(queue, capacity) g_ftdm_queue_handler.create(queue, capacity) + +/*! Enqueue an object */ +#define ftdm_queue_enqueue(queue, obj) g_ftdm_queue_handler.enqueue(queue, obj) + +/*! dequeue an object from the queue */ +#define ftdm_queue_dequeue(queue) g_ftdm_queue_handler.dequeue(queue) + +/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */ +#define ftdm_queue_wait(queue, ms) g_ftdm_queue_handler.wait(queue, ms) + +/*! get the internal interrupt object (to wait for elements to be added from the outside bypassing ftdm_queue_wait) */ +#define ftdm_queue_get_interrupt(queue, ms) g_ftdm_queue_handler.get_interrupt(queue, ms) + +/*! destroy the queue */ +#define ftdm_queue_destroy(queue) g_ftdm_queue_handler.destroy(queue) + + #define FTDM_TOKEN_STRLEN 128 #define FTDM_MAX_TOKENS 10 @@ -621,6 +657,7 @@ struct ftdm_span { int suggest_chan_id; ftdm_state_map_t *state_map; ftdm_caller_data_t default_caller_data; + ftdm_queue_t *pendingchans; struct ftdm_span *next; }; @@ -673,36 +710,6 @@ struct ftdm_io_interface { fio_api_t api; }; -typedef struct ftdm_queue ftdm_queue_t; -typedef ftdm_status_t (*ftdm_queue_create_func_t)(ftdm_queue_t **queue, ftdm_size_t capacity); -typedef ftdm_status_t (*ftdm_queue_enqueue_func_t)(ftdm_queue_t *queue, void *obj); -typedef void *(*ftdm_queue_dequeue_func_t)(ftdm_queue_t *queue); -typedef ftdm_status_t (*ftdm_queue_wait_func_t)(ftdm_queue_t *queue, int ms); -typedef ftdm_status_t (*ftdm_queue_destroy_func_t)(ftdm_queue_t **queue); -typedef struct ftdm_queue_handler { - ftdm_queue_create_func_t create; - ftdm_queue_enqueue_func_t enqueue; - ftdm_queue_dequeue_func_t dequeue; - ftdm_queue_wait_func_t wait; - ftdm_queue_destroy_func_t destroy; -} ftdm_queue_handler_t; -FT_DECLARE_DATA extern ftdm_queue_handler_t g_ftdm_queue_handler; - -/*! brief create a new queue */ -#define ftdm_queue_create(queue, capacity) g_ftdm_queue_handler.create(queue, capacity) - -/*! Enqueue an object */ -#define ftdm_queue_enqueue(queue, obj) g_ftdm_queue_handler.enqueue(queue, obj) - -/*! dequeue an object from the queue */ -#define ftdm_queue_dequeue(queue) g_ftdm_queue_handler.dequeue(queue) - -/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */ -#define ftdm_queue_wait(queue, ms) g_ftdm_queue_handler.wait(queue, ms) - -/*! destroy the queue */ -#define ftdm_queue_destroy(queue) g_ftdm_queue_handler.destroy(queue) - /*! \brief Override the default queue handler */ FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *handler); @@ -890,6 +897,8 @@ FIO_CODEC_FUNCTION(fio_alaw2ulaw); */ #define ftdm_socket_close(it) if (it > -1) { close(it); it = -1;} +#define ftdm_array_len(array) sizeof(array)/sizeof(array[0]) + static __inline__ void ftdm_abort(void) { #ifdef __cplusplus diff --git a/libs/freetdm/src/include/ftdm_threadmutex.h b/libs/freetdm/src/include/ftdm_threadmutex.h index a970eda522..f67d2840fe 100644 --- a/libs/freetdm/src/include/ftdm_threadmutex.h +++ b/libs/freetdm/src/include/ftdm_threadmutex.h @@ -32,7 +32,7 @@ extern "C" { #endif typedef struct ftdm_mutex ftdm_mutex_t; typedef struct ftdm_thread ftdm_thread_t; -typedef struct ftdm_condition ftdm_condition_t; +typedef struct ftdm_interrupt ftdm_interrupt_t; typedef void *(*ftdm_thread_function_t) (ftdm_thread_t *, void *); FT_DECLARE(ftdm_status_t) ftdm_thread_create_detached(ftdm_thread_function_t func, void *data); @@ -43,10 +43,11 @@ FT_DECLARE(ftdm_status_t) ftdm_mutex_destroy(ftdm_mutex_t **mutex); FT_DECLARE(ftdm_status_t) _ftdm_mutex_lock(ftdm_mutex_t *mutex); FT_DECLARE(ftdm_status_t) _ftdm_mutex_trylock(ftdm_mutex_t *mutex); FT_DECLARE(ftdm_status_t) _ftdm_mutex_unlock(ftdm_mutex_t *mutex); -FT_DECLARE(ftdm_status_t) ftdm_condition_create(ftdm_condition_t **cond, ftdm_mutex_t *mutex); -FT_DECLARE(ftdm_status_t) ftdm_condition_destroy(ftdm_condition_t **cond); -FT_DECLARE(ftdm_status_t) ftdm_condition_signal(ftdm_condition_t *cond); -FT_DECLARE(ftdm_status_t) ftdm_condition_wait(ftdm_condition_t *cond, int ms); +FT_DECLARE(ftdm_status_t) ftdm_interrupt_create(ftdm_interrupt_t **cond, ftdm_socket_t device); +FT_DECLARE(ftdm_status_t) ftdm_interrupt_destroy(ftdm_interrupt_t **cond); +FT_DECLARE(ftdm_status_t) ftdm_interrupt_signal(ftdm_interrupt_t *cond); +FT_DECLARE(ftdm_status_t) ftdm_interrupt_wait(ftdm_interrupt_t *cond, int ms); +FT_DECLARE(ftdm_status_t) ftdm_interrupt_multiple_wait(ftdm_interrupt_t *interrupts[], ftdm_size_t size, int ms); #ifdef __cplusplus } diff --git a/libs/freetdm/src/include/ftdm_types.h b/libs/freetdm/src/include/ftdm_types.h index 3d45185073..7260358c8f 100644 --- a/libs/freetdm/src/include/ftdm_types.h +++ b/libs/freetdm/src/include/ftdm_types.h @@ -40,6 +40,7 @@ #define FTDM_TYPES_H #include "fsk.h" +#define FTDM_INVALID_SOCKET -1 #ifdef WIN32 #include typedef HANDLE ftdm_socket_t;