From 72548c17ab59686b88f646901ecf301a8dc64ad0 Mon Sep 17 00:00:00 2001 From: George Joseph Date: Wed, 5 Nov 2025 14:27:32 -0700 Subject: [PATCH] chan_websocket: Add ability to place a MARK in the media stream. Also cleaned up a few unused #if blocks, and started sending a few ERROR events back to the apps. Resolves: #1574 DeveloperNote: Apps can now send a `MARK_MEDIA` command with an optional `correlation_id` parameter to chan_websocket which will be placed in the media frame queue. When that frame is dequeued after all intervening media has been played to the core, chan_websocket will send a `MEDIA_MARK_PROCESSED` event to the app with the same correlation_id (if any). --- channels/chan_websocket.c | 104 +++++++++++++++++++++++++++++++------- 1 file changed, 87 insertions(+), 17 deletions(-) diff --git a/channels/chan_websocket.c b/channels/chan_websocket.c index 4c5f92cc65..8235ccebc3 100644 --- a/channels/chan_websocket.c +++ b/channels/chan_websocket.c @@ -112,22 +112,13 @@ struct websocket_pvt { #define HANGUP_CHANNEL "HANGUP" #define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING" #define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING" +#define MARK_MEDIA "MARK_MEDIA" #define FLUSH_MEDIA "FLUSH_MEDIA" #define GET_DRIVER_STATUS "GET_STATUS" #define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED" #define PAUSE_MEDIA "PAUSE_MEDIA" #define CONTINUE_MEDIA "CONTINUE_MEDIA" -#if 0 -#define MEDIA_START "MEDIA_START" -#define MEDIA_XON "MEDIA_XON" -#define MEDIA_XOFF "MEDIA_XOFF" -#define QUEUE_DRAINED "QUEUE_DRAINED" -#define DRIVER_STATUS "STATUS" -#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED" -#define DTMF_END "DTMF_END" -#endif - #define QUEUE_LENGTH_MAX 1000 #define QUEUE_LENGTH_XOFF_LEVEL 900 #define QUEUE_LENGTH_XON_LEVEL 800 @@ -272,6 +263,36 @@ static char *_create_event_MEDIA_BUFFERING_COMPLETED(struct websocket_pvt *insta return payload; } +/*! + * \internal + * \brief Print the MEDIA_MARK_PROCESSED event. + * \warning Do not call directly. + */ +static char *_create_event_MEDIA_MARK_PROCESSED(struct websocket_pvt *instance, + const char *id) +{ + char *payload = NULL; + if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) { + struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}", + "event", "MEDIA_MARK_PROCESSED", + "channel_id", ast_channel_uniqueid(instance->channel), + "correlation_id", S_OR(id, "") + ); + if (!msg) { + return NULL; + } + payload = ast_json_dump_string_format(msg, AST_JSON_COMPACT); + ast_json_unref(msg); + } else { + ast_asprintf(&payload, "%s%s%s", + "MEDIA_MARK_PROCESSED", + S_COR(id, " ",""), S_OR(id, "")); + + } + + return payload; +} + /*! * \internal * \brief Print the DTMF_END event. @@ -345,15 +366,27 @@ static char *_create_event_STATUS(struct websocket_pvt *instance) * \brief Print the ERROR event. * \warning Do not call directly. */ -static char *_create_event_ERROR(struct websocket_pvt *instance, - const char *error_text) +static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR( + struct websocket_pvt *instance, const char *format, ...) { char *payload = NULL; + char *error_text = NULL; + va_list ap; + int res = 0; + + va_start(ap, format); + res = ast_vasprintf(&error_text, format, ap); + va_end(ap); + if (res < 0 || !error_text) { + return NULL; + } + if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) { struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}", "event", "ERROR", "channel_id", ast_channel_uniqueid(instance->channel), "error_text", error_text); + ast_free(error_text); if (!msg) { return NULL; } @@ -362,6 +395,7 @@ static char *_create_event_ERROR(struct websocket_pvt *instance, } else { ast_asprintf(&payload, "%s channel_id:%s error_text:%s", "ERROR", ast_channel_uniqueid(instance->channel), error_text); + ast_free(error_text); } return payload; @@ -722,6 +756,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) { if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -743,6 +778,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -767,10 +803,40 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) res = queue_option_frame(instance, option); ast_free(option); + } else if (ast_strings_equal(command, MARK_MEDIA)) { + const char *id; + char *option; + SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, + AST_LIST_UNLOCK); + + if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", + ast_channel_name(instance->channel), command); + return 0; + } + + if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) { + id = ast_json_object_string_get(json, "correlation_id"); + } else { + id = data; + } + + ast_debug(4, "%s: %s %s\n", + ast_channel_name(instance->channel), MARK_MEDIA, id); + + option = create_event(instance, MEDIA_MARK_PROCESSED, id); + if (!option) { + return -1; + } + res = queue_option_frame(instance, option); + ast_free(option); + } else if (ast_strings_equal(command, FLUSH_MEDIA)) { struct ast_frame *frame = NULL; if (instance->passthrough) { + send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode"); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -787,6 +853,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) { if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -801,6 +868,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } else if (ast_strings_equal(command, PAUSE_MEDIA)) { if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -811,6 +879,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } else if (ast_strings_equal(command, CONTINUE_MEDIA)) { if (instance->passthrough) { + send_event(instance, ERROR, "%s not supported in passthrough mode", command); ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", ast_channel_name(instance->channel), command); return 0; @@ -1515,7 +1584,8 @@ static struct ast_channel *webchan_request(const char *type, ); struct ast_flags opts = { 0, }; char *opt_args[OPT_ARG_ARRAY_SIZE]; - const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel"; + const char *requestor_name = requestor ? ast_channel_name(requestor) : + (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : ""); RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup); global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global"); @@ -1547,16 +1617,12 @@ static struct ast_channel *webchan_request(const char *type, if (ast_test_flag(&opts, OPT_WS_CODEC) && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) { - ast_debug(3, "%s: Using specified format %s\n", - requestor_name, opt_args[OPT_ARG_WS_CODEC]); fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]); } else { /* * If codec wasn't specified in the dial string, * use the first format in the capabilities. */ - ast_debug(3, "%s: Using format %s from requesting channel\n", - requestor_name, opt_args[OPT_ARG_WS_CODEC]); fmt = ast_format_cap_get_format(cap, 0); } @@ -1566,6 +1632,10 @@ static struct ast_channel *webchan_request(const char *type, goto failure; } + ast_debug(3, "%s: Using format %s from %s\n", + requestor_name, ast_format_get_name(fmt), + ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester"); + instance = websocket_new(requestor_name, args.connection_id, fmt); if (!instance) { ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",