FS-7500: add a framebuffer to reuse memory and use it to offload frame writing from video muxing thread to a dedicated write thread

This commit is contained in:
Anthony Minessale 2015-03-03 16:30:26 -06:00 committed by Michael Jerris
parent 5ac63b9250
commit 0d34e8ac77
7 changed files with 280 additions and 32 deletions

View File

@ -1525,7 +1525,8 @@ typedef enum {
SFF_RAW_RTP_PARSE_FRAME = (1 << 13),
SFF_PICTURE_RESET = (1 << 14),
SFF_SAME_IMAGE = (1 << 15),
SFF_USE_VIDEO_TIMESTAMP = (1 << 16)
SFF_USE_VIDEO_TIMESTAMP = (1 << 16),
SFF_ENCODED = (1 << 17)
} switch_frame_flag_enum_t;
typedef uint32_t switch_frame_flag_t;
@ -2525,6 +2526,9 @@ typedef struct switch_vb_s switch_vb_t;
struct switch_img_txt_handle_s;
typedef struct switch_img_txt_handle_s switch_img_txt_handle_t;
struct switch_frame_buffer_s;
typedef struct switch_frame_buffer_s switch_frame_buffer_t;
SWITCH_END_EXTERN_C
#endif
/* For Emacs:

View File

@ -1222,6 +1222,11 @@ SWITCH_DECLARE(void) switch_http_dump_request(switch_http_request_t *request);
SWITCH_DECLARE(void) switch_http_parse_qs(switch_http_request_t *request, char *qs);
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP);
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone);
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP);
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP);
SWITCH_END_EXTERN_C
#endif
/* For Emacs:

View File

@ -412,6 +412,7 @@ typedef struct mcu_canvas_s {
switch_rgb_color_t bgcolor;
switch_mutex_t *mutex;
switch_timer_t timer;
switch_frame_buffer_t *fb;
switch_memory_pool_t *pool;
} mcu_canvas_t;
@ -622,6 +623,8 @@ struct conference_member {
char *kicked_sound;
switch_queue_t *dtmf_queue;
switch_queue_t *video_queue;
switch_queue_t *mux_out_queue;
switch_thread_t *video_muxing_write_thread;
switch_thread_t *input_thread;
cJSON *json;
cJSON *status_field;
@ -713,6 +716,7 @@ static switch_status_t conference_outcall_bg(conference_obj_t *conference,
SWITCH_STANDARD_APP(conference_function);
static void launch_conference_video_muxing_thread(conference_obj_t *conference);
static void launch_conference_thread(conference_obj_t *conference);
static void launch_conference_video_muxing_write_thread(conference_member_t *member);
static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, void *obj);
static switch_status_t conference_local_play_file(conference_obj_t *conference, switch_core_session_t *session, char *path, uint32_t leadin, void *buf,
uint32_t buflen);
@ -1475,6 +1479,7 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code
frame = &write_frame;
frame->img = codec_set->frame.img;
frame->packet = codec_set->frame.packet;
frame->packetlen = codec_set->frame.packetlen;
switch_clear_flag(frame, SFF_SAME_IMAGE);
frame->m = 0;
@ -1507,6 +1512,8 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
switch_frame_t *dupframe;
if (switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) {
continue;
}
@ -1524,7 +1531,15 @@ static void write_canvas_image_to_codec_group(conference_obj_t *conference, code
switch_core_session_request_video_refresh(imember->session);
}
switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0);
//switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0);
switch_set_flag(frame, SFF_ENCODED);
if (switch_frame_buffer_dup(conference->canvas->fb, frame, &dupframe) == SWITCH_STATUS_SUCCESS) {
switch_queue_push(imember->mux_out_queue, dupframe);
dupframe = NULL;
}
switch_clear_flag(frame, SFF_ENCODED);
switch_core_session_rwunlock(imember->session);
}
@ -1568,6 +1583,39 @@ static void vmute_snap(conference_member_t *member, switch_bool_t clear)
}
}
static void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_t *thread, void *obj)
{
conference_member_t *member = (conference_member_t *) obj;
void *pop;
while(switch_test_flag(member, MFLAG_RUNNING) || switch_queue_size(member->mux_out_queue)) {
switch_frame_t *frame;
if (switch_test_flag(member, MFLAG_RUNNING)) {
if (switch_queue_pop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) {
if (!pop) continue;
frame = (switch_frame_t *) pop;
if (switch_test_flag(frame, SFF_ENCODED)) {
switch_core_session_write_encoded_video_frame(member->session, frame, 0, 0);
} else {
switch_core_session_write_video_frame(member->session, frame, SWITCH_IO_FLAG_NONE, 0);
}
switch_frame_buffer_free(member->conference->canvas->fb, &frame);
}
} else {
if (switch_queue_trypop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) {
frame = (switch_frame_t *) pop;
switch_frame_buffer_free(member->conference->canvas->fb, &frame);
}
}
}
}
return NULL;
}
static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thread, void *obj)
{
conference_obj_t *conference = (conference_obj_t *) obj;
@ -1575,7 +1623,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
video_layout_t *vlayout = NULL;
switch_codec_t *check_codec = NULL;
codec_set_t *write_codecs[MAX_MUX_CODECS] = { 0 };
int buflen = SWITCH_RECOMMENDED_BUFFER_SIZE * 2;
int buflen = SWITCH_RTP_MAX_BUF_LEN;
int i = 0;
int used = 0;
uint32_t video_key_freq = 10000000;
@ -1587,6 +1635,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
switch_image_t *write_img = NULL, *file_img = NULL;
uint32_t timestamp = 0;
if (conference->video_layout_group) {
lg = switch_core_hash_find(conference->layout_group_hash, conference->video_layout_group);
vlayout = find_best_layout(conference, lg);
@ -1602,10 +1651,11 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
}
init_canvas(conference, vlayout);
switch_frame_buffer_create(&conference->canvas->fb);
conference->video_timer_reset = 1;
packet = switch_core_alloc(conference->pool, SWITCH_RECOMMENDED_BUFFER_SIZE);
packet = switch_core_alloc(conference->pool, SWITCH_RTP_MAX_BUF_LEN);
while (globals.running && !switch_test_flag(conference, CFLAG_DESTRUCT) && switch_test_flag(conference, CFLAG_VIDEO_MUXING)) {
switch_bool_t need_refresh = SWITCH_FALSE, need_keyframe = SWITCH_FALSE, need_reset = SWITCH_FALSE;
@ -1685,7 +1735,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
write_codecs[i]->frame.packet = switch_core_alloc(conference->pool, buflen);
write_codecs[i]->frame.data = ((uint8_t *)write_codecs[i]->frame.packet) + 12;
write_codecs[i]->frame.packetlen = 0;
write_codecs[i]->frame.packetlen = buflen;
write_codecs[i]->frame.buflen = buflen - 12;
switch_set_flag((&write_codecs[i]->frame), SFF_RAW_RTP);
@ -1877,6 +1927,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
switch_frame_t *dupframe;
if (switch_test_flag(conference, CFLAG_MINIMIZE_VIDEO_ENCODING) && !switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) {
continue;
@ -1898,12 +1949,17 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
switch_set_flag(&write_frame, SFF_RAW_RTP);
write_frame.img = write_img;
write_frame.packet = packet;
write_frame.data = packet + 12;
write_frame.datalen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
write_frame.buflen = write_frame.datalen;
write_frame.packetlen = SWITCH_RECOMMENDED_BUFFER_SIZE;
write_frame.data = ((uint8_t *)packet) + 12;
write_frame.datalen = 0;
write_frame.buflen = SWITCH_RTP_MAX_BUF_LEN - 12;
write_frame.packetlen = 0;
switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0);
//switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0);
if (switch_frame_buffer_dup(conference->canvas->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) {
switch_queue_push(imember->mux_out_queue, dupframe);
dupframe = NULL;
}
if (imember->session) {
switch_core_session_rwunlock(imember->session);
@ -1939,7 +1995,7 @@ static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread
}
switch_core_timer_destroy(&conference->canvas->timer);
switch_frame_buffer_destroy(&conference->canvas->fb);
destroy_canvas(&conference->canvas);
return NULL;
@ -3663,9 +3719,7 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
member->video_codec_index = -1;
switch_queue_create(&member->dtmf_queue, 100, member->pool);
if (conference->video_layout_name) {
switch_queue_create(&member->video_queue, 2000, member->pool);
}
conference->members = member;
switch_set_flag_locked(member, MFLAG_INTREE);
switch_mutex_unlock(conference->member_mutex);
@ -6498,6 +6552,10 @@ static void conference_loop_output(conference_member_t *member)
if (member->input_thread) {
switch_thread_join(&st, member->input_thread);
}
if (member->video_muxing_write_thread) {
switch_queue_push(member->mux_out_queue, NULL);
switch_thread_join(&st, member->video_muxing_write_thread);
}
}
switch_core_timer_destroy(&timer);
@ -11584,6 +11642,12 @@ SWITCH_STANDARD_APP(conference_function)
conference->min = 2;
}
if (conference->video_layout_name) {
switch_queue_create(&member.video_queue, 2000, member.pool);
switch_queue_create(&member.mux_out_queue, 2000, member.pool);
launch_conference_video_muxing_write_thread(&member);
}
/* Add the caller to the conference */
if (conference_add_member(conference, &member) != SWITCH_STATUS_SUCCESS) {
switch_core_codec_destroy(&member.read_codec);
@ -11703,7 +11767,18 @@ SWITCH_STANDARD_APP(conference_function)
}
/* Create a thread for the conference and launch it */
static void launch_conference_video_muxing_write_thread(conference_member_t *member)
{
switch_threadattr_t *thd_attr = NULL;
switch_mutex_lock(globals.hash_mutex);
if (!member->video_muxing_write_thread) {
switch_threadattr_create(&thd_attr, member->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&member->video_muxing_write_thread, thd_attr, conference_video_muxing_write_thread_run, member, member->pool);
}
switch_mutex_unlock(globals.hash_mutex);
}
static void launch_conference_video_muxing_thread(conference_obj_t *conference)
{
switch_threadattr_t *thd_attr = NULL;

View File

@ -323,7 +323,7 @@ SWITCH_STANDARD_APP(play_fsv_function)
vid_frame.codec = &vid_codec;
vid_frame.packet = vid_buffer;
vid_frame.data = vid_buffer + 12;
vid_frame.data = ((uint8_t *)vid_buffer) + 12;
vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
switch_set_flag((&vid_frame), SFF_RAW_RTP);
// switch_set_flag((&vid_frame), SFF_PROXY_PACKET);
@ -397,7 +397,7 @@ SWITCH_STANDARD_APP(play_fsv_function)
vid_frame.m = hdr->m;
vid_frame.timestamp = ts;
vid_frame.data = data + 12;
vid_frame.data = ((uint8_t *)data) + 12;
vid_frame.datalen = vid_frame.packetlen - 12;
switch_core_session_write_video_frame(session, &vid_frame, SWITCH_IO_FLAG_NONE, 0);
}
@ -577,7 +577,7 @@ SWITCH_STANDARD_APP(play_yuv_function)
vid_frame.codec = codec;
vid_frame.packet = vid_buffer;
vid_frame.data = vid_buffer + 12;
vid_frame.data = ((uint8_t *)vid_buffer) + 12;
vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
switch_set_flag((&vid_frame), SFF_RAW_RTP);
// switch_set_flag((&vid_frame), SFF_PROXY_PACKET);

View File

@ -450,7 +450,7 @@ static void vlc_video_display_callback(void *data, void *id)
} else {
context->vid_frame->img = context->img;
context->vid_frame->packet = context->video_packet;
context->vid_frame->data = context->video_packet + 12;
context->vid_frame->data = ((uint8_t *)context->video_packet) + 12;
switch_core_session_write_video_frame(context->session, context->vid_frame, SWITCH_IO_FLAG_NONE, 0);
}
@ -1304,7 +1304,7 @@ SWITCH_STANDARD_APP(play_video_function)
audio_frame.codec = &codec;
video_frame.codec = read_vid_codec;
video_frame.packet = context->video_packet;
video_frame.data = context->video_packet + 12;
video_frame.data = ((uint8_t *)context->video_packet) + 12;
switch_channel_set_variable(channel, SWITCH_PLAYBACK_TERMINATOR_USED, "");
@ -1994,7 +1994,7 @@ static switch_status_t setup_tech_pvt(switch_core_session_t *osession, switch_co
context->aud_frame = &tech_pvt->read_frame;
context->vid_frame = &tech_pvt->read_video_frame;
context->vid_frame->packet = context->video_packet;
context->vid_frame->data = context->video_packet + 12;
context->vid_frame->data = ((uint8_t *)context->video_packet) + 12;
context->playing = 0;
// context->err = 0;

View File

@ -7145,7 +7145,8 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra
uint32_t len, ts = 0;
switch_payload_t payload = 0;
rtp_msg_t *send_msg = NULL;
rtp_msg_t local_send_msg = { {0} };
srtp_hdr_t local_header;
int r = 0;
if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr) {
return -1;
@ -7300,8 +7301,7 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra
if (fwd) {
send_msg = frame->packet;
local_send_msg = *send_msg;
send_msg = &local_send_msg;
local_header = send_msg->header;
len = frame->packetlen;
ts = 0;
@ -7330,7 +7330,14 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra
}
*/
return rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags);
r = rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags);
if (send_msg) {
send_msg->header = local_header;
}
return r;
}
SWITCH_DECLARE(switch_rtp_stats_t *) switch_rtp_get_stats(switch_rtp_t *rtp_session, switch_memory_pool_t *pool)

