diff --git a/src/switch_core_media_bug.c b/src/switch_core_media_bug.c index 08f2ee30d7..a8af7d8f97 100644 --- a/src/switch_core_media_bug.c +++ b/src/switch_core_media_bug.c @@ -185,9 +185,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b uint32_t blen; switch_codec_implementation_t read_impl = { 0 }; int16_t *tp; - switch_size_t do_read = 0, do_write = 0; - int fill_read = 0, fill_write = 0; - + switch_size_t do_read = 0, do_write = 0, has_read = 0, has_write = 0, fill_read = 0, fill_write = 0; switch_core_session_get_read_impl(bug->session, &read_impl); @@ -213,50 +211,47 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b frame->datalen = 0; if (switch_test_flag(bug, SMBF_READ_STREAM)) { + has_read = 1; switch_mutex_lock(bug->read_mutex); do_read = switch_buffer_inuse(bug->raw_read_buffer); switch_mutex_unlock(bug->read_mutex); } if (switch_test_flag(bug, SMBF_WRITE_STREAM)) { + has_write = 1; switch_mutex_lock(bug->write_mutex); do_write = switch_buffer_inuse(bug->raw_write_buffer); switch_mutex_unlock(bug->write_mutex); } + if (bug->record_frame_size && bug->record_pre_buffer_max && (do_read || do_write) && bug->record_pre_buffer_count < bug->record_pre_buffer_max) { bug->record_pre_buffer_count++; return SWITCH_STATUS_FALSE; } else { uint32_t frame_size; switch_codec_implementation_t read_impl = { 0 }; - //switch_codec_implementation_t other_read_impl = { 0 }; - //switch_core_session_t *other_session; - + switch_core_session_get_read_impl(bug->session, &read_impl); frame_size = read_impl.decoded_bytes_per_packet; bug->record_frame_size = frame_size; - -#if 0 - if (do_read && do_write) { - if (switch_core_session_get_partner(bug->session, &other_session) == SWITCH_STATUS_SUCCESS) { - switch_core_session_get_read_impl(other_session, &other_read_impl); - switch_core_session_rwunlock(other_session); + } - if (read_impl.actual_samples_per_second == other_read_impl.actual_samples_per_second) { - if (read_impl.decoded_bytes_per_packet < other_read_impl.decoded_bytes_per_packet) { - frame_size = read_impl.decoded_bytes_per_packet; - } - } else { - if (read_impl.decoded_bytes_per_packet > other_read_impl.decoded_bytes_per_packet) { - frame_size = read_impl.decoded_bytes_per_packet; - } - } - } + if (bug->record_frame_size && do_write > do_read && do_write > (bug->record_frame_size * 2)) { + switch_mutex_lock(bug->write_mutex); + switch_buffer_toss(bug->raw_write_buffer, bug->record_frame_size); + do_write = switch_buffer_inuse(bug->raw_write_buffer); + switch_mutex_unlock(bug->write_mutex); + } - bug->record_frame_size = bytes = frame_size; - } -#endif + + + if ((has_read && !do_read)) { + fill_read = 1; + } + + if ((has_write && !do_write)) { + fill_write = 1; } @@ -274,10 +269,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b } } - fill_read = !do_read; - fill_write = !do_write; - - if ((fill_read && fill_write) || (!fill && fill_read)) { + if ((fill_read && fill_write) || (fill && (fill_read || fill_write))) { return SWITCH_STATUS_FALSE; } @@ -384,17 +376,26 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b frame->rate = read_impl.actual_samples_per_second; frame->codec = NULL; - if (fill_read && fill_write) { - return SWITCH_STATUS_BREAK; - } - - if (fill_read || fill_write) { - return SWITCH_STATUS_BREAK; + if (switch_test_flag(bug, SMBF_STEREO)) { + frame->datalen *= 2; + frame->channels = 2; } memcpy(bug->session->recur_buffer, frame->data, frame->datalen); bug->session->recur_buffer_len = frame->datalen; - + + if (has_read) { + switch_mutex_lock(bug->read_mutex); + do_read = switch_buffer_inuse(bug->raw_read_buffer); + switch_mutex_unlock(bug->read_mutex); + } + + if (has_write) { + switch_mutex_lock(bug->write_mutex); + do_write = switch_buffer_inuse(bug->raw_write_buffer); + switch_mutex_unlock(bug->write_mutex); + } + return SWITCH_STATUS_SUCCESS; } diff --git a/src/switch_ivr_async.c b/src/switch_ivr_async.c index 25ec4d7285..21180d789f 100644 --- a/src/switch_ivr_async.c +++ b/src/switch_ivr_async.c @@ -1044,6 +1044,10 @@ struct record_helper { switch_bool_t hangup_on_error; switch_codec_implementation_t read_impl; switch_bool_t speech_detected; + switch_buffer_t *thread_buffer; + switch_thread_t *thread; + switch_mutex_t *buffer_mutex; + int thread_ready; const char *completion_cause; }; @@ -1110,6 +1114,55 @@ static void send_record_stop_event(switch_channel_t *channel, switch_codec_imple } } +static void *SWITCH_THREAD_FUNC recording_thread(switch_thread_t *thread, void *obj) +{ + switch_media_bug_t *bug = (switch_media_bug_t *) obj; + switch_core_session_t *session = switch_core_media_bug_get_session(bug); + switch_channel_t *channel = switch_core_session_get_channel(session); + struct record_helper *rh; + switch_size_t bsize = SWITCH_RECOMMENDED_BUFFER_SIZE, samples = 0, inuse = 0; + unsigned char *data = switch_core_session_alloc(session, bsize); + int channels = switch_core_media_bug_test_flag(bug, SMBF_STEREO) ? 2 : 1; + + if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { + return NULL; + } + + rh = switch_core_media_bug_get_user_data(bug); + switch_buffer_create_dynamic(&rh->thread_buffer, 1024 * 512, 1024 * 64, 0); + rh->thread_ready = 1; + + while(switch_test_flag(rh->fh, SWITCH_FILE_OPEN)) { + switch_mutex_lock(rh->buffer_mutex); + inuse = switch_buffer_inuse(rh->thread_buffer); + + if (rh->thread_ready && switch_channel_up_nosig(channel) && inuse < bsize) { + switch_mutex_unlock(rh->buffer_mutex); + switch_yield(20000); + continue; + } else if ((!rh->thread_ready || switch_channel_down_nosig(channel)) && !inuse) { + break; + } + + samples = switch_buffer_read(rh->thread_buffer, data, bsize) / 2 / channels; + switch_mutex_unlock(rh->buffer_mutex); + + if (switch_core_file_write(rh->fh, data, &samples) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file); + /* File write failed */ + set_completion_cause(rh, "uri-failure"); + if (rh->hangup_on_error) { + switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); + switch_core_session_reset(session, SWITCH_TRUE, SWITCH_TRUE); + } + } + } + + switch_core_session_rwunlock(session); + + return NULL; +} + static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type) { switch_core_session_t *session = switch_core_media_bug_get_session(bug); @@ -1123,18 +1176,40 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s switch (type) { case SWITCH_ABC_TYPE_INIT: - if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file); - switch_channel_event_set_data(channel, event); - switch_event_fire(&event); + { + const char *var = switch_channel_get_variable(channel, "RECORD_USE_THREAD"); + + if (zstr(var) || switch_true(var)) { + switch_threadattr_t *thd_attr = NULL; + switch_memory_pool_t *pool = switch_core_session_get_pool(session); + int sanity = 200; + + + switch_core_session_get_read_impl(session, &rh->read_impl); + switch_mutex_init(&rh->buffer_mutex, SWITCH_MUTEX_NESTED, pool); + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&rh->thread, thd_attr, recording_thread, bug, pool); + + while(--sanity > 0 && !rh->thread_ready) { + switch_yield(10000); + } + } + + + if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file); + switch_channel_event_set_data(channel, event); + switch_event_fire(&event); + } + + rh->silence_time = switch_micro_time_now(); + rh->silence_timeout_ms = rh->initial_timeout_ms; + rh->speech_detected = SWITCH_FALSE; + rh->completion_cause = NULL; + + switch_core_session_get_read_impl(session, &rh->read_impl); } - rh->silence_time = switch_micro_time_now(); - rh->silence_timeout_ms = rh->initial_timeout_ms; - rh->speech_detected = SWITCH_FALSE; - rh->completion_cause = NULL; - - switch_core_session_get_read_impl(session, &rh->read_impl); - break; case SWITCH_ABC_TYPE_TAP_NATIVE_READ: { @@ -1232,6 +1307,18 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t frame = { 0 }; + if (rh->thread_ready) { + switch_status_t st; + + rh->thread_ready = 0; + switch_thread_join(&st, rh->thread); + } + + if (rh->thread_buffer) { + switch_buffer_destroy(&rh->thread_buffer); + } + + frame.data = data; frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; @@ -1313,18 +1400,24 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t frame = { 0 }; switch_status_t status; + int i = 0; frame.data = data; frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; for (;;) { - status = switch_core_media_bug_read(bug, &frame, SWITCH_FALSE); + status = switch_core_media_bug_read(bug, &frame, i++ == 0 ? SWITCH_FALSE : SWITCH_TRUE); - if (status == SWITCH_STATUS_SUCCESS || status == SWITCH_STATUS_BREAK) { - - len = (switch_size_t) frame.datalen / 2; - - if (len && switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) { + if (status != SWITCH_STATUS_SUCCESS || !frame.datalen) { + break; + } else { + len = (switch_size_t) frame.datalen / 2 / frame.channels; + + if (rh->thread_buffer) { + switch_mutex_lock(rh->buffer_mutex); + switch_buffer_write(rh->thread_buffer, mask ? null_data : data, frame.datalen); + switch_mutex_unlock(rh->buffer_mutex); + } else if (switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file); /* File write failed */ set_completion_cause(rh, "uri-failure"); @@ -1377,8 +1470,6 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s rh->speech_detected = SWITCH_TRUE; } } - } else { - break; } } } @@ -2269,7 +2360,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_record_session(switch_core_session_t } rh->hangup_on_error = hangup_on_error; - + if ((status = switch_core_media_bug_add(session, "session_record", file, record_callback, rh, to, flags, &bug)) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error adding media bug for file %s\n", file); diff --git a/src/switch_time.c b/src/switch_time.c index f710105031..c226b90592 100644 --- a/src/switch_time.c +++ b/src/switch_time.c @@ -1046,6 +1046,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime) tfd = -1; } } + + if (tfd > -1) MATRIX = 0; } #else tfd = -1;