fix thread saftey issue in expiring media bugs

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@15587 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2009-11-21 01:00:58 +00:00
parent ae7aa36f61
commit 68f60de67f
4 changed files with 83 additions and 52 deletions

View File

@ -217,6 +217,7 @@ SWITCH_DECLARE(void) switch_core_media_bug_set_read_replace_frame(_In_ switch_me
\return SWITCH_STATUS_SUCCESS if the operation was a success \return SWITCH_STATUS_SUCCESS if the operation was a success
*/ */
SWITCH_DECLARE(switch_status_t) switch_core_media_bug_remove(_In_ switch_core_session_t *session, _Inout_ switch_media_bug_t **bug); SWITCH_DECLARE(switch_status_t) switch_core_media_bug_remove(_In_ switch_core_session_t *session, _Inout_ switch_media_bug_t **bug);
SWITCH_DECLARE(uint32_t) switch_core_media_bug_prune(switch_core_session_t *session);
/*! /*!
\brief Remove media bug callback \brief Remove media bug callback

View File

@ -1141,7 +1141,8 @@ typedef enum {
SMBF_READ_PING = (1 << 4), SMBF_READ_PING = (1 << 4),
SMBF_STEREO = (1 << 5), SMBF_STEREO = (1 << 5),
SMBF_ANSWER_REQ = (1 << 6), SMBF_ANSWER_REQ = (1 << 6),
SMBF_THREAD_LOCK = (1 << 7) SMBF_THREAD_LOCK = (1 << 7),
SMBF_PRUNE = (1 << 8)
} switch_media_bug_flag_enum_t; } switch_media_bug_flag_enum_t;
typedef uint32_t switch_media_bug_flag_t; typedef uint32_t switch_media_bug_flag_t;

View File

@ -330,13 +330,19 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi
} }
if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) { if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) {
switch_media_bug_t *bp, *dp, *last = NULL; switch_media_bug_t *bp;
switch_bool_t ok = SWITCH_TRUE; switch_bool_t ok = SWITCH_TRUE;
switch_thread_rwlock_rdlock(session->bug_rwlock); switch_thread_rwlock_rdlock(session->bug_rwlock);
int prune = 0;
for (bp = session->bugs; bp; bp = bp->next) { for (bp = session->bugs; bp; bp = bp->next) {
if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) {
continue; continue;
} }
if (switch_test_flag(bp, SMBF_PRUNE)) {
prune++;
continue;
}
if (bp->ready && switch_test_flag(bp, SMBF_READ_STREAM)) { if (bp->ready && switch_test_flag(bp, SMBF_READ_STREAM)) {
switch_mutex_lock(bp->read_mutex); switch_mutex_lock(bp->read_mutex);
@ -358,28 +364,17 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi
} }
} }
if (bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) { if ((bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) || ok == SWITCH_FALSE) {
ok = SWITCH_FALSE; switch_set_flag(bp, SMBF_PRUNE);
prune++;
} }
if (ok == SWITCH_FALSE) {
bp->ready = 0;
if (last) {
last->next = bp->next;
} else {
session->bugs = bp->next;
}
dp = bp;
bp = last;
switch_core_media_bug_close(&dp);
if (!bp) {
break;
}
continue;
}
last = bp;
} }
switch_thread_rwlock_unlock(session->bug_rwlock); switch_thread_rwlock_unlock(session->bug_rwlock);
if (prune) {
switch_core_media_bug_prune(session);
}
} }
if (do_bugs) { if (do_bugs) {
@ -496,14 +491,20 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi
switch_set_flag((*frame), SFF_CNG); switch_set_flag((*frame), SFF_CNG);
} }
if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) { if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) {
switch_media_bug_t *bp, *dp, *last = NULL; switch_media_bug_t *bp;
switch_bool_t ok = SWITCH_TRUE; switch_bool_t ok = SWITCH_TRUE;
int prune = 0;
switch_thread_rwlock_rdlock(session->bug_rwlock); switch_thread_rwlock_rdlock(session->bug_rwlock);
for (bp = session->bugs; bp; bp = bp->next) { for (bp = session->bugs; bp; bp = bp->next) {
if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) {
continue; continue;
} }
if (switch_test_flag(bp, SMBF_PRUNE)) {
prune++;
continue;
}
if (bp->ready && switch_test_flag(bp, SMBF_READ_PING)) { if (bp->ready && switch_test_flag(bp, SMBF_READ_PING)) {
switch_mutex_lock(bp->read_mutex); switch_mutex_lock(bp->read_mutex);
if (bp->callback) { if (bp->callback) {
@ -516,23 +517,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi
} }
if (ok == SWITCH_FALSE) { if (ok == SWITCH_FALSE) {
bp->ready = 0; switch_set_flag(bp, SMBF_PRUNE);
if (last) { prune++;
last->next = bp->next;
} else {
session->bugs = bp->next;
}
dp = bp;
bp = last;
switch_core_media_bug_close(&dp);
if (!bp) {
break;
}
continue;
} }
last = bp;
} }
switch_thread_rwlock_unlock(session->bug_rwlock); switch_thread_rwlock_unlock(session->bug_rwlock);
if (prune) {
switch_core_media_bug_prune(session);
}
} }
} }
@ -760,7 +752,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) { if (session->bugs && !switch_channel_test_flag(session->channel, CF_PAUSE_BUGS)) {
switch_media_bug_t *bp, *dp, *last = NULL; switch_media_bug_t *bp;
int prune = 0;
switch_thread_rwlock_rdlock(session->bug_rwlock); switch_thread_rwlock_rdlock(session->bug_rwlock);
for (bp = session->bugs; bp; bp = bp->next) { for (bp = session->bugs; bp; bp = bp->next) {
@ -773,6 +766,11 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
continue; continue;
} }
if (switch_test_flag(bp, SMBF_PRUNE)) {
prune++;
continue;
}
if (switch_test_flag(bp, SMBF_WRITE_STREAM)) { if (switch_test_flag(bp, SMBF_WRITE_STREAM)) {
switch_mutex_lock(bp->write_mutex); switch_mutex_lock(bp->write_mutex);
@ -800,23 +798,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
if (ok == SWITCH_FALSE) { if (ok == SWITCH_FALSE) {
bp->ready = 0; switch_set_flag(bp, SMBF_PRUNE);
if (last) { prune++;
last->next = bp->next;
} else {
session->bugs = bp->next;
}
dp = bp;
bp = last;
switch_core_media_bug_close(&dp);
if (!bp) {
break;
}
continue;
} }
last = bp;
} }
switch_thread_rwlock_unlock(session->bug_rwlock); switch_thread_rwlock_unlock(session->bug_rwlock);
if (prune) {
switch_core_media_bug_prune(session);
}
} }
if (do_bugs) { if (do_bugs) {

View File

@ -437,6 +437,46 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_remove(switch_core_session
} }
SWITCH_DECLARE(uint32_t) switch_core_media_bug_prune(switch_core_session_t *session)
{
switch_media_bug_t *bp = NULL, *last = NULL;
switch_status_t status = SWITCH_STATUS_FALSE;
int ttl = 0;
top:
if (session->bugs) {
switch_thread_rwlock_wrlock(session->bug_rwlock);
for (bp = session->bugs; bp; bp = bp->next) {
if (switch_core_media_bug_test_flag(bp, SMBF_PRUNE)) {
if (last) {
last->next = bp->next;
} else {
session->bugs = bp->next;
}
break;
}
last = bp;
}
switch_thread_rwlock_unlock(session->bug_rwlock);
if (bp) {
status = switch_core_media_bug_close(&bp);
ttl++;
goto top;
}
}
if (!session->bugs && switch_core_codec_ready(&session->bug_codec)) {
switch_core_codec_destroy(&session->bug_codec);
memset(&session->bug_codec, 0, sizeof(session->bug_codec));
}
return ttl;
}
SWITCH_DECLARE(switch_status_t) switch_core_media_bug_remove_callback(switch_core_session_t *session, switch_media_bug_callback_t callback) SWITCH_DECLARE(switch_status_t) switch_core_media_bug_remove_callback(switch_core_session_t *session, switch_media_bug_callback_t callback)
{ {
switch_media_bug_t *cur = NULL, *bp = NULL, *last = NULL; switch_media_bug_t *cur = NULL, *bp = NULL, *last = NULL;