View File

@ -93,6 +93,143 @@ SWITCH_DECLARE(switch_status_t) switch_frame_alloc(switch_frame_t **frame, switc
}
typedef struct switch_frame_node_s {
switch_frame_t *frame;
int inuse;
struct switch_frame_node_s *next;
} switch_frame_node_t;
struct switch_frame_buffer_s {
switch_frame_node_t *head;
switch_memory_pool_t *pool;
switch_mutex_t *mutex;
};
static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t *orig)
{
switch_frame_node_t *np;
switch_mutex_lock(fb->mutex);
for (np = fb->head; np; np = np->next) {
if (!np->inuse && ((orig->packet && np->frame->packet) || (!orig->packet && !np->frame->packet))) {
break;
}
}
if (!np) {
np = switch_core_alloc(fb->pool, sizeof(*np));
np->frame = switch_core_alloc(fb->pool, sizeof(*np->frame));
if (orig->packet) {
np->frame->packet = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN);
} else {
np->frame->data = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN);
np->frame->buflen = SWITCH_RTP_MAX_BUF_LEN;
}
np->next = fb->head;
fb->head = np;
}
np->frame->samples = orig->samples;
np->frame->rate = orig->rate;
np->frame->channels = orig->channels;
np->frame->payload = orig->payload;
np->frame->timestamp = orig->timestamp;
np->frame->seq = orig->seq;
np->frame->ssrc = orig->ssrc;
np->frame->m = orig->m;
np->frame->flags = orig->flags;
np->frame->codec = NULL;
np->frame->pmap = NULL;
np->frame->img = NULL;
np->frame->extra_data = np;
np->inuse = 1;
switch_set_flag(np->frame, SFF_DYNAMIC);
if (orig->packet) {
memcpy(np->frame->packet, orig->packet, orig->packetlen);
np->frame->packetlen = orig->packetlen;
np->frame->data = ((unsigned char *)np->frame->packet) + 12;
np->frame->datalen = orig->datalen;
} else {
np->frame->packetlen = 0;
memcpy(np->frame->data, orig->data, orig->datalen);
np->frame->datalen = orig->datalen;
}
if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) {
switch_img_copy(orig->img, &np->frame->img);
}
switch_mutex_unlock(fb->mutex);
return np->frame;
}
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP)
{
switch_frame_t *old_frame;
switch_frame_node_t *node;
switch_mutex_lock(fb->mutex);
old_frame = *frameP;
*frameP = NULL;
node = (switch_frame_node_t *) old_frame->extra_data;
node->inuse = 0;
switch_img_free(&node->frame->img);
switch_mutex_unlock(fb->mutex);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone)
{
switch_frame_t *new_frame;
if (!orig) {
return SWITCH_STATUS_FALSE;
}
switch_assert(orig->buflen);
new_frame = find_free_frame(fb, orig);
*clone = new_frame;
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP)
{
switch_frame_buffer_t *fb = *fbP;
switch_memory_pool_t *pool;
*fbP = NULL;
pool = fb->pool;
switch_core_destroy_memory_pool(&pool);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP)
{
switch_frame_buffer_t *fb;
switch_memory_pool_t *pool;
switch_core_new_memory_pool(&pool);
fb = switch_core_alloc(pool, sizeof(*fb));
fb->pool = pool;
switch_mutex_init(&fb->mutex, SWITCH_MUTEX_NESTED, pool);
*fbP = fb;
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_frame_t **clone)
{
switch_frame_t *new_frame;
@ -104,18 +241,28 @@ SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_fr
switch_assert(orig->buflen);
new_frame = malloc(sizeof(*new_frame));
switch_assert(new_frame);
*new_frame = *orig;
switch_set_flag(new_frame, SFF_DYNAMIC);
if (orig->packet) {
new_frame->packet = malloc(SWITCH_RTP_MAX_BUF_LEN);
memcpy(new_frame->packet, orig->packet, orig->packetlen);
new_frame->data = ((unsigned char *)new_frame->packet) + 12;
} else {
new_frame->data = malloc(new_frame->buflen);
switch_assert(new_frame->data);
memcpy(new_frame->data, orig->data, orig->datalen);
}
new_frame->codec = NULL;
new_frame->pmap = NULL;
new_frame->img = NULL;
if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) {
switch_img_copy(orig->img, &new_frame->img);
}
*clone = new_frame;
return SWITCH_STATUS_SUCCESS;
@ -127,7 +274,17 @@ SWITCH_DECLARE(switch_status_t) switch_frame_free(switch_frame_t **frame)
return SWITCH_STATUS_FALSE;
}
if ((*frame)->img) {
switch_img_free(&(*frame)->img);
}
if ((*frame)->packet) {
free((*frame)->packet);
(*frame)->packet = NULL;
} else {
switch_safe_free((*frame)->data);
}
free(*frame);
*frame = NULL;