diff --git a/libs/freetdm/src/ftmod/ftmod_misdn/ftmod_misdn.c b/libs/freetdm/src/ftmod/ftmod_misdn/ftmod_misdn.c index 462cb9a6c4..84133601f3 100644 --- a/libs/freetdm/src/ftmod/ftmod_misdn/ftmod_misdn.c +++ b/libs/freetdm/src/ftmod/ftmod_misdn/ftmod_misdn.c @@ -44,6 +44,7 @@ #include #include #include +#include /* this is how it should have been... #ifdef HAVE_FREETDM_FREETDM_H @@ -70,6 +71,7 @@ //#define MISDN_DEBUG_EVENTS //#define MISDN_DEBUG_IO +#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x)) typedef enum { MISDN_CAPS_NONE = 0, @@ -145,7 +147,7 @@ const static struct { #undef MISDN_CONTROL_TYPE }; -#if 0 /* unused for now */ +#ifdef MISDN_DEBUG_EVENTS static const char *misdn_control2str(const int ctrl) { int x; @@ -219,6 +221,16 @@ static inline void misdn_convert_audio_bits(char *buf, int buflen) * mISDN <-> FreeTDM data structures ***********************************************************************************/ +typedef enum { + MISDN_CMD_NONE = 0, + MISDN_CMD_STOP, /*!< Stop the I/O thread */ +} misdn_cmd_t; + +struct misdn_command { + misdn_cmd_t type; +/* union { } cmd; */ /*!< Command-specific parameters */ +}; + enum { MISDN_SPAN_NONE = 0, MISDN_SPAN_RUNNING = (1 << 0), @@ -227,10 +239,18 @@ enum { struct misdn_span_private { int flags; + int running; + + int event_pipe_in; + int event_pipe_out; /* event conditional */ pthread_mutex_t event_cond_mutex; pthread_cond_t event_cond; + + /* start / stop feedback */ + pthread_mutex_t ctrl_cond_mutex; + pthread_cond_t ctrl_cond; }; struct misdn_event_queue; @@ -243,9 +263,11 @@ struct misdn_chan_private { /* hw addr of channel */ struct sockaddr_mISDN addr; - /* audio tx pipe */ - int audio_pipe_in; - int audio_pipe_out; + /* audio tx pipe (= socketpair ends) */ + int tx_audio_pipe_in; + int tx_audio_pipe_out; + int rx_audio_pipe_in; + int rx_audio_pipe_out; /* counters */ unsigned long tx_cnt; @@ -254,6 +276,14 @@ struct misdn_chan_private { unsigned long slip_rx_cnt; unsigned long slip_tx_cnt; + unsigned long tx_pipe_wr_bytes; /*!< Number of bytes written into tx audio pipe */ + unsigned long tx_pipe_rd_bytes; /*!< Number of bytes read from tx audio pipe */ + unsigned long tx_miss_bytes; /*!< Number of bytes missing in short reads from tx audio pipe */ + unsigned long tx_lost_bytes; /*!< Number of bytes lost in short writes to the mISDN B-Channel */ + unsigned long tx_sent_bytes; /*!< Number of bytes successfully sent to the mISDN B-Channel */ + unsigned long tx_pipe_under_cnt; /*!< Number of tx audio pipe underflows */ + unsigned long tx_pipe_over_cnt; /*!< Number of tx audio pipe overflows */ + struct misdn_event_queue *events; }; @@ -557,11 +587,11 @@ static ftdm_status_t _misdn_toggle_channel(ftdm_channel_t *chan, int activate) (activate) ? "activation" : "deactivation", strerror(errno)); return FTDM_FAIL; } -//#ifdef MISDN_DEBUG_EVENTS +#ifdef MISDN_DEBUG_EVENTS ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s (%#x)', id %#x, while waiting for %s confirmation on %c-channel\n", misdn_event2str(hh->prim), hh->prim, hh->id, (activate) ? "activation" : "deactivation", ftdm_channel_get_type(chan) == FTDM_CHAN_TYPE_B ? 'B' : 'D'); -//#endif +#endif switch (hh->prim) { case PH_ACTIVATE_IND: case PH_ACTIVATE_CNF: @@ -685,10 +715,10 @@ static ftdm_status_t misdn_get_ph_info(ftdm_channel_t *chan, struct ph_info *inf misdn_event2str(req), strerror(errno)); return FTDM_FAIL; } -//#ifdef MISDN_DEBUG_EVENTS +#ifdef MISDN_DEBUG_EVENTS ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s' while waiting for %s answer\n", misdn_event2str(hh->prim), misdn_event2str(req)); -//#endif +#endif switch (hh->prim) { case MPH_INFORMATION_IND: /* success */ if (retval < MISDN_HEADER_LEN + sizeof(*info)) { @@ -936,7 +966,7 @@ static int misdn_handle_mph_information_ind(ftdm_channel_t *chan, const struct m * mISDN <-> FreeTDM interface functions ***********************************************************************************/ -struct misdn_globals { +static struct misdn_globals { int sockfd; } globals; @@ -1022,6 +1052,11 @@ static FIO_CLOSE_FUNCTION(misdn_close) ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D'); } + ftdm_log_chan(ftdmchan, FTDM_LOG_NOTICE, "mISDN tx stats: wr: %lu, rd: %lu, tx: %lu, tx-lost: %lu, tx-miss: %lu, tx-under#: %lu, tx-over#: %lu\n", + chan_priv->tx_pipe_wr_bytes, chan_priv->tx_pipe_rd_bytes, + chan_priv->tx_sent_bytes, chan_priv->tx_lost_bytes, chan_priv->tx_miss_bytes, + chan_priv->tx_pipe_over_cnt, chan_priv->tx_pipe_under_cnt); + chan_priv->active = 0; } @@ -1109,16 +1144,22 @@ static FIO_WAIT_FUNCTION(misdn_wait) switch (ftdm_channel_get_type(ftdmchan)) { case FTDM_CHAN_TYPE_B: if (*flags & FTDM_WRITE) { - pfds[nr_fds].fd = chan_priv->audio_pipe_in; + pfds[nr_fds].fd = chan_priv->tx_audio_pipe_in; pfds[nr_fds].events = POLLOUT; nr_fds++; } - if (*flags & (FTDM_READ | FTDM_EVENTS)) { + if (*flags & FTDM_READ) { + pfds[nr_fds].fd = chan_priv->rx_audio_pipe_out; + pfds[nr_fds].events = POLLIN; + nr_fds++; + } +/* if (*flags & (FTDM_READ | FTDM_EVENTS)) { pfds[nr_fds].fd = ftdmchan->sockfd; pfds[nr_fds].events |= (*flags & FTDM_READ) ? POLLIN : 0; pfds[nr_fds].events |= (*flags & FTDM_EVENTS) ? POLLPRI : 0; nr_fds++; } +*/ break; default: if (*flags & FTDM_READ) @@ -1149,7 +1190,7 @@ static FIO_WAIT_FUNCTION(misdn_wait) switch (ftdm_channel_get_type(ftdmchan)) { case FTDM_CHAN_TYPE_B: - if (pfds[0].revents & POLLOUT) + if ((pfds[0].revents & POLLOUT) || (pfds[1].revents & POLLOUT)) *flags |= FTDM_WRITE; if ((pfds[0].revents & POLLIN) || (pfds[1].revents & POLLIN)) *flags |= FTDM_READ; @@ -1171,10 +1212,10 @@ static FIO_WAIT_FUNCTION(misdn_wait) /** * Handle incoming mISDN message on d-channel - * @param[in] ftdmchan - * @param[in] msg_buf - * @param[in] msg_len - * @internal + * \param[in] ftdmchan + * \param[in] msg_buf + * \param[in] msg_len + * \internal */ static ftdm_status_t misdn_handle_incoming(ftdm_channel_t *ftdmchan, const char *msg_buf, const int msg_len) { @@ -1258,13 +1299,6 @@ static FIO_READ_FUNCTION(misdn_read) int retval; int maxretry = 10; - if (!priv->active) { - ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed channel\n"); - /* ignore */ - *datalen = 0; - return FTDM_SUCCESS; - } - /* nothing read yet */ *datalen = 0; @@ -1273,111 +1307,61 @@ static FIO_READ_FUNCTION(misdn_read) * we'll get a lot of "mISDN_send: error -12" message in dmesg otherwise * (= b-channel receive queue overflowing) */ - while (maxretry--) { - struct sockaddr_mISDN addr; - socklen_t addrlen = sizeof(addr); + switch (ftdm_channel_get_type(ftdmchan)) { + case FTDM_CHAN_TYPE_DQ921: { + while (maxretry--) { + struct sockaddr_mISDN addr; + socklen_t addrlen = sizeof(addr); - if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) { - if (errno == EWOULDBLOCK || errno == EAGAIN) break; - ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n", + if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) break; + ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n", + strerror(errno)); + return FTDM_FAIL; + } + + if (retval < MISDN_HEADER_LEN) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n"); + return FTDM_FAIL; + } + + if (hh->prim == PH_DATA_IND) { + *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes); + + if (*datalen <= 0) + continue; + + /* + * Copy data into ouput buffer (excluding the mISDN message header) + * NOTE: audio data needs to be converted to a-law / u-law! + */ + memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen); + return FTDM_SUCCESS; + } else { + *datalen = 0; + /* event */ + misdn_handle_incoming(ftdmchan, rbuf, retval); + } + } + break; + } + case FTDM_CHAN_TYPE_B: { + if (!priv->active) { + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed b-channel\n"); + return FTDM_SUCCESS; + } + + if ((retval = read(priv->rx_audio_pipe_out, data, bytes)) < 0) { + ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read audio data from rx pipe: %s\n", strerror(errno)); return FTDM_FAIL; } - - if (retval < MISDN_HEADER_LEN) { - ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n"); - return FTDM_FAIL; - } - - if (hh->prim == PH_DATA_IND) { - *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes); -#ifdef MISDN_DEBUG_IO - ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n", - misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel); - - if (*datalen > 0) { - char hbuf[MAX_DATA_MEM] = { 0 }; - print_hex_bytes(data, *datalen, hbuf, sizeof(hbuf)); - ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN read data: %s\n", hbuf); - } -#endif - if (*datalen <= 0) - continue; - - /* - * Copy data into ouput buffer (excluding the mISDN message header) - * NOTE: audio data needs to be converted to a-law / u-law! - */ - memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen); - - switch (ftdm_channel_get_type(ftdmchan)) { - case FTDM_CHAN_TYPE_B: - hh->prim = PH_DATA_REQ; - hh->id = MISDN_ID_ANY; - bytes = *datalen; - - /* Convert incoming audio data to *-law */ - misdn_convert_audio_bits(data, *datalen); - - /* - * Fetch required amount of audio from tx pipe, using the amount - * of received bytes as an indicator for how much free space the - * b-channel tx buffer has available. - * - * (see misdn_write() for the part that fills the tx pipe) - * - * NOTE: can't use blocking I/O here since both parts are serviced - * from the same thread - */ - if ((retval = read(priv->audio_pipe_out, rbuf + MISDN_HEADER_LEN, bytes)) < 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { - ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read %d bytes of audio data: %s\n", - bytes, strerror(errno)); - break; - } - /* Tx pipe is empty, completely fill buffer up to "bytes" with silence value */ - retval = 0; - } - - /* - * Use a-law / u-law silence to fill missing bytes, - * in case there was not enough audio data available in the - * tx pipe to satisfy the request. - */ - if (retval < bytes) { - memset(&rbuf[MISDN_HEADER_LEN + retval], - (ftdm_channel_get_codec(ftdmchan) == FTDM_CODEC_ALAW) ? 0x2a : 0xff, - bytes - retval); - } - - /* Convert outgoing audio data to wire format */ - misdn_convert_audio_bits(rbuf + MISDN_HEADER_LEN, bytes); - bytes += MISDN_HEADER_LEN; - - /* Send converted audio to b-channel */ - if ((retval = sendto(ftdmchan->sockfd, rbuf, bytes, 0, (struct sockaddr *)&priv->addr, sizeof(priv->addr))) < bytes) { - ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to send %d bytes of audio data: (%d) %s\n", - bytes, retval, strerror(errno)); - } - break; - default: - break; - } - return FTDM_SUCCESS; - } else { - *datalen = 0; -#ifdef MISDN_DEBUG_IO - ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n", - misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel); -#endif - /* event */ - misdn_handle_incoming(ftdmchan, rbuf, retval); - } + *datalen = retval; + break; + } + default: + break; } -#ifdef MISDN_DEBUG_IO - ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN nothing received on %c-channel\n", - ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D'); -#endif return FTDM_SUCCESS; } @@ -1427,11 +1411,18 @@ static FIO_WRITE_FUNCTION(misdn_write) * NOTE: can't use blocking I/O here since both parts are serviced * from the same thread */ - if ((retval = write(priv->audio_pipe_in, data, size)) < size) { - ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error: %s\n", - strerror(errno)); + if ((retval = write(priv->tx_audio_pipe_in, data, size)) < 0) { + ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu]: %s\n", + priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes, + priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, strerror(errno)); return FTDM_FAIL; + } else if (retval < size) { + priv->tx_pipe_over_cnt++; + ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "mISDN channel audio pipe short write [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu], expected: %d, written: %d\n", + priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes, + priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, size, retval); } + ACCESS_ONCE(priv->tx_pipe_wr_bytes) += retval; *datalen = retval; break; default: @@ -1482,6 +1473,12 @@ static FIO_WRITE_FUNCTION(misdn_write) return FTDM_SUCCESS; } +/** + * Carefully choosen size for socket send/recv buffers + * larger values will add more latency, while lower values will cause deadlocks + * (see misdn_span_run() comments below for an explanation) + */ +#define SOCKETPAIR_BUFFER_SIZE 3072 static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, struct mISDN_devinfo *devinfo, int start, int end) { @@ -1556,9 +1553,11 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, ftdm_log(FTDM_LOG_DEBUG, "mISDN opened socket (on chan:dev => %d:%d): %d\n", addr.dev, addr.channel, sockfd); - /* set non-blocking */ + /* Set mISDN channel socket non-blocking */ if (fcntl(sockfd, F_SETFL, O_NONBLOCK) < 0) { - ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n", strerror(errno)); + ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n", + strerror(errno)); + close(sockfd); return FTDM_FAIL; } @@ -1604,25 +1603,109 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, if (ftdmchan->type == FTDM_CHAN_TYPE_B) { int pipefd[2] = { -1, -1 }; - ftdmchan->packet_len = 10 /* ms */ * (ftdmchan->rate / 1000); + ftdmchan->packet_len = 30 /* ms */ * (ftdmchan->rate / 1000); ftdmchan->effective_interval = ftdmchan->native_interval = ftdmchan->packet_len / 8; ftdmchan->native_codec = ftdmchan->effective_codec = FTDM_CODEC_ALAW; - ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL); +// ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL); +#ifdef USE_PIPE /* * Create audio tx pipe, use non-blocking I/O to avoid deadlock since both ends * are used from the same thread */ - if (pipe2(pipefd, O_NONBLOCK) < 0) { - ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio write pipe [%d:%d]: %s\n", + if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) { + ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio tx pipe [%d:%d]: %s\n", addr.dev, x, strerror(errno)); close(sockfd); return FTDM_FAIL; } - priv->audio_pipe_in = pipefd[1]; - priv->audio_pipe_out = pipefd[0]; + priv->tx_audio_pipe_in = pipefd[1]; + priv->tx_audio_pipe_out = pipefd[0]; +#if 1 || defined(HAVE_F_SETPIPE_SZ) + if (fcntl(priv->tx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) { + ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio tx pipe size [%d:%d]: %s\n", + addr.dev, x, strerror(errno)); + } +#endif + /* + * Create audio rx pipe, use non-blocking I/O to avoid deadlock since both ends + * are used from the same thread + */ + if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) { + ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio rx pipe [%d:%d]: %s\n", + addr.dev, x, strerror(errno)); + close(sockfd); + return FTDM_FAIL; + } + priv->rx_audio_pipe_in = pipefd[1]; + priv->rx_audio_pipe_out = pipefd[0]; + +#if 1 || defined(HAVE_F_SETPIPE_SZ) + if (fcntl(priv->rx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) { + ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio rx pipe size [%d:%d]: %s\n", + addr.dev, x, strerror(errno)); + } +#endif +#else /* !USE_PIPE */ + /* + * Use a socket pair for audio rx/tx, allows for more fine-grained control + * of latency (= amounts of data in buffers) + */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd) < 0) { + ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio socket pair [%d:%d]: %s\n", + addr.dev, x, strerror(errno)); + close(sockfd); + return FTDM_FAIL; + } else { + int opt = SOCKETPAIR_BUFFER_SIZE; + socklen_t optlen = sizeof(opt); + + if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[0] to non-blocking: %s\n", + strerror(errno)); + close(sockfd); + close(pipefd[0]); + close(pipefd[1]); + return FTDM_FAIL; + } + if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[1] to non-blocking: %s\n", + strerror(errno)); + close(sockfd); + close(pipefd[0]); + close(pipefd[1]); + return FTDM_FAIL; + } + + /* + * Set RX/TX buffer sizes on each end of the socket pair + */ + if (setsockopt(pipefd[0], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) { + ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] RCVBUF: %s\n", + strerror(errno)); + } + if (setsockopt(pipefd[0], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) { + ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] SNDBUF: %s\n", + strerror(errno)); + } + if (setsockopt(pipefd[1], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) { + ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] RCVBUF: %s\n", + strerror(errno)); + } + if (setsockopt(pipefd[1], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) { + ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] SNDBUF: %s\n", + strerror(errno)); + } + + priv->rx_audio_pipe_in = pipefd[1]; + priv->rx_audio_pipe_out = pipefd[0]; + + priv->tx_audio_pipe_in = pipefd[0]; + priv->tx_audio_pipe_out = pipefd[1]; + } +#endif } else { /* early activate D-Channel */ misdn_activate_channel(ftdmchan); @@ -1738,6 +1821,8 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span) /* allocate span private */ if (!span_priv) { + int pipe[2] = { -1, -1 }; + /* * Not perfect, there should be something like span_create too */ @@ -1751,6 +1836,19 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span) /* init event condition */ pthread_cond_init(&span_priv->event_cond, NULL); pthread_mutex_init(&span_priv->event_cond_mutex, NULL); + + /* init control condition */ + pthread_cond_init(&span_priv->ctrl_cond, NULL); + pthread_mutex_init(&span_priv->ctrl_cond_mutex, NULL); + + /* create event pipe */ + if (pipe2(pipe, O_CLOEXEC) < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to create event pipe: %s\n", + strerror(errno)); + return FTDM_FAIL; + } + span_priv->event_pipe_in = pipe[0]; + span_priv->event_pipe_out = pipe[1]; } /* split channel list by ',' */ @@ -1819,8 +1917,6 @@ static FIO_GET_ALARMS_FUNCTION(misdn_get_alarms) add event queues and data fifos, so we can sift all the messages we get to forward them to the right receiver */ - ftdm_span_t *span = ftdm_channel_get_span(ftdmchan); - struct misdn_span_private *span_priv = ftdm_span_io_private(span); char buf[MAX_DATA_MEM] = { 0 }; struct sockaddr_mISDN addr; struct mISDNhead *hh; @@ -1910,31 +2006,50 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) int retval = 0, nr_events = 0; int i; - clock_gettime(CLOCK_REALTIME, &ts); - ts_add_msec(&ts, ms); - for (i = 1; i <= ftdm_span_get_chan_count(span); i++) { ftdm_channel_t *chan = ftdm_span_get_channel(span, i); struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); + /* Skip channels that have event processing pending (Avoids event storms) */ + if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT)) + continue; + if (misdn_event_queue_has_data(chan_priv->events)) { #ifdef MISDN_DEBUG_EVENTS ftdm_log(FTDM_LOG_DEBUG, "mISDN channel %d:%d has event(s)\n", ftdm_channel_get_span_id(chan), ftdm_channel_get_id(chan)); #endif - ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT); + ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT); chan->last_event_time = ftdm_current_time_in_ms(); nr_events++; } } - if (nr_events) + if (nr_events) { +#ifdef MISDN_DEBUG_EVENTS + ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has %d new events pending (pre poll)\n", + ftdm_span_get_id(span), nr_events); +#endif return FTDM_SUCCESS; + } + + +#ifdef MISDN_DEBUG_EVENTS + ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has no events pending, polling for new events with %d ms timeout\n", + ftdm_span_get_id(span), ms); +#endif + /* Wait at least 1 ms, max 1 s */ + ms = ftdm_clamp(ms, 1, 1000); + + clock_gettime(CLOCK_REALTIME, &ts); + ts_add_msec(&ts, ms); if ((retval = pthread_cond_timedwait(&span_priv->event_cond, &span_priv->event_cond_mutex, &ts))) { switch (retval) { case ETIMEDOUT: -// ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n", -// ftdm_span_get_id(span), ms); +#ifdef MISDN_DEBUG_EVENTS + ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n", + ftdm_span_get_id(span), ms); +#endif return FTDM_TIMEOUT; default: ftdm_log(FTDM_LOG_DEBUG, "mISDN failed to poll for events on span %d: %s\n", @@ -1943,12 +2058,20 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) } } +#ifdef MISDN_DEBUG_EVENTS + ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d received new event notification, checking channel event queues\n", + ftdm_span_get_id(span)); +#endif for (i = 1; i <= ftdm_span_get_chan_count(span); i++) { ftdm_channel_t *chan = ftdm_span_get_channel(span, i); struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); + /* Skip channels that have event processing pending (Avoids event storms) */ + if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT)) + continue; + if (misdn_event_queue_has_data(chan_priv->events)) { - ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT); + ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT); chan->last_event_time = ftdm_current_time_in_ms(); nr_events++; } @@ -1956,6 +2079,57 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) return (nr_events) ? FTDM_SUCCESS : FTDM_TIMEOUT; /* no events? => timeout */ } +/** + * Retrieve event from channel + * \param ftdmchan Channel to retrieve event from + * \param event FreeTDM event to return + * \return Success or failure + */ +static FIO_CHANNEL_NEXT_EVENT_FUNCTION(misdn_channel_next_event) +{ + struct misdn_chan_private *chan_priv = ftdm_chan_io_private(ftdmchan); + struct misdn_event *evt = NULL; + ftdm_span_t *span = ftdm_channel_get_span(ftdmchan); + uint32_t event_id = FTDM_OOB_INVALID; + + ftdm_assert(span, "span == NULL"); + + ftdm_clear_io_flag(ftdmchan, FTDM_CHANNEL_IO_EVENT); + + if (!(evt = misdn_event_queue_pop(chan_priv->events))) { +#ifdef MISDN_DEBUG_EVENTS + ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n"); +#endif + return FTDM_FAIL; + } + +#ifdef MISDN_DEBUG_EVENTS + ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "Got event '%s' from channel event queue\n", + misdn_event2str(evt->id)); +#endif + /* Convert from misdn event to ftdm */ + switch (evt->id) { + case PH_DEACTIVATE_IND: + event_id = FTDM_OOB_ALARM_TRAP; + ftdmchan->alarm_flags |= FTDM_ALARM_RED; + break; + case PH_ACTIVATE_IND: + event_id = FTDM_OOB_ALARM_CLEAR; + ftdmchan->alarm_flags &= ~FTDM_ALARM_RED; + break; + default: + ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Unhandled event id %d (0x%x) %s\n", + evt->id, evt->id, misdn_event2str(evt->id)); + } + + ftdmchan->last_event_time = 0; + span->event_header.e_type = FTDM_EVENT_OOB; + span->event_header.enum_id = event_id; + span->event_header.channel = ftdmchan; + *event = &span->event_header; + return FTDM_SUCCESS; +} + /** * \brief Retrieve event * \param span FreeTDM span @@ -1974,11 +2148,12 @@ static FIO_SPAN_NEXT_EVENT_FUNCTION(misdn_next_event) struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); struct misdn_event *evt = NULL; + ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT); + if (!(evt = misdn_event_queue_pop(chan_priv->events))) { #ifdef MISDN_DEBUG_EVENTS ftdm_log_chan_msg(chan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n"); #endif - ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT); continue; } @@ -2051,6 +2226,7 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy) { struct misdn_span_private *span_priv = ftdm_span_io_private(span); + /* free resources */ ftdm_span_io_private(span) = NULL; ftdm_safe_free(span_priv); @@ -2060,10 +2236,397 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy) } +/** + * Called by misdn_span_run() to handle incoming b-channel events + * \param[in] chan FreeTDM channel object + * \return FTDM_SUCCESS on success, FTDM_* on error + */ +static ftdm_status_t handle_b_channel_event(ftdm_channel_t *chan) +{ + struct misdn_chan_private *priv = ftdm_chan_io_private(chan); + char buf[MAX_DATA_MEM] = { 0 }; + struct mISDNhead *mh = (void *)buf; + int retval; + + if ((retval = recvfrom(chan->sockfd, buf, sizeof(buf), 0, NULL, NULL)) <= 0) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to receive message: %s\n", + strerror(errno)); + return FTDM_FAIL; + } + + if (retval < MISDN_HEADER_LEN) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN message too short, min.: %d, read: %d\n", + (int)MISDN_HEADER_LEN, retval); + return FTDM_FAIL; + } + + switch (mh->prim) { + case PH_DATA_IND: { + int datalen = retval - MISDN_HEADER_LEN; + char *data = buf + MISDN_HEADER_LEN; + + /* Convert audio data */ + misdn_convert_audio_bits(data, datalen); + + /* Write audio into receive pipe */ + if ((retval = write(priv->rx_audio_pipe_in, data, datalen)) < 0) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into rx pipe: %s\n", + strerror(errno)); + return FTDM_FAIL; + } else if (retval < datalen) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN short write into rx pipe, written: %d, expected: %d\n", + retval, datalen); + return FTDM_FAIL; + } + + /* Get receive buffer usage */ + if (ioctl(priv->tx_audio_pipe_out, FIONREAD, &retval) < 0) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to get tx audio buffer usage: %s\n", + strerror(errno)); + return FTDM_FAIL; + } else if (retval < datalen) { +// ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN has not enough bytes in tx audio pipe, available: %d, requested: %d\n", +// retval, datalen); + priv->tx_pipe_under_cnt++; + return FTDM_SUCCESS; + } + +#ifdef MISDN_DEBUG_IO + ftdm_log_chan(chan, FTDM_LOG_INFO, "mISDN tx audio buffer usage: %d\n", + retval); +#endif + + /* Get audio from tx pipe */ + if ((retval = read(priv->tx_audio_pipe_out, data, datalen)) < 0) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to read audio data from tx pipe: %s\n", + strerror(errno)); + return FTDM_FAIL; + } else if (retval == 0) { + ftdm_log_chan_msg(chan, FTDM_LOG_NOTICE, "mISDN tx pipe is empty\n"); + priv->tx_pipe_under_cnt++; + return FTDM_SUCCESS; + } else if (retval < datalen) { + ftdm_log_chan(chan, FTDM_LOG_NOTICE, "mISDN short read from tx pipe, read: %d, expected: %d\n", + retval, datalen); + priv->tx_pipe_under_cnt++; + priv->tx_miss_bytes += ftdm_max(0, datalen - retval); + datalen = retval; + } + priv->tx_pipe_rd_bytes += retval; + + if (!priv->active) { + /* discard */ + return FTDM_SUCCESS; + } + + /* Convert audio data */ + misdn_convert_audio_bits(data, datalen); + + /* Write to channel */ + mh->prim = PH_DATA_REQ; + mh->id = 0; + datalen += MISDN_HEADER_LEN; + + if ((retval = write(chan->sockfd, buf, datalen)) < 0) { + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into b-channel: %s\n", + strerror(errno)); + return FTDM_FAIL; + } else if (retval < datalen) { + ftdm_log_chan(chan, FTDM_LOG_WARNING, "mISDN short write into b-channel, written: %d, expected: %d\n", + retval, datalen); + priv->tx_lost_bytes += ftdm_max(0, datalen - retval - MISDN_HEADER_LEN); + } + priv->tx_sent_bytes += ftdm_max(0, retval - MISDN_HEADER_LEN); + break; + } + case PH_DATA_CNF: + priv->tx_ack_cnt++; + break; + case PH_DEACTIVATE_IND: + priv->active = 0; + break; + case PH_ACTIVATE_IND: + priv->active = 1; + break; + default: + ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN received unknown/unhandled event primitive: (%d) %s\n", + mh->prim, misdn_event2str(mh->prim)); + break; + } + return FTDM_SUCCESS; +} + + +/** + * Timeout (miliseconds) for epoll_wait() + */ +#define MISDN_EPOLL_WAIT_MAX_MSEC 1000 + +/** + * mISDN I/O thread + * This thread handles all of the B-Channel I/O, this avoids all of the hazzles with + * intermixed data + control frames on mISDN sockets and the missing write poll support on B-Channels. + * + * Each channel uses a unix stream socketpair as a two-way, pipe replacement for incoming and outgoing + * data. Socketpairs allow a more fine grained tuning of the buffer sizes (pipe are restricted to multiples of + * the native page size (with the smallest possible size (4k) being already 500ms worth of audio). + * + * The socketpair buffer sizes and the send algorithm have been carefully tuned to: + * + * - Minimize the risk of sending too much data and making the mISDN drivers unhappy, by + * sending PH_DATA_REQ only when there is as much data available as we have received in + * the PH_DATA_IND. + * + * - Avoid deadlocks between ftdm_write() trying to fill an almust full socket buffer and + * the I/O thread not having enough data to send a PH_DATA_REQ message. + * (The write() call will return EAGAIN since there is not ehough space free to send all audio data.) + * + * \param thread FreeTDM thread handle + * \param data Private data pointer passed to ftdm_thread_create_detached() (the span object) + * \return Always returns NULL (unused) + * + * \note + * ftdm_span_start/_stop() locks the span mutex, + * use direct access to span members to avoid deadlocking + * + * \todo + * Move D-Channel handling into the I/O thread too. + * Use custom ring buffer structures instead of socketpairs + * (for even more fine grained size control). + */ +static void *misdn_span_run(ftdm_thread_t *thread, void *data) +{ + ftdm_span_t *span = data; + struct misdn_span_private *priv = ftdm_span_io_private(span); + struct epoll_event evh; + int epfd = -1; + int ret; + int i; + + ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread initializing\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + /* Use epoll for event handling */ + epfd = epoll_create(1); + if (epfd < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to create epoll context: %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno)); + goto error; + } + + ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding event pipe to epoll context\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + /* Add event pipe */ + evh.events = EPOLLIN | EPOLLPRI | EPOLLERR; + evh.data.fd = priv->event_pipe_out; + + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, priv->event_pipe_out, &evh); + if (ret < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add event pipe to epoll context: %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno)); + goto error; + } + + ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channels to epoll context\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + /* Add b-channels */ + for (i = 1; i <= span->chan_count; i++) { + ftdm_channel_t *chan = span->channels[i]; + ftdm_assert(chan, "channel == NULL"); + + if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B) + continue; + + ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channel [%d:%d] to epoll context\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), + ftdm_channel_get_id(chan), ftdm_channel_get_ph_id(chan)); + + evh.events = EPOLLIN | EPOLLPRI | EPOLLERR; + evh.data.ptr = chan; + + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, chan->sockfd, &evh); + if (ret < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add b-channel [%d] socket to epoll context: %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno)); + goto error; + } + } + + ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread started\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + /* Notify world we're running */ + priv->running = 1; + pthread_cond_signal(&priv->ctrl_cond); + + while (priv->running > 0) { + struct epoll_event ev[10]; + int timeout_ms = MISDN_EPOLL_WAIT_MAX_MSEC; + + ret = epoll_wait(epfd, ev, ftdm_array_len(ev), timeout_ms); + if (ret < 0) { + switch (errno) { + case EAGAIN: + case EINTR: + continue; + default: + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] epoll_wait() failed: %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno)); + goto error; + } + } + /* Check events */ + for (i = 0; i < ret; i++) { + /* */ + if (ev[i].data.fd == priv->event_pipe_out) { + struct misdn_command cmd; + /* event pipe */ + ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] event pipe notification\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + ret = read(priv->event_pipe_out, &cmd, sizeof(cmd)); + if (ret < sizeof(cmd)) { + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to read span thread command\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + continue; + } + + switch (cmd.type) { + case MISDN_CMD_STOP: + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got STOP command\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + priv->running = -1; + break; + default: + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got unknown command: %d\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), cmd.type); + } + + } else { + ftdm_channel_t *chan = ev[i].data.ptr; + handle_b_channel_event(chan); + } + } + } +error: + ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread stopped\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + /* Remove epoll event sources */ + for (i = 1; i <= span->chan_count; i++) { + ftdm_channel_t *chan = span->channels[i]; + ftdm_assert(chan, "channel == NULL"); + + if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B) + continue; + + ret = epoll_ctl(epfd, EPOLL_CTL_DEL, chan->sockfd, NULL); + if (ret < 0) { + ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to remove b-channel [%d] socket from epoll context: %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno)); + } + } + + /* Close epoll context */ + if (epfd >= 0) close(epfd); + + /* Notify world we stopped running */ + priv->running = 0; + pthread_cond_signal(&priv->ctrl_cond); + return NULL; +} + +/** + * Timeout (miliseconds) for span start/stop completion + */ +#define SPAN_DEFAULT_TIMEOUT_MSEC 10000 + +static FIO_SPAN_START_FUNCTION(misdn_span_start) +{ + struct misdn_span_private *span_priv = ftdm_span_io_private(span); + struct timespec timeout; + int retval; + + ftdm_log(FTDM_LOG_NOTICE, "mISDN starting span %d (%s)\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + span_priv->running = 0; + + if (ftdm_thread_create_detached(misdn_span_run, span) != FTDM_SUCCESS) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s)\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + return FTDM_FAIL; + } + + /* + * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to start up + */ + clock_gettime(CLOCK_REALTIME, &timeout); + ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC); + + pthread_mutex_lock(&span_priv->ctrl_cond_mutex); + + retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout); + if (retval == ETIMEDOUT) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s) in 10 seconds\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + return FTDM_FAIL; + } else if (retval) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s): %s\n", + ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno)); + return FTDM_FAIL; + } + + pthread_mutex_unlock(&span_priv->ctrl_cond_mutex); + return FTDM_SUCCESS; +} + +static FIO_SPAN_STOP_FUNCTION(misdn_span_stop) +{ + struct misdn_span_private *span_priv = ftdm_span_io_private(span); + struct timespec timeout; + struct misdn_command cmd; + int retval; + + ftdm_log(FTDM_LOG_NOTICE, "mISDN stopping span %d (%s)\n", + ftdm_span_get_id(span), ftdm_span_get_name(span)); + + span_priv->running = -1; + + /* Wake up thread */ + cmd.type = MISDN_CMD_STOP; + retval = write(span_priv->event_pipe_in, &cmd, sizeof(cmd)); + if (retval < sizeof(cmd)) { + ftdm_log(FTDM_LOG_WARNING, "mISDN failed to send STOP command to span thread\n"); + } + + /* + * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to shut down + */ + clock_gettime(CLOCK_REALTIME, &timeout); + ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC); + + pthread_mutex_lock(&span_priv->ctrl_cond_mutex); + + retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout); + if (retval == ETIMEDOUT) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread in 10 seconds\n"); + return FTDM_FAIL; + } else if (retval) { + ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread: %s\n", + strerror(errno)); + return FTDM_FAIL; + } + + pthread_mutex_unlock(&span_priv->ctrl_cond_mutex); + return FTDM_SUCCESS; +} + + /** * \brief ftmod_misdn interface */ -//static const ftdm_io_interface_t misdn_interface = { static const ftdm_io_interface_t misdn_interface = { .name = "misdn", @@ -2080,8 +2643,12 @@ static const ftdm_io_interface_t misdn_interface = { .get_alarms = misdn_get_alarms, .configure = misdn_configure, /* configure global parameters */ .configure_span = misdn_configure_span, /* assign channels to span */ + .channel_next_event = misdn_channel_next_event, .channel_destroy = misdn_channel_destroy, /* clean up channel */ .span_destroy = misdn_span_destroy, /* clean up span */ + + .span_start = misdn_span_start, + .span_stop = misdn_span_stop, };