mod_portaudio: create the actual shared stream

This commit is contained in:
Moises Silva 2011-03-19 19:23:11 -04:00
parent e4b24e841c
commit 877b4cf53b
2 changed files with 269 additions and 62 deletions

View File

@ -47,6 +47,7 @@
#define SWITCH_PA_CALL_ID_VARIABLE "pa_call_id"
#define MIN_STREAM_SAMPLE_RATE 8000
#define STREAM_SAMPLES_PER_PACKET(stream) ((stream->codec_ms * stream->sample_rate) / 1000)
SWITCH_MODULE_LOAD_FUNCTION(mod_portaudio_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_portaudio_shutdown);
@ -82,24 +83,6 @@ typedef enum {
TFLAG_AUTO_ANSWER = (1 << 10)
} TFLAGS;
struct private_object {
unsigned int flags;
switch_core_session_t *session;
switch_caller_profile_t *caller_profile;
char call_id[50];
int sample_rate;
int codec_ms;
switch_mutex_t *flag_mutex;
char *hold_file;
switch_file_handle_t fh;
switch_file_handle_t *hfh;
switch_frame_t hold_frame;
unsigned char holdbuf[SWITCH_RECOMMENDED_BUFFER_SIZE];
struct private_object *next;
};
typedef struct private_object private_t;
struct audio_stream {
int indev;
int outdev;
@ -119,16 +102,21 @@ typedef struct _shared_audio_stream_t {
int codec_ms;
/*! The PA input device */
int indev;
/*! Input channels being used */
uint8_t inchan_used[MAX_IO_CHANNELS];
/*! The PA output device */
int outdev;
/*! Output channels being used */
uint8_t outchan_used[MAX_IO_CHANNELS];
/*! How many channels to create (for both indev and outdev) */
int channels;
/*! The io stream helper to buffer audio */
PABLIO_Stream *stream;
/*! How often to write */
switch_timer_t write_timer;
/* It can be shared after all :-) */
switch_mutex_t *mutex;
} shared_audio_stream_t;
typedef struct private_object private_t;
/* Endpoint that can be called via portaudio/endpoint/<endpoint-name> */
typedef struct _audio_endpoint {
/*! Friendly name for this endpoint */
@ -145,8 +133,35 @@ typedef struct _audio_endpoint {
/*! Channel index within the output stream where we get the audio for this endpoint */
int outchan;
/*! Associated private information if involved in a call */
private_t *master;
/*! For timed writes */
switch_timer_t write_timer;
/*! Let's be safe */
switch_mutex_t *mutex;
} audio_endpoint_t;
struct private_object {
unsigned int flags;
switch_core_session_t *session;
switch_caller_profile_t *caller_profile;
char call_id[50];
int sample_rate;
int codec_ms;
switch_mutex_t *flag_mutex;
char *hold_file;
switch_file_handle_t fh;
switch_file_handle_t *hfh;
switch_frame_t hold_frame;
unsigned char holdbuf[SWITCH_RECOMMENDED_BUFFER_SIZE];
audio_endpoint_t *audio_endpoint;
struct private_object *next;
};
static struct {
int debug;
int port;
@ -434,6 +449,7 @@ static audio_stream_t* find_audio_stream(int indev, int outdev, int already_lock
}
return NULL;
}
static void destroy_audio_streams()
{
int close_wait = 4;
@ -447,6 +463,7 @@ static void destroy_audio_streams()
}
globals.destroying_streams = 0;
}
static switch_status_t validate_main_audio_stream()
{
if (globals.read_timer.timer_interface) {
@ -949,6 +966,80 @@ switch_io_routines_t portaudio_io_routines = {
/*.receive_message */ channel_receive_message
};
static int create_shared_audio_stream(shared_audio_stream_t *stream);
static int destroy_shared_audio_stream(shared_audio_stream_t *stream);
static int take_stream_channel(shared_audio_stream_t *stream, int index, int input)
{
int rc = 0;
if (!stream) {
return rc;
}
switch_mutex_lock(stream->mutex);
if (!stream->stream && create_shared_audio_stream(stream)) {
rc = -1;
goto done;
}
if (input) {
if (stream->inchan_used[index]) {
rc = -1;
goto done;
}
stream->inchan_used[index] = 1;
} else {
if (!input && stream->outchan_used[index]) {
rc = -1;
goto done;
}
stream->outchan_used[index] = 1;
}
done:
switch_mutex_unlock(stream->mutex);
return rc;
}
static int release_stream_channel(shared_audio_stream_t *stream, int index, int input)
{
int i = 0;
int destroy_stream = 1;
int rc = 0;
if (!stream) {
return rc;
}
switch_mutex_lock(stream->mutex);
if (input) {
if (stream->inchan_used[index]) {
rc = -1;
goto done;
}
stream->inchan_used[index] = 1;
} else {
if (!input && stream->outchan_used[index]) {
rc = -1;
goto done;
}
stream->outchan_used[index] = 1;
}
for (i = 0; i < stream->channels; i++) {
if (stream->inchan_used[i] || stream->outchan_used[i]) {
destroy_stream = 0;
}
}
if (destroy_stream) {
destroy_shared_audio_stream(stream);
}
done:
switch_mutex_unlock(stream->mutex);
return rc;
}
/* Make sure when you have 2 sessions in the same scope that you pass the appropriate one to the routines
that allocate memory or you will have 1 channel with memory allocated from another channel's pool!
*/
@ -957,51 +1048,93 @@ static switch_call_cause_t channel_outgoing_channel(switch_core_session_t *sessi
switch_core_session_t **new_session, switch_memory_pool_t **pool, switch_originate_flag_t flags,
switch_call_cause_t *cancel_cause)
{
char name[128];
const char *id = NULL;
private_t *tech_pvt = NULL;
switch_channel_t *channel = NULL;
switch_caller_profile_t *caller_profile = NULL;
switch_call_cause_t retcause = SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
if ((*new_session = switch_core_session_request(portaudio_endpoint_interface, SWITCH_CALL_DIRECTION_OUTBOUND, flags, pool)) != 0) {
private_t *tech_pvt;
switch_channel_t *channel;
switch_caller_profile_t *caller_profile;
switch_core_session_add_stream(*new_session, NULL);
if ((tech_pvt = (private_t *) switch_core_session_alloc(*new_session, sizeof(private_t))) != 0) {
memset(tech_pvt, 0, sizeof(*tech_pvt));
switch_mutex_init(&tech_pvt->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(*new_session));
channel = switch_core_session_get_channel(*new_session);
switch_core_session_set_private(*new_session, tech_pvt);
tech_pvt->session = *new_session;
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*new_session), SWITCH_LOG_CRIT, "Hey where is my memory pool?\n");
switch_core_session_destroy(new_session);
return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
}
if (outbound_profile) {
char name[128];
const char *id = !zstr(outbound_profile->caller_id_number) ? outbound_profile->caller_id_number : "na";
switch_snprintf(name, sizeof(name), "portaudio/%s", id);
switch_channel_set_name(channel, name);
caller_profile = switch_caller_profile_clone(*new_session, outbound_profile);
switch_channel_set_caller_profile(channel, caller_profile);
tech_pvt->caller_profile = caller_profile;
if (outbound_profile->destination_number && !strcasecmp(outbound_profile->destination_number, "auto_answer")) {
switch_set_flag(tech_pvt, TFLAG_AUTO_ANSWER);
}
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*new_session), SWITCH_LOG_ERROR, "Doh! no caller profile\n");
switch_core_session_destroy(new_session);
return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
}
switch_set_flag_locked(tech_pvt, TFLAG_OUTBOUND);
switch_channel_set_state(channel, CS_INIT);
return SWITCH_CAUSE_SUCCESS;
if (!outbound_profile) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing caller profile\n");
return retcause;
}
return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
if (!(*new_session = switch_core_session_request(portaudio_endpoint_interface, SWITCH_CALL_DIRECTION_OUTBOUND, flags, pool))) {
return retcause;
}
switch_core_session_add_stream(*new_session, NULL);
if ((tech_pvt = (private_t *) switch_core_session_alloc(*new_session, sizeof(private_t))) != 0) {
memset(tech_pvt, 0, sizeof(*tech_pvt));
switch_mutex_init(&tech_pvt->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(*new_session));
channel = switch_core_session_get_channel(*new_session);
switch_core_session_set_private(*new_session, tech_pvt);
tech_pvt->session = *new_session;
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*new_session), SWITCH_LOG_CRIT, "Hey where is my memory pool?\n");
switch_core_session_destroy(new_session);
return retcause;
}
if (outbound_profile->destination_number && !strncasecmp(outbound_profile->destination_number, "endpoint", sizeof("endpoint"))) {
audio_endpoint_t *endpoint = NULL;
char *endpoint_name = switch_core_strdup(outbound_profile->pool, outbound_profile->destination_number);
endpoint_name = strchr(endpoint_name, '/');
if (!endpoint_name) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*new_session), SWITCH_LOG_CRIT, "No portaudio endpoint specified\n");
goto error;
}
endpoint = switch_core_hash_find(globals.endpoints, endpoint_name);
if (!endpoint) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*new_session), SWITCH_LOG_CRIT, "Invalid portaudio endpoint %s\n", endpoint_name);
goto error;
}
/* check that there is no call there yet */
switch_mutex_lock(endpoint->mutex);
if (endpoint->master) {
switch_mutex_unlock(endpoint->mutex);
retcause = SWITCH_CAUSE_USER_BUSY;
goto error;
}
/* try to acquire the stream */
if (take_stream_channel(endpoint->in_stream, endpoint->inchan, 1)) {
switch_mutex_unlock(endpoint->mutex);
retcause = SWITCH_CAUSE_USER_BUSY;
goto error;
}
if (take_stream_channel(endpoint->out_stream, endpoint->outchan, 0)) {
release_stream_channel(endpoint->in_stream, endpoint->inchan, 1);
switch_mutex_unlock(endpoint->mutex);
retcause = SWITCH_CAUSE_USER_BUSY;
goto error;
}
switch_snprintf(name, sizeof(name), "portaudio/endpoint-%s", endpoint_name);
switch_set_flag(tech_pvt, TFLAG_AUTO_ANSWER);
endpoint->master = tech_pvt;
tech_pvt->audio_endpoint = endpoint;
switch_mutex_unlock(endpoint->mutex);
} else {
id = !zstr(outbound_profile->caller_id_number) ? outbound_profile->caller_id_number : "na";
switch_snprintf(name, sizeof(name), "portaudio/%s", id);
if (outbound_profile->destination_number && !strcasecmp(outbound_profile->destination_number, "auto_answer")) {
switch_set_flag(tech_pvt, TFLAG_AUTO_ANSWER);
}
}
switch_channel_set_name(channel, name);
caller_profile = switch_caller_profile_clone(*new_session, outbound_profile);
switch_channel_set_caller_profile(channel, caller_profile);
tech_pvt->caller_profile = caller_profile;
switch_set_flag_locked(tech_pvt, TFLAG_OUTBOUND);
switch_channel_set_state(channel, CS_INIT);
return SWITCH_CAUSE_SUCCESS;
error:
if (new_session && *new_session) {
switch_core_session_destroy(new_session);
}
return retcause;
}
@ -1146,6 +1279,7 @@ static switch_status_t load_streams(switch_xml_t streams)
if (!stream) {
continue;
}
switch_mutex_init(&stream->mutex, SWITCH_MUTEX_NESTED, module_pool);
stream->indev = -1;
stream->outdev = -1;
stream->sample_rate = globals.sample_rate;
@ -1250,6 +1384,14 @@ static shared_audio_stream_t *check_stream(char *streamstr, int check_input, int
if (cnum < 0 || cnum > stream->channels) {
return NULL;
}
if (check_input && stream->indev < 0) {
return NULL;
}
if (!check_input && stream->outdev < 0) {
return NULL;
}
*chanindex = cnum;
@ -1282,6 +1424,7 @@ static switch_status_t load_endpoints(switch_xml_t endpoints)
if (!endpoint) {
continue;
}
switch_mutex_init(&endpoint->mutex, SWITCH_MUTEX_NESTED, module_pool);
endpoint->inchan = -1;
endpoint->outchan = -1;
switch_snprintf(endpoint->name, sizeof(endpoint->name), "%s", endpoint_name);
@ -1983,6 +2126,7 @@ static switch_status_t switch_audio_stream()
return SWITCH_STATUS_SUCCESS;
}
PaError open_audio_stream(PABLIO_Stream **stream, const PaStreamParameters * inputParameters, const PaStreamParameters * outputParameters)
{
if (inputParameters->device != -1) {
@ -1991,6 +2135,68 @@ PaError open_audio_stream(PABLIO_Stream **stream, const PaStreamParameters * inp
return OpenAudioStream(stream, NULL, outputParameters, globals.sample_rate, paClipOff, globals.read_codec.implementation->samples_per_packet, 0);
}
PaError open_shared_audio_stream(shared_audio_stream_t *shstream, const PaStreamParameters * inputParameters, const PaStreamParameters * outputParameters)
{
PaError err;
if (inputParameters->device != -1) {
err = OpenAudioStream(&shstream->stream, inputParameters, outputParameters, shstream->sample_rate,
paClipOff, STREAM_SAMPLES_PER_PACKET(shstream), globals.dual_streams);
} else {
err = OpenAudioStream(&shstream->stream, NULL, outputParameters, shstream->sample_rate,
paClipOff, STREAM_SAMPLES_PER_PACKET(shstream), 0);
}
if (err != paNoError) {
shstream->stream = NULL;
}
return err;
}
static int create_shared_audio_stream(shared_audio_stream_t *shstream)
{
PaStreamParameters inputParameters, outputParameters;
PaError err;
switch_event_t *event;
inputParameters.device = shstream->indev;
if (shstream->indev != -1) {
inputParameters.channelCount = shstream->channels;
inputParameters.sampleFormat = SAMPLE_TYPE;
inputParameters.suggestedLatency = Pa_GetDeviceInfo(inputParameters.device)->defaultLowInputLatency;
inputParameters.hostApiSpecificStreamInfo = NULL;
}
outputParameters.device = shstream->outdev;
outputParameters.channelCount = shstream->channels;
outputParameters.sampleFormat = SAMPLE_TYPE;
outputParameters.suggestedLatency = Pa_GetDeviceInfo(outputParameters.device)->defaultLowOutputLatency;
outputParameters.hostApiSpecificStreamInfo = NULL;
err = open_shared_audio_stream(shstream, &inputParameters, &outputParameters);
if (err != paNoError) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening audio device retrying\n");
switch_yield(1000000);
err = open_shared_audio_stream(shstream, &inputParameters, &outputParameters);
}
if (err != paNoError) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't open audio device\n");
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, MY_EVENT_ERROR_AUDIO_DEV) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Reason", Pa_GetErrorText(err));
switch_event_fire(&event);
}
return -1;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created audio stream: %d channels %d\n",
shstream->sample_rate, shstream->channels);
return 0;
}
static int destroy_shared_audio_stream(shared_audio_stream_t *shstream)
{
CloseAudioStream(shstream->stream);
shstream->stream = NULL;
return 0;
}
static audio_stream_t *create_audio_stream(int indev, int outdev)
{
PaStreamParameters inputParameters, outputParameters;

View File

@ -411,3 +411,4 @@ PaError CloseAudioStream(PABLIO_Stream * aStream)
return paNoError;
}