diff --git a/libs/openzap/Makefile.am b/libs/openzap/Makefile.am index 9f0c74869a..3d8094fba2 100644 --- a/libs/openzap/Makefile.am +++ b/libs/openzap/Makefile.am @@ -71,7 +71,8 @@ $(SRC)/libteletone_detect.c \ $(SRC)/libteletone_generate.c \ $(SRC)/zap_buffer.c \ $(SRC)/zap_threadmutex.c \ -$(SRC)/zap_dso.c +$(SRC)/zap_dso.c \ +$(SRC)/zap_cpu_monitor.c library_include_HEADERS = \ $(SRC)/include/fsk.h \ @@ -89,7 +90,8 @@ $(SRC)/include/zap_buffer.h \ $(SRC)/include/zap_config.h \ $(SRC)/include/zap_threadmutex.h \ $(SRC)/include/zap_dso.h \ -$(SRC)/include/zap_types.h +$(SRC)/include/zap_types.h \ +$(SRC)/include/zap_cpu_monitor.h lib_LTLIBRARIES = libopenzap.la libopenzap_la_CFLAGS = $(AM_CFLAGS) $(MY_CFLAGS) diff --git a/libs/openzap/mod_openzap/mod_openzap.c b/libs/openzap/mod_openzap/mod_openzap.c index fddf2de69a..c9159f3cb7 100644 --- a/libs/openzap/mod_openzap/mod_openzap.c +++ b/libs/openzap/mod_openzap/mod_openzap.c @@ -3119,7 +3119,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_openzap_load) module_pool = pool; zap_global_set_logger(zap_logger); - + zap_cpu_monitor_disable(); + if (zap_global_init() != ZAP_SUCCESS) { zap_log(ZAP_LOG_ERROR, "Error loading OpenZAP\n"); return SWITCH_STATUS_TERM; diff --git a/libs/openzap/src/include/openzap.h b/libs/openzap/src/include/openzap.h index 3f88772cd8..18275c43bf 100644 --- a/libs/openzap/src/include/openzap.h +++ b/libs/openzap/src/include/openzap.h @@ -716,6 +716,7 @@ OZ_DECLARE(zap_status_t) zap_span_find_by_name(const char *name, zap_span_t **sp OZ_DECLARE(char *) zap_api_execute(const char *type, const char *cmd); OZ_DECLARE(int) zap_vasprintf(char **ret, const char *fmt, va_list ap); OZ_DECLARE(zap_status_t) zap_channel_set_caller_data(zap_channel_t *zchan, zap_caller_data_t *caller_data); +OZ_DECLARE(void) zap_cpu_monitor_disable(void); ZIO_CODEC_FUNCTION(zio_slin2ulaw); ZIO_CODEC_FUNCTION(zio_ulaw2slin); diff --git a/libs/openzap/src/include/zap_threadmutex.h b/libs/openzap/src/include/zap_threadmutex.h index 6e2af571ac..bc043ca180 100644 --- a/libs/openzap/src/include/zap_threadmutex.h +++ b/libs/openzap/src/include/zap_threadmutex.h @@ -36,6 +36,11 @@ OZ_DECLARE(zap_status_t) _zap_mutex_lock(zap_mutex_t *mutex); OZ_DECLARE(zap_status_t) _zap_mutex_trylock(zap_mutex_t *mutex); OZ_DECLARE(zap_status_t) _zap_mutex_unlock(zap_mutex_t *mutex); +OZ_DECLARE(zap_status_t) zap_interrupt_create(zap_interrupt_t **ininterrupt, zap_socket_t device); +OZ_DECLARE(zap_status_t) zap_interrupt_wait(zap_interrupt_t *interrupt, int ms); +OZ_DECLARE(zap_status_t) zap_interrupt_signal(zap_interrupt_t *interrupt); +OZ_DECLARE(zap_status_t) zap_interrupt_destroy(zap_interrupt_t **ininterrupt); +OZ_DECLARE(zap_status_t) zap_interrupt_multiple_wait(zap_interrupt_t *interrupts[], zap_size_t size, int ms); #endif /* For Emacs: diff --git a/libs/openzap/src/include/zap_types.h b/libs/openzap/src/include/zap_types.h index f318da6d56..1ac2e86ba4 100644 --- a/libs/openzap/src/include/zap_types.h +++ b/libs/openzap/src/include/zap_types.h @@ -35,6 +35,9 @@ #define ZAP_TYPES_H #include "fsk.h" +#define ZAP_INVALID_SOCKET -1 +#define zap_array_len(obj) sizeof(obj)/sizeof(obj[0]) + #ifdef WIN32 #include typedef HANDLE zap_socket_t; @@ -62,6 +65,7 @@ typedef int zap_filehandle_t; typedef size_t zap_size_t; struct zap_io_interface; +typedef struct zap_interrupt zap_interrupt_t; #define ZAP_COMMAND_OBJ_INT *((int *)obj) #define ZAP_COMMAND_OBJ_CHAR_P (char *)obj diff --git a/libs/openzap/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c b/libs/openzap/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c index a8609a0652..6149cb1c9b 100644 --- a/libs/openzap/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c +++ b/libs/openzap/src/ozmod/ozmod_sangoma_boost/ozmod_sangoma_boost.c @@ -726,6 +726,7 @@ static __inline__ void advance_chan_states(zap_channel_t *zchan); static void handle_call_start(zap_span_t *span, sangomabc_connection_t *mcon, sangomabc_event_t *event) { zap_channel_t *zchan; + int hangup_cause = ZAP_CAUSE_CALL_REJECTED; if (!(zchan = find_zchan(span, (sangomabc_short_event_t*)event, 0))) { if ((zchan = find_zchan(span, (sangomabc_short_event_t*)event, 1))) { @@ -793,16 +794,18 @@ static void handle_call_start(zap_span_t *span, sangomabc_connection_t *mcon, sa return; error: - if (!zchan) { + if (zchan) { + hangup_cause = zchan->caller_data.hangup_cause; + } else { zap_log(ZAP_LOG_CRIT, "START CANT FIND A CHAN %d:%d\n", event->span+1,event->chan+1); + hangup_cause = ZAP_CAUSE_REQUESTED_CHAN_UNAVAIL; } - sangomabc_exec_command(mcon, event->span, event->chan, 0, SIGBOOST_EVENT_CALL_START_NACK, - 0, 0); + hangup_cause, 0); } diff --git a/libs/openzap/src/zap_io.c b/libs/openzap/src/zap_io.c index 8c11bae87e..f0acf1ba27 100644 --- a/libs/openzap/src/zap_io.c +++ b/libs/openzap/src/zap_io.c @@ -44,6 +44,7 @@ #ifdef ZAP_PIKA_SUPPORT #include "zap_pika.h" #endif +#include "zap_cpu_monitor.h" static int time_is_init = 0; @@ -63,6 +64,7 @@ static void time_end(void) time_is_init = 0; } + OZ_DECLARE(zap_time_t) zap_current_time_in_ms(void) { #ifdef WIN32 @@ -74,6 +76,16 @@ OZ_DECLARE(zap_time_t) zap_current_time_in_ms(void) #endif } +typedef struct { + uint8_t running; + uint8_t alarm; + uint32_t interval; + uint8_t alarm_action_flags; + uint8_t set_alarm_threshold; + uint8_t reset_alarm_threshold; + zap_interrupt_t *interrupt; +} cpu_monitor_t; + static struct { zap_hash_t *interface_hash; zap_hash_t *module_hash; @@ -83,8 +95,16 @@ static struct { uint32_t span_index; uint32_t running; zap_span_t *spans; + cpu_monitor_t cpu_monitor; } globals; +static uint8_t zap_cpu_monitor_disabled = 0; + +enum zap_enum_cpu_alarm_action_flags +{ + ZAP_CPU_ALARM_ACTION_WARN = (1 << 0), + ZAP_CPU_ALARM_ACTION_REJECT = (1 << 1) +}; /* enum lookup funcs */ ZAP_ENUM_NAMES(TONEMAP_NAMES, TONEMAP_STRINGS) @@ -111,6 +131,8 @@ ZAP_STR2ENUM(zap_str2zap_mdmf_type, zap_mdmf_type2str, zap_mdmf_type_t, MDMF_TYP ZAP_ENUM_NAMES(CHAN_TYPE_NAMES, CHAN_TYPE_STRINGS) ZAP_STR2ENUM(zap_str2zap_chan_type, zap_chan_type2str, zap_chan_type_t, CHAN_TYPE_NAMES, ZAP_CHAN_TYPE_COUNT) +static zap_status_t zap_cpu_monitor_start(cpu_monitor_t* monitor_params); +static void zap_cpu_monitor_stop(cpu_monitor_t* monitor_params); static const char *cut_path(const char *in) { @@ -148,8 +170,12 @@ static const char *LEVEL_NAMES[] = { NULL }; + static int zap_log_level = 7; +/* Cpu monitor thread */ +static void *zap_cpu_monitor_run(zap_thread_t *me, void *obj); + static void default_logger(const char *file, const char *func, int line, int level, const char *fmt, ...) { const char *fp; @@ -1154,6 +1180,14 @@ OZ_DECLARE(zap_status_t) zap_channel_open_chan(zap_channel_t *zchan) snprintf(zchan->last_error, sizeof(zchan->last_error), "%s", "Channel is suspended"); return ZAP_FAIL; } + if (globals.cpu_monitor.alarm && + globals.cpu_monitor.alarm_action_flags & ZAP_CPU_ALARM_ACTION_REJECT) { + + snprintf(zchan->last_error, sizeof(zchan->last_error), "%s", "CPU usage alarm is on - refusing to open channel\n"); + zap_log(ZAP_LOG_WARNING, "CPU usage alarm is on - refusing to open channel\n"); + zchan->caller_data.hangup_cause = ZAP_CAUSE_SWITCH_CONGESTION; + return ZAP_FAIL; + } if (!zap_test_flag(zchan, ZAP_CHANNEL_READY) || (status = zap_mutex_trylock(zchan->mutex)) != ZAP_SUCCESS) { snprintf(zchan->last_error, sizeof(zchan->last_error), "Channel is not ready or is in use %d %d", zap_test_flag(zchan, ZAP_CHANNEL_READY), status); @@ -2540,6 +2574,43 @@ static zap_status_t load_config(void) } else { zap_log(ZAP_LOG_ERROR, "unknown span variable '%s'\n", var); } + } else if (!strncasecmp(cfg.category, "general", 7)) { + if (!strncasecmp(var, "cpu_monitoring_interval", 24)) { + if (atoi(val) > 0) { + globals.cpu_monitor.interval = atoi(val); + } else { + zap_log(ZAP_LOG_ERROR, "Invalid cpu monitoring interval %s\n", val); + } + } else if (!strncasecmp(var, "cpu_set_alarm_threshold", 22)) { + if (atoi(val) > 0 && atoi(val) < 100) { + globals.cpu_monitor.set_alarm_threshold = atoi(val); + } else { + zap_log(ZAP_LOG_ERROR, "Invalid cpu alarm set threshold %s\n", val); + } + } else if (!strncasecmp(var, "cpu_reset_alarm_threshold", 22)) { + if (atoi(val) > 0 && atoi(val) < 100) { + globals.cpu_monitor.reset_alarm_threshold = atoi(val); + if (globals.cpu_monitor.reset_alarm_threshold > globals.cpu_monitor.set_alarm_threshold) { + globals.cpu_monitor.reset_alarm_threshold = globals.cpu_monitor.set_alarm_threshold-10; + zap_log(ZAP_LOG_ERROR, "Cpu alarm reset threshold must be lower than set threshold, set threshold to %d\n", globals.cpu_monitor.reset_alarm_threshold); + } + } else { + zap_log(ZAP_LOG_ERROR, "Invalid cpu alarm reset threshold %s\n", val); + } + } else if (!strncasecmp(var, "cpu_alarm_action", 16)) { + char* p = val; + do { + if (!strncasecmp(p, "reject", 6)) { + globals.cpu_monitor.alarm_action_flags |= ZAP_CPU_ALARM_ACTION_REJECT; + } else if (!strncasecmp(p, "warn", 4)) { + globals.cpu_monitor.alarm_action_flags |= ZAP_CPU_ALARM_ACTION_WARN; + } + p = strchr(p, ','); + if (p) { + while(++p) if (*p != 0x20) break; + } + } while (p); + } } else { zap_log(ZAP_LOG_ERROR, "unknown param [%s] '%s' / '%s'\n", cfg.category, var, val); } @@ -2547,7 +2618,6 @@ static zap_status_t load_config(void) zap_config_close_file(&cfg); zap_log(ZAP_LOG_INFO, "Configured %u channel(s)\n", configured); - return configured ? ZAP_SUCCESS : ZAP_FAIL; } @@ -2806,7 +2876,7 @@ OZ_DECLARE(zap_status_t) zap_span_send_signal(zap_span_t *span, zap_sigmsg_t *si OZ_DECLARE(zap_status_t) zap_global_init(void) { int modcount; - + memset(&globals, 0, sizeof(globals)); time_init(); @@ -2824,13 +2894,23 @@ OZ_DECLARE(zap_status_t) zap_global_init(void) modcount = zap_load_modules(); zap_log(ZAP_LOG_NOTICE, "Modules configured: %d \n", modcount); - if (load_config() == ZAP_SUCCESS) { - globals.running = 1; - return ZAP_SUCCESS; + globals.cpu_monitor.interval = 1000; + globals.cpu_monitor.alarm_action_flags = ZAP_CPU_ALARM_ACTION_WARN | ZAP_CPU_ALARM_ACTION_REJECT; + globals.cpu_monitor.set_alarm_threshold = 80; + globals.cpu_monitor.reset_alarm_threshold = 70; + + if (load_config() != ZAP_SUCCESS) { + zap_log(ZAP_LOG_ERROR, "No modules configured!\n"); + return ZAP_FAIL; } - zap_log(ZAP_LOG_ERROR, "No modules configured!\n"); - return ZAP_FAIL; + globals.running = 1; + if (!zap_cpu_monitor_disabled) { + if (zap_cpu_monitor_start(&globals.cpu_monitor) != ZAP_SUCCESS) { + return ZAP_FAIL; + } + } + return ZAP_SUCCESS; } OZ_DECLARE(uint32_t) zap_running(void) @@ -2846,10 +2926,11 @@ OZ_DECLARE(zap_status_t) zap_global_destroy(void) time_end(); - globals.running = 0; + globals.running = 0; + zap_cpu_monitor_stop(&globals.cpu_monitor); zap_span_close_all(); zap_sleep(1000); - + zap_mutex_lock(globals.span_mutex); for (sp = globals.spans; sp;) { zap_span_t *cur_span = sp; @@ -2900,7 +2981,7 @@ OZ_DECLARE(zap_status_t) zap_global_destroy(void) zap_mutex_unlock(globals.mutex); zap_mutex_destroy(&globals.mutex); zap_mutex_destroy(&globals.span_mutex); - + zap_interrupt_destroy(&globals.cpu_monitor.interrupt); memset(&globals, 0, sizeof(globals)); return ZAP_SUCCESS; } @@ -3192,6 +3273,69 @@ OZ_DECLARE_NONSTD(zap_status_t) zap_console_stream_write(zap_stream_handle_t *ha return ret ? ZAP_FAIL : ZAP_SUCCESS; } +static void *zap_cpu_monitor_run(zap_thread_t *me, void *obj) +{ + cpu_monitor_t *monitor = (cpu_monitor_t *)obj; + struct zap_cpu_monitor_stats *cpu_stats = zap_new_cpu_monitor(); + if (!cpu_stats) { + return NULL; + } + monitor->running = 1; + + while(zap_running()) { + double time; + if (zap_cpu_get_system_idle_time(cpu_stats, &time)) { + break; + } + + if (monitor->alarm) { + if ((int)time >= (100-monitor->set_alarm_threshold)) { + zap_log(ZAP_LOG_DEBUG, "CPU alarm OFF (idle:%d)\n", (int) time); + monitor->alarm = 0; + } + if (monitor->alarm_action_flags & ZAP_CPU_ALARM_ACTION_WARN) { + zap_log(ZAP_LOG_WARNING, "CPU alarm is ON (cpu usage:%d)\n", (int) (100-time)); + } + } else { + if ((int)time <= (100-monitor->reset_alarm_threshold)) { + zap_log(ZAP_LOG_DEBUG, "CPU alarm ON (idle:%d)\n", (int) time); + monitor->alarm = 1; + } + } + zap_interrupt_wait(monitor->interrupt, monitor->interval); + } + zap_delete_cpu_monitor(cpu_stats); + monitor->running = 0; + return NULL; +} + + +static zap_status_t zap_cpu_monitor_start(cpu_monitor_t* monitor) +{ + if (zap_interrupt_create(&monitor->interrupt, ZAP_INVALID_SOCKET) != ZAP_SUCCESS) { + zap_log(ZAP_LOG_CRIT, "Failed to create CPU monitor interrupt\n"); + return ZAP_FAIL; + } + + if (zap_thread_create_detached(zap_cpu_monitor_run, monitor) != ZAP_SUCCESS) { + zap_log(ZAP_LOG_CRIT, "Failed to create cpu monitor thread!!\n"); + return ZAP_FAIL; + } + return ZAP_SUCCESS; +} + +static void zap_cpu_monitor_stop(cpu_monitor_t* monitor) +{ + zap_interrupt_signal(monitor->interrupt); + while(monitor->running) { + zap_sleep(10); + } +} + +OZ_DECLARE(void) zap_cpu_monitor_disable(void) +{ + zap_cpu_monitor_disabled = 1; +} /* For Emacs: diff --git a/libs/openzap/src/zap_threadmutex.c b/libs/openzap/src/zap_threadmutex.c index b6bc8435b6..5307f2bd7e 100644 --- a/libs/openzap/src/zap_threadmutex.c +++ b/libs/openzap/src/zap_threadmutex.c @@ -3,7 +3,7 @@ * Copyright(C) 2007 Michael Jerris * * You may opt to use, copy, modify, merge, publish, distribute and/or sell - * copies of the Software, and permit persons to whom the Software is + * copies of the Soozware, and permit persons to whom the Soozware is * furnished to do so. * * This work is provided under this license on an "as is" basis, without warranty of any kind, @@ -15,6 +15,10 @@ * constitutes an essential part of this license. No use of any covered code is authorized hereunder * except under this disclaimer. * + * Contributors: + * + * Moises Silva + * */ #ifdef WIN32 @@ -35,8 +39,8 @@ struct zap_mutex { }; #else - #include +#include #define ZAP_THREAD_CALLING_CONVENTION @@ -46,6 +50,18 @@ struct zap_mutex { #endif +struct zap_interrupt { + zap_socket_t device; +#ifdef WIN32 + /* for generic interruption */ + HANDLE event; +#else + /* for generic interruption */ + int readfd; + int writefd; +#endif +}; + struct zap_thread { #ifdef WIN32 void *handle; @@ -75,7 +91,7 @@ static void * ZAP_THREAD_CALLING_CONVENTION thread_launch(void *args) #ifndef WIN32 pthread_attr_destroy(&thread->attribute); #endif - free(thread); + zap_safe_free(thread); return exit_val; } @@ -125,7 +141,7 @@ OZ_DECLARE(zap_status_t) zap_thread_create_detached_ex(zap_thread_function_t fun fail: if (thread) { - free(thread); + zap_safe_free(thread); } done: return status; @@ -183,7 +199,7 @@ OZ_DECLARE(zap_status_t) zap_mutex_destroy(zap_mutex_t **mutex) if (pthread_mutex_destroy(&mp->mutex)) return ZAP_FAIL; #endif - free(mp); + zap_safe_free(mp); return ZAP_SUCCESS; } @@ -192,8 +208,11 @@ OZ_DECLARE(zap_status_t) _zap_mutex_lock(zap_mutex_t *mutex) #ifdef WIN32 EnterCriticalSection(&mutex->mutex); #else - if (pthread_mutex_lock(&mutex->mutex)) + int err; + if ((err = pthread_mutex_lock(&mutex->mutex))) { + zap_log(ZAP_LOG_ERROR, "Failed to lock mutex %d:%s\n", err, strerror(err)); return ZAP_FAIL; + } #endif return ZAP_SUCCESS; } @@ -222,8 +241,202 @@ OZ_DECLARE(zap_status_t) _zap_mutex_unlock(zap_mutex_t *mutex) } +OZ_DECLARE(zap_status_t) zap_interrupt_create(zap_interrupt_t **ininterrupt, zap_socket_t device) +{ + zap_interrupt_t *interrupt = NULL; +#ifndef WIN32 + int fds[2]; +#endif + interrupt = calloc(1, sizeof(*interrupt)); + if (!interrupt) { + zap_log(ZAP_LOG_ERROR, "Failed to allocate interrupt memory\n"); + return ZAP_FAIL; + } + interrupt->device = device; +#ifdef WIN32 + interrupt->interrupt = CreateEvent(NULL, FALSE, FALSE, NULL); + if (!interrupt->interrupt) { + zap_log(ZAP_LOG_ERROR, "Failed to allocate interrupt event\n"); + goto failed; + } +#else + if (pipe(fds)) { + zap_log(ZAP_LOG_ERROR, "Failed to allocate interrupt pipe: %s\n", strerror(errno)); + goto failed; + } + + interrupt->readfd = fds[0]; + interrupt->writefd = fds[1]; +#endif + + *ininterrupt = interrupt; + return ZAP_SUCCESS; + +failed: + if (interrupt) { +#ifndef WIN32 + if (interrupt->readfd) { + close(interrupt->readfd); + close(interrupt->writefd); + interrupt->readfd = -1; + interrupt->writefd = -1; + } +#endif + zap_safe_free(interrupt); + } + return ZAP_FAIL; +} + +#define ONE_BILLION 1000000000 + +OZ_DECLARE(zap_status_t) zap_interrupt_wait(zap_interrupt_t *interrupt, int ms) +{ + int num = 1; +#ifdef WIN32 + DWORD res = 0; + HANDLE ints[2]; +#else + int res = 0; + struct pollfd ints[2]; + char pipebuf[255]; +#endif + + /* start implementation */ +#ifdef WIN32 + ints[0] = interrupt->event; + if (interrupt->device != ZAP_INVALID_SOCKET) { + num++; + ints[1] = interrupt->device; + } + res = WaitForMultipleObjects(num, &ints, FALSE, ms >= 0 ? ms : INFINITE); + switch (res) { + case WAIT_TIMEOUT: + return ZAP_TIMEOUT; + case WAIT_FAILED: + case WAIT_ABANDONED: /* is it right to fail with abandoned? */ + return ZAP_FAIL; + default: + if (res >= (sizeof(ints)/sizeof(ints[0]))) { + zap_log(ZAP_LOG_ERROR, "Error waiting for openzap interrupt event (WaitForSingleObject returned %d)\n", res); + return ZAP_FAIL; + } + return ZAP_SUCCESS; + } +#else + ints[0].fd = interrupt->readfd; + ints[0].events = POLLIN; + ints[0].revents = 0; + + if (interrupt->device != ZAP_INVALID_SOCKET) { + num++; + ints[1].fd = interrupt->device; + ints[1].events = POLLIN; + ints[1].revents = 0; + } + + res = poll(ints, num, ms); + if (res == -1) { + zap_log(ZAP_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno)); + return ZAP_FAIL; + } + + if (res == 0) { + return ZAP_TIMEOUT; + } + + if (ints[0].revents & POLLIN) { + res = read(ints[0].fd, pipebuf, sizeof(pipebuf)); + if (res == -1) { + zap_log(ZAP_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno)); + } + } + + return ZAP_SUCCESS; +#endif +} + +OZ_DECLARE(zap_status_t) zap_interrupt_signal(zap_interrupt_t *interrupt) +{ +#ifdef WIN32 + if (!SetEvent(interrupt->interrupt)) { + zap_log(ZAP_LOG_ERROR, "Failed to signal interrupt\n"); + return ZAP_FAIL; + } +#else + int err; + if ((err = write(interrupt->writefd, "w", 1)) != 1) { + zap_log(ZAP_LOG_ERROR, "Failed to signal interrupt: %s\n", errno, strerror(errno)); + return ZAP_FAIL; + } +#endif + return ZAP_SUCCESS; +} + +OZ_DECLARE(zap_status_t) zap_interrupt_destroy(zap_interrupt_t **ininterrupt) +{ + zap_interrupt_t *interrupt = NULL; + interrupt = *ininterrupt; +#ifdef WIN32 + CloseHandle(interrupt->interrupt); +#else + close(interrupt->readfd); + close(interrupt->writefd); + interrupt->readfd = -1; + interrupt->writefd = -1; +#endif + zap_safe_free(interrupt); + *ininterrupt = NULL; + return ZAP_SUCCESS; +} + +OZ_DECLARE(zap_status_t) zap_interrupt_multiple_wait(zap_interrupt_t *interrupts[], zap_size_t size, int ms) +{ +#ifndef WIN32 + int i; + int res = 0; + int numdevices = 0; + char pipebuf[255]; + struct pollfd ints[size*2]; + + memset(&ints, 0, sizeof(ints)); + + for (i = 0; i < size; i++) { + ints[i].events = POLLIN; + ints[i].revents = 0; + ints[i].fd = interrupts[i]->readfd; + if (interrupts[i]->device != ZAP_INVALID_SOCKET) { + ints[i+numdevices].events = POLLIN; + ints[i+numdevices].revents = 0; + ints[i+numdevices].fd = interrupts[i]->device; + numdevices++; + } + } + + res = poll(ints, size + numdevices, ms); + + if (res == -1) { + zap_log(ZAP_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno)); + return ZAP_FAIL; + } + + if (res == 0) { + return ZAP_TIMEOUT; + } + + for (i = size; i < zap_array_len(ints); i++) { + if (ints[i].revents & POLLIN) { + res = read(ints[0].fd, pipebuf, sizeof(pipebuf)); + if (res == -1) { + zap_log(ZAP_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno)); + } + } + } + +#endif + return ZAP_SUCCESS; +} /* For Emacs: * Local Variables: