diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 65a8523b65..09032c4cb9 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -237,6 +237,12 @@ typedef struct conf_xml_cfg { switch_xml_t controls; } conf_xml_cfg_t; +struct vid_helper { + conference_member_t *member_a; + conference_member_t *member_b; + int up; +}; + /* Conference Object */ typedef struct conference_obj { char *name; @@ -322,6 +328,7 @@ typedef struct conference_obj { switch_time_t start_time; switch_time_t end_time; char *log_dir; + struct vid_helper vh[2]; } conference_obj_t; /* Relationship with another member */ @@ -1140,50 +1147,52 @@ static switch_status_t conference_del_member(conference_obj_t *conference, confe return status; } -struct vid_helper { - conference_member_t *member_a; - conference_member_t *member_b; - int up; -}; - /* Thread bridging video between two members, there will be two threads if video briding is used */ static void *SWITCH_THREAD_FUNC conference_video_bridge_thread_run(switch_thread_t *thread, void *obj) { struct vid_helper *vh = obj; - switch_channel_t *channel_a = switch_core_session_get_channel(vh->member_a->session); - switch_channel_t *channel_b = switch_core_session_get_channel(vh->member_b->session); + switch_core_session_t *session_a = vh->member_a->session; + switch_core_session_t *session_b = vh->member_b->session; + switch_channel_t *channel_a = switch_core_session_get_channel(session_a); + switch_channel_t *channel_b = switch_core_session_get_channel(session_b); switch_status_t status; switch_frame_t *read_frame; + conference_obj_t *conference = vh->member_a->conference; + + switch_thread_rwlock_rdlock(conference->rwlock); + switch_thread_rwlock_rdlock(vh->member_a->rwlock); + switch_thread_rwlock_rdlock(vh->member_b->rwlock); + /* Acquire locks for both sessions so the helper object and member structures don't get destroyed before we exit */ - if (switch_core_session_read_lock(vh->member_a->session) != SWITCH_STATUS_SUCCESS) { - return NULL; - } + switch_core_session_read_lock(session_a); + switch_core_session_read_lock(session_b); - if (switch_core_session_read_lock(vh->member_b->session) != SWITCH_STATUS_SUCCESS) { - switch_core_session_rwunlock(vh->member_a->session); - return NULL; - } - vh->up = 1; - while (switch_test_flag(vh->member_a, MFLAG_RUNNING) && switch_test_flag(vh->member_b, MFLAG_RUNNING) && + while (vh->up == 1 && switch_test_flag(vh->member_a, MFLAG_RUNNING) && switch_test_flag(vh->member_b, MFLAG_RUNNING) && switch_channel_ready(channel_a) && switch_channel_ready(channel_b)) { - status = switch_core_session_read_video_frame(vh->member_a->session, &read_frame, SWITCH_IO_FLAG_NONE, 0); - if (!SWITCH_READ_ACCEPTABLE(status)) { + + status = switch_core_session_read_video_frame(session_a, &read_frame, SWITCH_IO_FLAG_NONE, 0); + if (!SWITCH_READ_ACCEPTABLE(status)) { + break; + } + + if (!switch_test_flag(read_frame, SFF_CNG)) { + if (switch_core_session_write_video_frame(session_b, read_frame, SWITCH_IO_FLAG_NONE, 0) != SWITCH_STATUS_SUCCESS) { break; } - - if (!switch_test_flag(read_frame, SFF_CNG)) { - if (switch_core_session_write_video_frame(vh->member_b->session, read_frame, SWITCH_IO_FLAG_NONE, 0) != SWITCH_STATUS_SUCCESS) { - break; - } - } + } } - - switch_core_session_rwunlock(vh->member_a->session); - switch_core_session_rwunlock(vh->member_b->session); + + switch_thread_rwlock_unlock(vh->member_b->rwlock); + switch_thread_rwlock_unlock(vh->member_a->rwlock); + + switch_core_session_rwunlock(session_a); + switch_core_session_rwunlock(session_b); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s video thread ended.\n", switch_channel_get_name(channel_a)); + + switch_thread_rwlock_unlock(conference->rwlock); vh->up = 0; return NULL; @@ -1434,10 +1443,14 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v } - if (members_with_video && conference->video_running != 1) { - if (!switch_test_flag(conference, CFLAG_VIDEO_BRIDGE)) { - launch_conference_video_thread(conference); - } else if (video_bridge_members[0] && video_bridge_members[1]){ + if (members_with_video) { + if (conference->video_running != 1) { + if (!switch_test_flag(conference, CFLAG_VIDEO_BRIDGE)) { + launch_conference_video_thread(conference); + } + } + + if (conference->vh[0].up != 1 && conference->vh[1].up != 1 && video_bridge_members[0] && video_bridge_members[1]){ launch_conference_video_bridge_thread(video_bridge_members[0], video_bridge_members[1]); } } @@ -1741,6 +1754,18 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v switch_mutex_unlock(conference->member_mutex); switch_mutex_unlock(conference->mutex); + if (conference->vh[0].up == 1) { + conference->vh[0].up = -1; + } + + if (conference->vh[1].up == 1) { + conference->vh[1].up = -1; + } + + while (conference->vh[0].up || conference->vh[1].up) { + switch_cond_next(); + } + if (conference->video_running == 1) { conference->video_running = -1; while (conference->video_running) { @@ -5325,10 +5350,6 @@ SWITCH_STANDARD_API(conf_api_main) conference_obj_t *conference = NULL; if ((conference = conference_find(argv[0]))) { - if (switch_thread_rwlock_tryrdlock(conference->rwlock) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Read Lock Fail\n"); - goto done; - } if (argc >= 2) { conf_api_dispatch(conference, stream, argc, argv, cmd, 1); } else { @@ -5369,7 +5390,7 @@ SWITCH_STANDARD_API(conf_api_main) } } - done: + switch_safe_free(lbuf); return status; @@ -6112,6 +6133,7 @@ SWITCH_STANDARD_APP(conference_function) } if ((conference = conference_find(conf_name))) { + switch_thread_rwlock_unlock(conference->rwlock); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Conference %s already exists!\n", conf_name); goto done; } @@ -6203,19 +6225,21 @@ SWITCH_STANDARD_APP(conference_function) /* Indicate the conference is dynamic */ switch_set_flag_locked(conference, CFLAG_DYNAMIC); + /* acquire a read lock on the thread so it can't leave without us */ + if (switch_thread_rwlock_tryrdlock(conference->rwlock) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Read Lock Fail\n"); + goto done; + } + + rl++; + /* Start the conference thread for this conference */ launch_conference_thread(conference); } else { /* setup user variable */ switch_channel_set_variable(channel, "conference_name", conference->name); + rl++; } - /* acquire a read lock on the thread so it can't leave without us */ - if (switch_thread_rwlock_tryrdlock(conference->rwlock) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Read Lock Fail\n"); - goto done; - } - rl++; - if (zstr(dpin) && conference->pin) { dpin = conference->pin; } @@ -6543,19 +6567,33 @@ static void launch_conference_video_thread(conference_obj_t *conference) /* Create a video thread for the conference and launch it */ static void launch_conference_video_bridge_thread(conference_member_t *member_a, conference_member_t *member_b) { - switch_memory_pool_t *pool = member_a->conference->pool; - struct vid_helper *vh = switch_core_alloc(pool, 2 * sizeof *vh); - - vh[0].member_a = member_a; - vh[0].member_b = member_b; - - vh[1].member_a = member_b; - vh[1].member_b = member_a; - - launch_thread_detached(conference_video_bridge_thread_run, pool, &vh[0]); - launch_thread_detached(conference_video_bridge_thread_run, pool, &vh[1]); + conference_obj_t *conference = member_a->conference; + switch_memory_pool_t *pool = conference->pool; + int sanity = 10000; - member_a->conference->video_running = 1; + memset(conference->vh, 0, sizeof(conference->vh)); + + conference->vh[0].member_a = member_a; + conference->vh[0].member_b = member_b; + + conference->vh[1].member_a = member_b; + conference->vh[1].member_b = member_a; + + launch_thread_detached(conference_video_bridge_thread_run, pool, &conference->vh[0]); + launch_thread_detached(conference_video_bridge_thread_run, pool, &conference->vh[1]); + + while(!(conference->vh[0].up && conference->vh[1].up) && --sanity > 0) { + switch_cond_next(); + } + + if (conference->vh[0].up == 1 && conference->vh[1].up != 1) { + conference->vh[0].up = -1; + } + + if (conference->vh[1].up == 1 && conference->vh[0].up != 1) { + conference->vh[1].up = -1; + } + } static void launch_conference_record_thread(conference_obj_t *conference, char *path) @@ -6643,6 +6681,7 @@ static switch_status_t chat_send(switch_event_t *message_event) switch_core_chat_send_args(proto, CONF_CHAT_PROTO, to, hint && strchr(hint, '/') ? hint : from, "", stream.data, NULL, NULL); switch_safe_free(stream.data); + switch_thread_rwlock_unlock(conference->rwlock); return SWITCH_STATUS_SUCCESS; } @@ -6659,6 +6698,12 @@ static conference_obj_t *conference_find(char *name) conference = NULL; } } + if (conference) { + if (switch_thread_rwlock_tryrdlock(conference->rwlock) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Read Lock Fail\n"); + conference = NULL; + } + } switch_mutex_unlock(globals.hash_mutex); return conference; @@ -7260,6 +7305,7 @@ static void pres_event_handler(switch_event_t *event) switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "call-direction", conference->count == 1 ? "outbound" : "inbound"); switch_event_fire(&event); } + switch_thread_rwlock_unlock(conference->rwlock); } else if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "login", conf_name);