mirror of
https://github.com/asterisk/asterisk.git
synced 2025-12-01 02:31:55 +00:00
chan_websocket: Add ability to place a MARK in the media stream.
Also cleaned up a few unused #if blocks, and started sending a few ERROR events back to the apps. Resolves: #1574 DeveloperNote: Apps can now send a `MARK_MEDIA` command with an optional `correlation_id` parameter to chan_websocket which will be placed in the media frame queue. When that frame is dequeued after all intervening media has been played to the core, chan_websocket will send a `MEDIA_MARK_PROCESSED` event to the app with the same correlation_id (if any).
This commit is contained in:
@@ -112,22 +112,13 @@ struct websocket_pvt {
|
|||||||
#define HANGUP_CHANNEL "HANGUP"
|
#define HANGUP_CHANNEL "HANGUP"
|
||||||
#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
|
#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
|
||||||
#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
|
#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
|
||||||
|
#define MARK_MEDIA "MARK_MEDIA"
|
||||||
#define FLUSH_MEDIA "FLUSH_MEDIA"
|
#define FLUSH_MEDIA "FLUSH_MEDIA"
|
||||||
#define GET_DRIVER_STATUS "GET_STATUS"
|
#define GET_DRIVER_STATUS "GET_STATUS"
|
||||||
#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
|
#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
|
||||||
#define PAUSE_MEDIA "PAUSE_MEDIA"
|
#define PAUSE_MEDIA "PAUSE_MEDIA"
|
||||||
#define CONTINUE_MEDIA "CONTINUE_MEDIA"
|
#define CONTINUE_MEDIA "CONTINUE_MEDIA"
|
||||||
|
|
||||||
#if 0
|
|
||||||
#define MEDIA_START "MEDIA_START"
|
|
||||||
#define MEDIA_XON "MEDIA_XON"
|
|
||||||
#define MEDIA_XOFF "MEDIA_XOFF"
|
|
||||||
#define QUEUE_DRAINED "QUEUE_DRAINED"
|
|
||||||
#define DRIVER_STATUS "STATUS"
|
|
||||||
#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED"
|
|
||||||
#define DTMF_END "DTMF_END"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define QUEUE_LENGTH_MAX 1000
|
#define QUEUE_LENGTH_MAX 1000
|
||||||
#define QUEUE_LENGTH_XOFF_LEVEL 900
|
#define QUEUE_LENGTH_XOFF_LEVEL 900
|
||||||
#define QUEUE_LENGTH_XON_LEVEL 800
|
#define QUEUE_LENGTH_XON_LEVEL 800
|
||||||
@@ -272,6 +263,36 @@ static char *_create_event_MEDIA_BUFFERING_COMPLETED(struct websocket_pvt *insta
|
|||||||
return payload;
|
return payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \internal
|
||||||
|
* \brief Print the MEDIA_MARK_PROCESSED event.
|
||||||
|
* \warning Do not call directly.
|
||||||
|
*/
|
||||||
|
static char *_create_event_MEDIA_MARK_PROCESSED(struct websocket_pvt *instance,
|
||||||
|
const char *id)
|
||||||
|
{
|
||||||
|
char *payload = NULL;
|
||||||
|
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
|
||||||
|
struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
|
||||||
|
"event", "MEDIA_MARK_PROCESSED",
|
||||||
|
"channel_id", ast_channel_uniqueid(instance->channel),
|
||||||
|
"correlation_id", S_OR(id, "")
|
||||||
|
);
|
||||||
|
if (!msg) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
payload = ast_json_dump_string_format(msg, AST_JSON_COMPACT);
|
||||||
|
ast_json_unref(msg);
|
||||||
|
} else {
|
||||||
|
ast_asprintf(&payload, "%s%s%s",
|
||||||
|
"MEDIA_MARK_PROCESSED",
|
||||||
|
S_COR(id, " ",""), S_OR(id, ""));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \internal
|
* \internal
|
||||||
* \brief Print the DTMF_END event.
|
* \brief Print the DTMF_END event.
|
||||||
@@ -345,15 +366,27 @@ static char *_create_event_STATUS(struct websocket_pvt *instance)
|
|||||||
* \brief Print the ERROR event.
|
* \brief Print the ERROR event.
|
||||||
* \warning Do not call directly.
|
* \warning Do not call directly.
|
||||||
*/
|
*/
|
||||||
static char *_create_event_ERROR(struct websocket_pvt *instance,
|
static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR(
|
||||||
const char *error_text)
|
struct websocket_pvt *instance, const char *format, ...)
|
||||||
{
|
{
|
||||||
char *payload = NULL;
|
char *payload = NULL;
|
||||||
|
char *error_text = NULL;
|
||||||
|
va_list ap;
|
||||||
|
int res = 0;
|
||||||
|
|
||||||
|
va_start(ap, format);
|
||||||
|
res = ast_vasprintf(&error_text, format, ap);
|
||||||
|
va_end(ap);
|
||||||
|
if (res < 0 || !error_text) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
|
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
|
||||||
struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
|
struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
|
||||||
"event", "ERROR",
|
"event", "ERROR",
|
||||||
"channel_id", ast_channel_uniqueid(instance->channel),
|
"channel_id", ast_channel_uniqueid(instance->channel),
|
||||||
"error_text", error_text);
|
"error_text", error_text);
|
||||||
|
ast_free(error_text);
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -362,6 +395,7 @@ static char *_create_event_ERROR(struct websocket_pvt *instance,
|
|||||||
} else {
|
} else {
|
||||||
ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
|
ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
|
||||||
"ERROR", ast_channel_uniqueid(instance->channel), error_text);
|
"ERROR", ast_channel_uniqueid(instance->channel), error_text);
|
||||||
|
ast_free(error_text);
|
||||||
}
|
}
|
||||||
|
|
||||||
return payload;
|
return payload;
|
||||||
@@ -722,6 +756,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
|
|
||||||
} else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
|
} else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -743,6 +778,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -767,10 +803,40 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
res = queue_option_frame(instance, option);
|
res = queue_option_frame(instance, option);
|
||||||
ast_free(option);
|
ast_free(option);
|
||||||
|
|
||||||
|
} else if (ast_strings_equal(command, MARK_MEDIA)) {
|
||||||
|
const char *id;
|
||||||
|
char *option;
|
||||||
|
SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
|
||||||
|
AST_LIST_UNLOCK);
|
||||||
|
|
||||||
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
|
ast_channel_name(instance->channel), command);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
|
||||||
|
id = ast_json_object_string_get(json, "correlation_id");
|
||||||
|
} else {
|
||||||
|
id = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
ast_debug(4, "%s: %s %s\n",
|
||||||
|
ast_channel_name(instance->channel), MARK_MEDIA, id);
|
||||||
|
|
||||||
|
option = create_event(instance, MEDIA_MARK_PROCESSED, id);
|
||||||
|
if (!option) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
res = queue_option_frame(instance, option);
|
||||||
|
ast_free(option);
|
||||||
|
|
||||||
} else if (ast_strings_equal(command, FLUSH_MEDIA)) {
|
} else if (ast_strings_equal(command, FLUSH_MEDIA)) {
|
||||||
struct ast_frame *frame = NULL;
|
struct ast_frame *frame = NULL;
|
||||||
|
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -787,6 +853,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
|
|
||||||
} else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
|
} else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -801,6 +868,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
|
|
||||||
} else if (ast_strings_equal(command, PAUSE_MEDIA)) {
|
} else if (ast_strings_equal(command, PAUSE_MEDIA)) {
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -811,6 +879,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
|
|||||||
|
|
||||||
} else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
|
} else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
|
||||||
if (instance->passthrough) {
|
if (instance->passthrough) {
|
||||||
|
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
|
||||||
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
|
||||||
ast_channel_name(instance->channel), command);
|
ast_channel_name(instance->channel), command);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -1515,7 +1584,8 @@ static struct ast_channel *webchan_request(const char *type,
|
|||||||
);
|
);
|
||||||
struct ast_flags opts = { 0, };
|
struct ast_flags opts = { 0, };
|
||||||
char *opt_args[OPT_ARG_ARRAY_SIZE];
|
char *opt_args[OPT_ARG_ARRAY_SIZE];
|
||||||
const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel";
|
const char *requestor_name = requestor ? ast_channel_name(requestor) :
|
||||||
|
(assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
|
||||||
RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
|
RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
|
||||||
|
|
||||||
global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
|
global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
|
||||||
@@ -1547,16 +1617,12 @@ static struct ast_channel *webchan_request(const char *type,
|
|||||||
|
|
||||||
if (ast_test_flag(&opts, OPT_WS_CODEC)
|
if (ast_test_flag(&opts, OPT_WS_CODEC)
|
||||||
&& !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
|
&& !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
|
||||||
ast_debug(3, "%s: Using specified format %s\n",
|
|
||||||
requestor_name, opt_args[OPT_ARG_WS_CODEC]);
|
|
||||||
fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
|
fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
* If codec wasn't specified in the dial string,
|
* If codec wasn't specified in the dial string,
|
||||||
* use the first format in the capabilities.
|
* use the first format in the capabilities.
|
||||||
*/
|
*/
|
||||||
ast_debug(3, "%s: Using format %s from requesting channel\n",
|
|
||||||
requestor_name, opt_args[OPT_ARG_WS_CODEC]);
|
|
||||||
fmt = ast_format_cap_get_format(cap, 0);
|
fmt = ast_format_cap_get_format(cap, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1566,6 +1632,10 @@ static struct ast_channel *webchan_request(const char *type,
|
|||||||
goto failure;
|
goto failure;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ast_debug(3, "%s: Using format %s from %s\n",
|
||||||
|
requestor_name, ast_format_get_name(fmt),
|
||||||
|
ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
|
||||||
|
|
||||||
instance = websocket_new(requestor_name, args.connection_id, fmt);
|
instance = websocket_new(requestor_name, args.connection_id, fmt);
|
||||||
if (!instance) {
|
if (!instance) {
|
||||||
ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
|
ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
|
||||||
|
|||||||
Reference in New Issue
Block a user