add mutex to deal with small race

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14769 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2009-09-04 18:34:52 +00:00
parent 107f913598
commit 53f09acbec
1 changed files with 86 additions and 64 deletions

View File

@ -313,9 +313,10 @@ struct conference_member {
uint32_t score;
uint32_t score_iir;
switch_mutex_t *flag_mutex;
switch_mutex_t *control_mutex;
switch_mutex_t *write_mutex;
switch_mutex_t *audio_in_mutex;
switch_mutex_t *audio_out_mutex;
switch_mutex_t *read_mutex;
switch_codec_implementation_t orig_read_impl;
switch_codec_t read_codec;
switch_codec_t write_codec;
@ -419,6 +420,10 @@ static switch_status_t conf_api_sub_undeaf(conference_member_t *member, switch_s
static switch_status_t conference_add_event_data(conference_obj_t *conference, switch_event_t *event);
static switch_status_t conference_add_event_member_data(conference_member_t *member, switch_event_t *event);
#define lock_member(_member) switch_mutex_lock(_member->write_mutex); switch_mutex_lock(_member->read_mutex)
#define unlock_member(_member) switch_mutex_unlock(_member->read_mutex); switch_mutex_unlock(_member->write_mutex)
static switch_status_t conference_add_event_data(conference_obj_t *conference, switch_event_t *event)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
@ -478,8 +483,8 @@ static conference_relationship_t *member_get_relationship(conference_member_t *m
if (member == NULL || other_member == NULL || member->relationships == NULL)
return NULL;
switch_mutex_lock(member->control_mutex);
switch_mutex_lock(other_member->control_mutex);
lock_member(member);
lock_member(other_member);
for (rel = member->relationships; rel; rel = rel->next) {
if (rel->id == other_member->id) {
@ -492,8 +497,8 @@ static conference_relationship_t *member_get_relationship(conference_member_t *m
}
}
switch_mutex_unlock(other_member->control_mutex);
switch_mutex_unlock(member->control_mutex);
unlock_member(other_member);
unlock_member(member);
return rel ? rel : global;
}
@ -557,10 +562,10 @@ static conference_relationship_t *member_add_relationship(conference_member_t *m
rel->id = id;
switch_mutex_lock(member->control_mutex);
lock_member(member);
rel->next = member->relationships;
member->relationships = rel;
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
return rel;
}
@ -574,7 +579,7 @@ static switch_status_t member_del_relationship(conference_member_t *member, uint
if (member == NULL || id == 0)
return status;
switch_mutex_lock(member->control_mutex);
lock_member(member);
for (rel = member->relationships; rel; rel = rel->next) {
if (rel->id == id) {
/* we just forget about rel here cos it was allocated by the member's pool
@ -588,7 +593,7 @@ static switch_status_t member_del_relationship(conference_member_t *member, uint
}
last = rel;
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
return status;
}
@ -608,7 +613,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
switch_mutex_lock(conference->mutex);
switch_mutex_lock(member->audio_in_mutex);
switch_mutex_lock(member->audio_out_mutex);
switch_mutex_lock(member->control_mutex);
lock_member(member);
switch_mutex_lock(conference->member_mutex);
switch_clear_flag(conference, CFLAG_DESTRUCT);
@ -703,7 +708,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
}
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex);
@ -725,18 +730,18 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe
switch_assert(conference != NULL);
switch_assert(member != NULL);
switch_mutex_lock(member->control_mutex);
lock_member(member);
member_fnode = member->fnode;
member_sh = member->sh;
member->fnode = NULL;
member->sh = NULL;
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_mutex_lock(conference->mutex);
switch_mutex_lock(conference->member_mutex);
switch_mutex_lock(member->audio_in_mutex);
switch_mutex_lock(member->audio_out_mutex);
switch_mutex_lock(member->control_mutex);
lock_member(member);
switch_clear_flag(member, MFLAG_INTREE);
for (imember = conference->members; imember; imember = imember->next) {
@ -832,7 +837,7 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe
}
}
switch_mutex_unlock(conference->member_mutex);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex);
switch_mutex_unlock(conference->mutex);
@ -1407,7 +1412,7 @@ static void conference_loop_fn_energy_up(conference_member_t *member, caller_con
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->energy_level += 200;
if (member->energy_level > 3000) {
member->energy_level = 3000;
@ -1420,7 +1425,7 @@ static void conference_loop_fn_energy_up(conference_member_t *member, caller_con
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level);
conference_member_say(member, msg, 0);
@ -1434,7 +1439,7 @@ static void conference_loop_fn_energy_equ_conf(conference_member_t *member, call
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->energy_level = member->conference->energy_level;
if (test_eflag(member->conference, EFLAG_ENERGY_LEVEL) &&
@ -1444,7 +1449,7 @@ static void conference_loop_fn_energy_equ_conf(conference_member_t *member, call
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level);
conference_member_say(member, msg, 0);
@ -1458,7 +1463,7 @@ static void conference_loop_fn_energy_dn(conference_member_t *member, caller_con
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->energy_level -= 100;
if (member->energy_level < 0) {
member->energy_level = 0;
@ -1471,7 +1476,7 @@ static void conference_loop_fn_energy_dn(conference_member_t *member, caller_con
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->energy_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Energy level %d", member->energy_level);
conference_member_say(member, msg, 0);
@ -1485,7 +1490,7 @@ static void conference_loop_fn_volume_talk_up(conference_member_t *member, calle
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_out_level++;
switch_normalize_volume(member->volume_out_level);
@ -1496,7 +1501,7 @@ static void conference_loop_fn_volume_talk_up(conference_member_t *member, calle
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level);
conference_member_say(member, msg, 0);
@ -1510,7 +1515,7 @@ static void conference_loop_fn_volume_talk_zero(conference_member_t *member, cal
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_out_level = 0;
if (test_eflag(member->conference, EFLAG_VOLUME_LEVEL) &&
@ -1520,7 +1525,7 @@ static void conference_loop_fn_volume_talk_zero(conference_member_t *member, cal
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level);
conference_member_say(member, msg, 0);
@ -1534,7 +1539,7 @@ static void conference_loop_fn_volume_talk_dn(conference_member_t *member, calle
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_out_level--;
switch_normalize_volume(member->volume_out_level);
@ -1545,7 +1550,7 @@ static void conference_loop_fn_volume_talk_dn(conference_member_t *member, calle
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_out_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Volume level %d", member->volume_out_level);
conference_member_say(member, msg, 0);
@ -1559,7 +1564,7 @@ static void conference_loop_fn_volume_listen_up(conference_member_t *member, cal
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_in_level++;
switch_normalize_volume(member->volume_in_level);
@ -1570,7 +1575,7 @@ static void conference_loop_fn_volume_listen_up(conference_member_t *member, cal
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level);
conference_member_say(member, msg, 0);
@ -1584,7 +1589,7 @@ static void conference_loop_fn_volume_listen_zero(conference_member_t *member, c
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_in_level = 0;
if (test_eflag(member->conference, EFLAG_GAIN_LEVEL) &&
@ -1594,7 +1599,7 @@ static void conference_loop_fn_volume_listen_zero(conference_member_t *member, c
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level);
conference_member_say(member, msg, 0);
@ -1608,7 +1613,7 @@ static void conference_loop_fn_volume_listen_dn(conference_member_t *member, cal
if (member == NULL)
return;
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_in_level--;
switch_normalize_volume(member->volume_in_level);
@ -1619,7 +1624,7 @@ static void conference_loop_fn_volume_listen_dn(conference_member_t *member, cal
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "New-Level", "%d", member->volume_in_level);
switch_event_fire(&event);
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
switch_snprintf(msg, sizeof(msg), "Gain level %d", member->volume_in_level);
conference_member_say(member, msg, 0);
@ -1766,9 +1771,11 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v
while (switch_test_flag(member, MFLAG_RUNNING) && switch_channel_ready(channel)) {
if (switch_channel_test_app_flag(channel, CF_APP_TAGGED)) {
switch_mutex_lock(member->read_mutex);
if (switch_channel_ready(channel) && switch_channel_test_app_flag(channel, CF_APP_TAGGED)) {
switch_yield(100000);
continue;
goto do_continue;
}
@ -1777,7 +1784,7 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v
/* end the loop, if appropriate */
if (!SWITCH_READ_ACCEPTABLE(status) || !switch_test_flag(member, MFLAG_RUNNING)) {
break;
goto do_break;
}
if (switch_test_flag(read_frame, SFF_CNG)) {
@ -1798,7 +1805,7 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v
}
}
}
continue;
goto do_continue;
}
energy_level = member->energy_level;
@ -1940,12 +1947,21 @@ static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, v
ok = switch_buffer_write(member->audio_buffer, data, datalen);
switch_mutex_unlock(member->audio_in_mutex);
if (!ok) {
break;
goto do_break;
}
}
}
do_continue:
switch_mutex_unlock(member->read_mutex);
}
do_break:
switch_mutex_unlock(member->read_mutex);
switch_resample_destroy(&member->read_resampler);
switch_clear_flag_locked(member, MFLAG_ITHREAD);
@ -2123,12 +2139,14 @@ static void conference_loop_output(conference_member_t *member)
switch_size_t file_sample_len = csamples;
switch_size_t file_data_len = file_sample_len * 2;
switch_mutex_lock(member->write_mutex);
if (switch_test_flag(member, MFLAG_RESTART)) {
switch_mutex_unlock(member->write_mutex);
goto top;
}
switch_mutex_lock(member->control_mutex);
if (switch_core_session_dequeue_event(member->session, &event, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS) {
if (event->event_id == SWITCH_EVENT_MESSAGE) {
char *from = switch_event_get_header(event, "from");
@ -2330,7 +2348,7 @@ static void conference_loop_output(conference_member_t *member)
switch_clear_flag_locked(member, MFLAG_FLUSH_BUFFER);
}
switch_mutex_unlock(member->control_mutex);
switch_mutex_unlock(member->write_mutex);
if (switch_core_session_private_event_count(member->session)) {
@ -2347,7 +2365,8 @@ static void conference_loop_output(conference_member_t *member)
switch_cond_next();
}
} /* Rinse ... Repeat */
} /* Rinse ... Repeat */
if (member->digit_stream != NULL) {
switch_ivr_digit_stream_destroy(&member->digit_stream);
@ -2415,10 +2434,11 @@ static void *SWITCH_THREAD_FUNC conference_record_thread_run(switch_thread_t *th
member->mux_frame = switch_core_alloc(member->pool, member->frame_size);
switch_mutex_init(&member->control_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->write_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->flag_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->audio_in_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->audio_out_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->read_mutex, SWITCH_MUTEX_NESTED, rec->pool);
/* Setup an audio buffer for the incoming audio */
if (switch_buffer_create_dynamic(&member->audio_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) {
@ -2575,7 +2595,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st
if (member == NULL)
return count;
switch_mutex_lock(member->control_mutex);
lock_member(member);
if (stop == FILE_STOP_ALL) {
for (nptr = member->fnode; nptr; nptr = nptr->next) {
@ -2589,7 +2609,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st
}
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
return count;
}
@ -2610,10 +2630,10 @@ static void conference_send_all_dtmf(conference_member_t *member, conference_obj
const char *p;
for (p = dtmf; p && *p; p++) {
switch_dtmf_t digit = { *p, SWITCH_DEFAULT_DTMF_DURATION};
switch_mutex_lock(imember->control_mutex);
lock_member(imember);
switch_core_session_kill_channel(imember->session, SWITCH_SIG_BREAK);
switch_core_session_send_dtmf(imember->session, &digit);
switch_mutex_unlock(imember->control_mutex);
unlock_member(imember);
}
}
}
@ -2802,14 +2822,14 @@ static switch_status_t conference_member_play_file(conference_member_t *member,
fnode->file = switch_core_strdup(fnode->pool, file);
/* Queue the node */
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(member->session), SWITCH_LOG_DEBUG, "Queueing file '%s' for play\n", file);
switch_mutex_lock(member->control_mutex);
lock_member(member);
for (nptr = member->fnode; nptr && nptr->next; nptr = nptr->next);
if (nptr) {
nptr->next = fnode;
} else {
member->fnode = fnode;
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
status = SWITCH_STATUS_SUCCESS;
done:
@ -2867,7 +2887,7 @@ static switch_status_t conference_member_say(conference_member_t *member, char *
}
/* Queue the node */
switch_mutex_lock(member->control_mutex);
lock_member(member);
for (nptr = member->fnode; nptr && nptr->next; nptr = nptr->next);
if (nptr) {
@ -2893,7 +2913,7 @@ static switch_status_t conference_member_say(conference_member_t *member, char *
}
switch_core_speech_feed_tts(fnode->sh, text, &flags);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
status = SWITCH_STATUS_SUCCESS;
@ -3196,12 +3216,12 @@ static switch_status_t conf_api_sub_kick(conference_member_t *member, switch_str
if (member == NULL)
return SWITCH_STATUS_GENERR;
switch_mutex_lock(member->control_mutex);
lock_member(member);
switch_clear_flag(member, MFLAG_RUNNING);
switch_set_flag_locked(member, MFLAG_KICKED);
switch_core_session_kill_channel(member->session, SWITCH_SIG_BREAK);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
if (stream != NULL) {
stream->write_function(stream, "OK kicked %u\n", member->id);
}
@ -3231,10 +3251,10 @@ static switch_status_t conf_api_sub_dtmf(conference_member_t *member, switch_str
return SWITCH_STATUS_GENERR;
}
switch_mutex_lock(member->control_mutex);
lock_member(member);
switch_core_session_kill_channel(member->session, SWITCH_SIG_BREAK);
switch_core_session_send_dtmf_string(member->session, (char *) data);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
if (stream != NULL) {
stream->write_function(stream, "OK sent %s to %u\n", (char *) data, member->id);
@ -3259,9 +3279,9 @@ static switch_status_t conf_api_sub_energy(conference_member_t *member, switch_s
return SWITCH_STATUS_GENERR;
if (data) {
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->energy_level = atoi((char *) data);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
}
if (stream != NULL) {
stream->write_function(stream, "Energy %u = %d\n", member->id, member->energy_level);
@ -3285,10 +3305,10 @@ static switch_status_t conf_api_sub_volume_in(conference_member_t *member, switc
return SWITCH_STATUS_GENERR;
if (data) {
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_in_level = atoi((char *) data);
switch_normalize_volume(member->volume_in_level);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
}
if (stream != NULL) {
stream->write_function(stream, "Volume IN %u = %d\n", member->id, member->volume_in_level);
@ -3312,10 +3332,10 @@ static switch_status_t conf_api_sub_volume_out(conference_member_t *member, swit
return SWITCH_STATUS_GENERR;
if (data) {
switch_mutex_lock(member->control_mutex);
lock_member(member);
member->volume_out_level = atoi((char *) data);
switch_normalize_volume(member->volume_out_level);
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
}
if (stream != NULL) {
stream->write_function(stream, "Volume OUT %u = %d\n", member->id, member->volume_out_level);
@ -3986,7 +4006,8 @@ static switch_status_t conf_api_sub_transfer(conference_obj_t *conference, switc
}
/* move the member from the old conference to the new one */
switch_mutex_lock(member->control_mutex);
lock_member(member);
if (conference != new_conference) {
conference_del_member(conference, member);
conference_add_member(new_conference, member);
@ -4001,7 +4022,7 @@ static switch_status_t conf_api_sub_transfer(conference_obj_t *conference, switc
}
}
switch_mutex_unlock(member->control_mutex);
unlock_member(member);
stream->write_function(stream, "OK Member '%d' sent to conference %s.\n", member->id, argv[2]);
@ -5252,7 +5273,8 @@ SWITCH_STANDARD_APP(conference_function)
/* Prepare MUTEXS */
member.id = next_member_id();
switch_mutex_init(&member.flag_mutex, SWITCH_MUTEX_NESTED, member.pool);
switch_mutex_init(&member.control_mutex, SWITCH_MUTEX_NESTED, member.pool);
switch_mutex_init(&member.write_mutex, SWITCH_MUTEX_NESTED, member.pool);
switch_mutex_init(&member.read_mutex, SWITCH_MUTEX_NESTED, member.pool);
switch_mutex_init(&member.audio_in_mutex, SWITCH_MUTEX_NESTED, member.pool);
switch_mutex_init(&member.audio_out_mutex, SWITCH_MUTEX_NESTED, member.pool);