diff --git a/.gitignore b/.gitignore index 1ba75facf3..0e2bb7dbb1 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,5 @@ out/ *.orig tests/CI/output .develvars +.devcontainer/ +.claude/ diff --git a/apps/app_stasis_broadcast.c b/apps/app_stasis_broadcast.c new file mode 100644 index 0000000000..ba71e04fd4 --- /dev/null +++ b/apps/app_stasis_broadcast.c @@ -0,0 +1,286 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2026, Aurora Innovation AB + * + * Daniel Donoghue + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis broadcast dialplan application + * + * \author Daniel Donoghue + */ + +/*** MODULEINFO + res_stasis + res_stasis_broadcast + extended + ***/ + +#include "asterisk.h" + +#include "asterisk/app.h" +#include "asterisk/module.h" +#include "asterisk/pbx.h" +#include "asterisk/stasis_app_broadcast.h" +#include "asterisk/stasis_app_impl.h" + +/*** DOCUMENTATION + + + 20.17.0 + 22.7.0 + 23.1.0 + + Broadcast a channel to multiple ARI applications for claiming, + then hand control to the winning application. + + + Timeout in milliseconds to wait for a claim. + Valid range: 0 to 60000ms + Default: 500ms + + + Regular expression to filter which ARI applications + receive the broadcast. Only applications with names matching + the regex will be notified. + Because arguments are comma-delimited, commas cannot + appear in the regex pattern. Use character classes + (e.g. [,]) if a literal comma is + needed, or omit the filter and handle selection in the + ARI application. + Default: all connected applications + + + Optional colon-delimited arguments passed to the winning + application via the StasisStart event. These + are equivalent to the extra arguments in Stasis(). + Example: sales:priority-high + + + Whether to send a CallClaimed event to + ARI applications when a channel is claimed. + When enabled, the CallClaimed event is + sent only to applications that matched the + app_filter (or all applications if no + filter was set). + Disabled by default to minimise WebSocket traffic under + high load. Losing claimants already receive a + 409 HTTP response. + Default: no + + + + Broadcasts the incoming channel to all connected ARI applications + (or a filtered subset) via a CallBroadcast event. + ARI applications can respond with a claim request. The first application + to claim the channel wins, and subsequent claims are rejected. + If an application claims the channel within the timeout, the channel + is automatically placed under Stasis control with the winning application, + exactly as if Stasis(winner_app) had been called. + The winning application receives a StasisStart event + and has full channel control until it calls continue + or the channel hangs up. + If no application claims the channel within the timeout, control + returns to the dialplan immediately, allowing fallback handling. + This application will set the following channel variables: + + + + An application claimed the channel and the Stasis + session completed without failures. + + + An application claimed the channel but a failure + occurred when executing the Stasis application. + + + No application claimed the channel within the + timeout period. + + + + + ; Broadcast with default timeout (500ms) to all apps + ; Channel automatically enters Stasis with the winner + exten => _X.,1,StasisBroadcast() + same => n,GotoIf($["${STASISSTATUS}"="TIMEOUT"]?no_route) + same => n,Hangup() + same => n(no_route),Playback(sorry-no-agent) + same => n,Hangup() + + + ; Broadcast with custom timeout, app filter, and args for the winner + exten => _X.,1,StasisBroadcast(2000,^ivr-.*,sales:priority-high) + same => n,GotoIf($["${STASISSTATUS}"="TIMEOUT"]?no_route) + same => n,Hangup() + same => n(no_route),Playback(sorry-no-agent) + same => n,Hangup() + + + + ***/ + +/*! \brief Dialplan application name */ +static const char *app = "StasisBroadcast"; + +/*! \brief Default timeout in milliseconds */ +#define DEFAULT_TIMEOUT_MS 500 + +/*! \brief Maximum timeout in milliseconds */ +#define MAX_TIMEOUT_MS 60000 + +/*! \brief Maximum number of Stasis arguments */ +#define MAX_STASIS_ARGS 128 + +/*! \brief StasisBroadcast dialplan application callback */ +static int stasis_broadcast_exec(struct ast_channel *chan, const char *data) +{ + char *parse = NULL; + int timeout_ms = DEFAULT_TIMEOUT_MS; + const char *app_filter = NULL; + const char *stasis_args_raw = NULL; + unsigned int flags = STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED; + char *winner = NULL; + int result = 0; + int stasis_argc = 0; + char *stasis_argv[MAX_STASIS_ARGS]; + + AST_DECLARE_APP_ARGS(args, + AST_APP_ARG(timeout); + AST_APP_ARG(app_filter); + AST_APP_ARG(stasis_args); + AST_APP_ARG(notify_claimed); + ); + + ast_assert(chan != NULL); + + /* Initialize channel variable */ + pbx_builtin_setvar_helper(chan, "STASISSTATUS", ""); + + /* Parse positional arguments if provided */ + if (!ast_strlen_zero(data)) { + parse = ast_strdupa(data); + AST_STANDARD_APP_ARGS(args, parse); + + if (!ast_strlen_zero(args.timeout)) { + if (sscanf(args.timeout, "%d", &timeout_ms) != 1 + || timeout_ms < 0 || timeout_ms > MAX_TIMEOUT_MS) { + ast_log(LOG_WARNING, + "Channel %s: invalid timeout value '%s' (must be 0-%dms), using default %dms\n", + ast_channel_name(chan), args.timeout, MAX_TIMEOUT_MS, DEFAULT_TIMEOUT_MS); + timeout_ms = DEFAULT_TIMEOUT_MS; + } + } + + if (!ast_strlen_zero(args.app_filter)) { + app_filter = args.app_filter; + } + + if (!ast_strlen_zero(args.stasis_args)) { + stasis_args_raw = args.stasis_args; + } + + if (!ast_strlen_zero(args.notify_claimed) && ast_true(args.notify_claimed)) { + flags &= ~STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED; + } + } + + /* + * Parse colon-delimited Stasis arguments. stasis_argv[] holds + * pointers into the stack-allocated args_copy buffer. This is + * safe because stasis_app_exec is called within this same + * function scope so the stack frame remains alive. + */ + if (!ast_strlen_zero(stasis_args_raw)) { + char *args_copy = ast_strdupa(stasis_args_raw); + char *arg; + + while ((arg = strsep(&args_copy, ":")) != NULL && stasis_argc < MAX_STASIS_ARGS) { + stasis_argv[stasis_argc++] = arg; + } + } + + ast_debug(3, "Broadcasting channel %s (timeout=%dms, filter=%s, args=%d)\n", + ast_channel_name(chan), timeout_ms, app_filter ? app_filter : "none", + stasis_argc); + + /* Start the broadcast */ + result = stasis_app_broadcast_channel(chan, timeout_ms, app_filter, flags); + if (result) { + ast_log(LOG_ERROR, "Failed to broadcast channel %s: %s\n", + ast_channel_name(chan), + result == AST_OPTIONAL_API_UNAVAILABLE ? "res_stasis_broadcast not loaded" : "internal error"); + pbx_builtin_setvar_helper(chan, "STASISSTATUS", "FAILED"); + return 0; + } + + /* Wait for a claim. A late claim can arrive between the timeout + * expiring and our cleanup call, so always check for a winner + * regardless of the wait result. */ + stasis_app_broadcast_wait(chan, timeout_ms); + winner = stasis_app_broadcast_winner(ast_channel_uniqueid(chan)); + + if (winner) { + int ret; + + ast_debug(3, "Channel %s claimed by %s, entering Stasis\n", + ast_channel_name(chan), winner); + + /* Defer cleanup until after Stasis so concurrent claimants can still + * find the context (with claimed=1) and receive 409 Conflict instead + * of 404 Not Found. */ + ret = stasis_app_exec(chan, winner, stasis_argc, stasis_argv); + ast_free(winner); + + /* Clean up now that the Stasis session has ended */ + stasis_app_broadcast_cleanup(ast_channel_uniqueid(chan)); + + if (ret) { + pbx_builtin_setvar_helper(chan, "STASISSTATUS", "FAILED"); + if (ast_check_hangup(chan)) { + return -1; + } + } else { + pbx_builtin_setvar_helper(chan, "STASISSTATUS", "SUCCESS"); + } + } else { + /* No winner: clean up immediately, nothing to race against */ + stasis_app_broadcast_cleanup(ast_channel_uniqueid(chan)); + ast_log(LOG_WARNING, "Channel %s: not claimed within %dms timeout\n", + ast_channel_name(chan), timeout_ms); + pbx_builtin_setvar_helper(chan, "STASISSTATUS", "TIMEOUT"); + } + + return 0; +} + +static int load_module(void) +{ + return ast_register_application_xml(app, stasis_broadcast_exec); +} + +static int unload_module(void) +{ + return ast_unregister_application(app); +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, + "Stasis application broadcast", + .support_level = AST_MODULE_SUPPORT_EXTENDED, + .load = load_module, + .unload = unload_module, + .requires = "res_stasis,res_stasis_broadcast", +); diff --git a/include/asterisk/stasis_app_broadcast.h b/include/asterisk/stasis_app_broadcast.h new file mode 100644 index 0000000000..640667a5be --- /dev/null +++ b/include/asterisk/stasis_app_broadcast.h @@ -0,0 +1,131 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2026, Aurora Innovation AB + * + * Daniel Donoghue + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_APP_BROADCAST_H +#define _ASTERISK_STASIS_APP_BROADCAST_H + +/*! \file + * + * \brief Stasis Application Broadcast API + * + * \author Daniel Donoghue + * + * This module provides the infrastructure for broadcasting incoming channels + * to multiple ARI applications and handling first-claim winner logic. + */ + +#include "asterisk/channel.h" +#include "asterisk/optional_api.h" + +/*! \brief Suppress CallClaimed event for this broadcast */ +#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED (1 << 0) + +/*! + * \brief Start a broadcast for a channel + * + * \since 20 + * + * Broadcasts a channel to all ARI applications (or filtered applications) + * allowing them to claim the channel. Only the first claim will succeed. + * + * When a channel is claimed, a CallClaimed event is sent only to applications + * that matched the \a app_filter (or all apps if no filter was set). This can + * be suppressed entirely with #STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED. + * + * \param chan The channel to broadcast + * \param timeout_ms Timeout in milliseconds to wait for a claim + * \param app_filter Optional regex filter for application names (NULL for all) + * \param flags Combination of STASIS_BROADCAST_FLAG_* values + * + * \retval 0 on success + * \retval -1 on error + * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded + */ +AST_OPTIONAL_API(int, stasis_app_broadcast_channel, + (struct ast_channel *chan, int timeout_ms, const char *app_filter, + unsigned int flags), + { return AST_OPTIONAL_API_UNAVAILABLE; }); + +/*! + * \brief Attempt to claim a broadcast channel + * + * \since 20 + * + * Atomically attempts to claim a channel that is in broadcast state. + * Only the first claim for a given channel will succeed. + * + * \param channel_id The unique ID of the channel + * \param app_name The name of the application claiming the channel + * + * \retval 0 if claim successful + * \retval -1 if channel not found + * \retval -2 if already claimed by another application + * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded + */ +AST_OPTIONAL_API(int, stasis_app_claim_channel, + (const char *channel_id, const char *app_name), + { return AST_OPTIONAL_API_UNAVAILABLE; }); + +/*! + * \brief Get the winner app name for a broadcast channel + * + * \since 20 + * + * \param channel_id The unique ID of the channel + * + * \return A copy of the winner app name (caller must free with ast_free), + * or NULL if not claimed or not found + * \retval NULL if res_stasis_broadcast is not loaded + */ +AST_OPTIONAL_API(char *, stasis_app_broadcast_winner, + (const char *channel_id), + { return NULL; }); + +/*! + * \brief Wait for a broadcast channel to be claimed + * + * \since 20 + * + * Blocks until the channel is claimed or the timeout expires. + * + * \param chan The channel + * \param timeout_ms Maximum time to wait in milliseconds + * + * \retval 0 if claimed within timeout + * \retval -1 if timeout expired or error + * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded + */ +AST_OPTIONAL_API(int, stasis_app_broadcast_wait, + (struct ast_channel *chan, int timeout_ms), + { return AST_OPTIONAL_API_UNAVAILABLE; }); + +/*! + * \brief Clean up broadcast context for a channel + * + * \since 20 + * + * Removes the broadcast context when the channel is done or leaving the + * broadcast state. + * + * \param channel_id The unique ID of the channel + */ +AST_OPTIONAL_API(void, stasis_app_broadcast_cleanup, + (const char *channel_id), + { return; }); + +#endif /* _ASTERISK_STASIS_APP_BROADCAST_H */ diff --git a/res/ari/ari_model_validators.c b/res/ari/ari_model_validators.c index 5eb2cd323a..fb1338f451 100644 --- a/res/ari/ari_model_validators.c +++ b/res/ari/ari_model_validators.c @@ -3696,6 +3696,230 @@ ari_validator ast_ari_validate_bridge_video_source_changed_fn(void) return ast_ari_validate_bridge_video_source_changed; } +int ast_ari_validate_call_broadcast(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_type = 0; + int has_application = 0; + int has_timestamp = 0; + int has_channel = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("asterisk_id", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field asterisk_id failed validation\n"); + res = 0; + } + } else + if (strcmp("type", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_type = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field type failed validation\n"); + res = 0; + } + } else + if (strcmp("application", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_application = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field application failed validation\n"); + res = 0; + } + } else + if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_timestamp = 1; + prop_is_valid = ast_ari_validate_date( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field timestamp failed validation\n"); + res = 0; + } + } else + if (strcmp("called", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field called failed validation\n"); + res = 0; + } + } else + if (strcmp("caller", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field caller failed validation\n"); + res = 0; + } + } else + if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_channel = 1; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallBroadcast field channel failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI CallBroadcast has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_type) { + ast_log(LOG_ERROR, "ARI CallBroadcast missing required field type\n"); + res = 0; + } + + if (!has_application) { + ast_log(LOG_ERROR, "ARI CallBroadcast missing required field application\n"); + res = 0; + } + + if (!has_timestamp) { + ast_log(LOG_ERROR, "ARI CallBroadcast missing required field timestamp\n"); + res = 0; + } + + if (!has_channel) { + ast_log(LOG_ERROR, "ARI CallBroadcast missing required field channel\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_call_broadcast_fn(void) +{ + return ast_ari_validate_call_broadcast; +} + +int ast_ari_validate_call_claimed(struct ast_json *json) +{ + int res = 1; + struct ast_json_iter *iter; + int has_type = 0; + int has_application = 0; + int has_timestamp = 0; + int has_channel = 0; + int has_winner_app = 0; + + for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) { + if (strcmp("asterisk_id", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field asterisk_id failed validation\n"); + res = 0; + } + } else + if (strcmp("type", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_type = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field type failed validation\n"); + res = 0; + } + } else + if (strcmp("application", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_application = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field application failed validation\n"); + res = 0; + } + } else + if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_timestamp = 1; + prop_is_valid = ast_ari_validate_date( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field timestamp failed validation\n"); + res = 0; + } + } else + if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_channel = 1; + prop_is_valid = ast_ari_validate_channel( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field channel failed validation\n"); + res = 0; + } + } else + if (strcmp("winner_app", ast_json_object_iter_key(iter)) == 0) { + int prop_is_valid; + has_winner_app = 1; + prop_is_valid = ast_ari_validate_string( + ast_json_object_iter_value(iter)); + if (!prop_is_valid) { + ast_log(LOG_ERROR, "ARI CallClaimed field winner_app failed validation\n"); + res = 0; + } + } else + { + ast_log(LOG_ERROR, + "ARI CallClaimed has undocumented field %s\n", + ast_json_object_iter_key(iter)); + res = 0; + } + } + + if (!has_type) { + ast_log(LOG_ERROR, "ARI CallClaimed missing required field type\n"); + res = 0; + } + + if (!has_application) { + ast_log(LOG_ERROR, "ARI CallClaimed missing required field application\n"); + res = 0; + } + + if (!has_timestamp) { + ast_log(LOG_ERROR, "ARI CallClaimed missing required field timestamp\n"); + res = 0; + } + + if (!has_channel) { + ast_log(LOG_ERROR, "ARI CallClaimed missing required field channel\n"); + res = 0; + } + + if (!has_winner_app) { + ast_log(LOG_ERROR, "ARI CallClaimed missing required field winner_app\n"); + res = 0; + } + + return res; +} + +ari_validator ast_ari_validate_call_claimed_fn(void) +{ + return ast_ari_validate_call_claimed; +} + int ast_ari_validate_channel_caller_id(struct ast_json *json) { int res = 1; @@ -6193,6 +6417,12 @@ int ast_ari_validate_event(struct ast_json *json) if (strcmp("BridgeVideoSourceChanged", discriminator) == 0) { return ast_ari_validate_bridge_video_source_changed(json); } else + if (strcmp("CallBroadcast", discriminator) == 0) { + return ast_ari_validate_call_broadcast(json); + } else + if (strcmp("CallClaimed", discriminator) == 0) { + return ast_ari_validate_call_claimed(json); + } else if (strcmp("ChannelCallerId", discriminator) == 0) { return ast_ari_validate_channel_caller_id(json); } else @@ -6412,6 +6642,12 @@ int ast_ari_validate_message(struct ast_json *json) if (strcmp("BridgeVideoSourceChanged", discriminator) == 0) { return ast_ari_validate_bridge_video_source_changed(json); } else + if (strcmp("CallBroadcast", discriminator) == 0) { + return ast_ari_validate_call_broadcast(json); + } else + if (strcmp("CallClaimed", discriminator) == 0) { + return ast_ari_validate_call_claimed(json); + } else if (strcmp("ChannelCallerId", discriminator) == 0) { return ast_ari_validate_channel_caller_id(json); } else diff --git a/res/ari/ari_model_validators.h b/res/ari/ari_model_validators.h index e29873abf3..ab2718627a 100644 --- a/res/ari/ari_model_validators.h +++ b/res/ari/ari_model_validators.h @@ -749,6 +749,38 @@ int ast_ari_validate_bridge_video_source_changed(struct ast_json *json); */ ari_validator ast_ari_validate_bridge_video_source_changed_fn(void); +/*! + * \brief Validator for CallBroadcast. + * + * Notification that a channel is being broadcast to ARI applications for claiming. + * + * \param json JSON object to validate. + * \retval True (non-zero) if valid. + * \retval False (zero) if invalid. + */ +int ast_ari_validate_call_broadcast(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_call_broadcast(). + */ +ari_validator ast_ari_validate_call_broadcast_fn(void); + +/*! + * \brief Validator for CallClaimed. + * + * Notification that a broadcast channel has been successfully claimed by an ARI application. + * + * \param json JSON object to validate. + * \retval True (non-zero) if valid. + * \retval False (zero) if invalid. + */ +int ast_ari_validate_call_claimed(struct ast_json *json); + +/*! + * \brief Function pointer to ast_ari_validate_call_claimed(). + */ +ari_validator ast_ari_validate_call_claimed_fn(void); + /*! * \brief Validator for ChannelCallerId. * @@ -1703,6 +1735,21 @@ ari_validator ast_ari_validate_application_fn(void); * - timestamp: Date (required) * - bridge: Bridge (required) * - old_video_source_id: string + * CallBroadcast + * - asterisk_id: string + * - type: string (required) + * - application: string (required) + * - timestamp: Date (required) + * - called: string + * - caller: string + * - channel: Channel (required) + * CallClaimed + * - asterisk_id: string + * - type: string (required) + * - application: string (required) + * - timestamp: Date (required) + * - channel: Channel (required) + * - winner_app: string (required) * ChannelCallerId * - asterisk_id: string * - type: string (required) diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index cd4c249e47..93610afe13 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -32,6 +32,7 @@ #include "resource_events.h" #include "internal.h" #include "asterisk/stasis_app.h" +#include "asterisk/stasis_app_broadcast.h" void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, @@ -87,3 +88,50 @@ void ast_ari_events_user_event(struct ast_variable *headers, "Error processing request"); } } + +void ast_ari_events_claim_channel(struct ast_variable *headers, + struct ast_ari_events_claim_channel_args *args, + struct ast_ari_response *response) +{ + int res; + + if (ast_strlen_zero(args->channel_id)) { + ast_ari_response_error(response, 400, "Bad Request", + "channelId parameter is required"); + return; + } + + if (ast_strlen_zero(args->application)) { + ast_ari_response_error(response, 400, "Bad Request", + "application parameter is required"); + return; + } + + res = stasis_app_claim_channel(args->channel_id, args->application); + + switch (res) { + case 0: + /* Success */ + ast_ari_response_no_content(response); + break; + case -1: + /* Channel not found */ + ast_ari_response_error(response, 404, "Not Found", + "Channel not found or not in broadcast state"); + break; + case -2: + /* Already claimed */ + ast_ari_response_error(response, 409, "Conflict", + "Channel has already been claimed by another application"); + break; + case AST_OPTIONAL_API_UNAVAILABLE: + /* Module not loaded */ + ast_ari_response_error(response, 501, "Not Implemented", + "Broadcast functionality not available (res_stasis_broadcast not loaded)"); + break; + default: + ast_ari_response_error(response, 500, "Internal Server Error", + "Failed to claim channel"); + break; + } +} diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index d067559818..2d4270b7d3 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -84,5 +84,33 @@ int ast_ari_events_user_event_parse_body( * \param[out] response HTTP response */ void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response); +/*! Argument struct for ast_ari_events_claim_channel() */ +struct ast_ari_events_claim_channel_args { + /*! The ID of the channel to claim */ + const char *channel_id; + /*! The name of the application claiming the channel */ + const char *application; +}; +/*! + * \brief Body parsing function for /events/claim. + * \param body The JSON body from which to parse parameters. + * \param[out] args The args structure to parse into. + * \retval zero on success + * \retval non-zero on failure + */ +int ast_ari_events_claim_channel_parse_body( + struct ast_json *body, + struct ast_ari_events_claim_channel_args *args); + +/*! + * \brief Claim a broadcast channel for this application. + * + * Atomically claims a channel that is in broadcast state. Only the first claim succeeds. + * + * \param headers HTTP headers + * \param args Swagger parameters + * \param[out] response HTTP response + */ +void ast_ari_events_claim_channel(struct ast_variable *headers, struct ast_ari_events_claim_channel_args *args, struct ast_ari_response *response); #endif /* _ASTERISK_RESOURCE_EVENTS_H */ diff --git a/res/res_ari_events.c b/res/res_ari_events.c index 71d7dbf71f..7304199b10 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -207,6 +207,91 @@ fin: __attribute__((unused)) ast_free(args.source); return; } +int ast_ari_events_claim_channel_parse_body( + struct ast_json *body, + struct ast_ari_events_claim_channel_args *args) +{ + struct ast_json *field; + /* Parse query parameters out of it */ + field = ast_json_object_get(body, "channelId"); + if (field) { + args->channel_id = ast_json_string_get(field); + } + field = ast_json_object_get(body, "application"); + if (field) { + args->application = ast_json_string_get(field); + } + return 0; +} + +/*! + * \brief Parameter parsing callback for /events/claim. + * \param ser TCP/TLS session object + * \param get_params GET parameters in the HTTP request. + * \param path_vars Path variables extracted from the request. + * \param headers HTTP headers. + * \param body + * \param[out] response Response to the HTTP request. + */ +static void ast_ari_events_claim_channel_cb( + struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *path_vars, + struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response) +{ + struct ast_ari_events_claim_channel_args args = {}; + struct ast_variable *i; +#if defined(AST_DEVMODE) + int is_valid; + int code; +#endif /* AST_DEVMODE */ + + for (i = get_params; i; i = i->next) { + if (strcmp(i->name, "channelId") == 0) { + args.channel_id = (i->value); + } else + if (strcmp(i->name, "application") == 0) { + args.application = (i->value); + } else + {} + } + if (ast_ari_events_claim_channel_parse_body(body, &args)) { + ast_ari_response_alloc_failed(response); + goto fin; + } + ast_ari_events_claim_channel(headers, &args, response); +#if defined(AST_DEVMODE) + code = response->response_code; + + switch (code) { + case 0: /* Implementation is still a stub, or the code wasn't set */ + is_valid = response->message == NULL; + break; + case 500: /* Internal Server Error */ + case 501: /* Not Implemented */ + case 404: /* Channel not found or not in broadcast state. */ + case 409: /* Channel has already been claimed by another application. */ + is_valid = 1; + break; + default: + if (200 <= code && code <= 299) { + is_valid = ast_ari_validate_void( + response->message); + } else { + ast_log(LOG_ERROR, "Invalid error response %d for /events/claim\n", code); + is_valid = 0; + } + } + + if (!is_valid) { + ast_log(LOG_ERROR, "Response validation failed for /events/claim\n"); + ast_ari_response_error(response, 500, + "Internal Server Error", "Response validation failed"); + } +#endif /* AST_DEVMODE */ + +fin: __attribute__((unused)) + return; +} /*! \brief REST handler for /api-docs/events.json */ static struct stasis_rest_handlers events_user_eventName = { @@ -227,12 +312,21 @@ static struct stasis_rest_handlers events_user = { .children = { &events_user_eventName, } }; /*! \brief REST handler for /api-docs/events.json */ +static struct stasis_rest_handlers events_claim = { + .path_segment = "claim", + .callbacks = { + [AST_HTTP_POST] = ast_ari_events_claim_channel_cb, + }, + .num_children = 0, + .children = { } +}; +/*! \brief REST handler for /api-docs/events.json */ static struct stasis_rest_handlers events = { .path_segment = "events", .callbacks = { }, - .num_children = 1, - .children = { &events_user, } + .num_children = 2, + .children = { &events_user,&events_claim, } }; static int unload_module(void) diff --git a/res/res_stasis_broadcast.c b/res/res_stasis_broadcast.c new file mode 100644 index 0000000000..e9fb74b5d4 --- /dev/null +++ b/res/res_stasis_broadcast.c @@ -0,0 +1,933 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2026, Aurora Innovation AB + * + * Daniel Donoghue + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Stasis application broadcast resource + * + * \author Daniel Donoghue + */ + +/*** MODULEINFO + res_stasis + res_ari + extended + ***/ + +#include "asterisk.h" + +#include +#include + +#include "asterisk/astobj2.h" +#include "asterisk/channel.h" +#include "asterisk/http.h" +#include "asterisk/json.h" +#include "asterisk/lock.h" +#include "asterisk/module.h" +#include "asterisk/pbx.h" +#include "asterisk/stasis_app.h" +#include "asterisk/stasis_app_impl.h" +#include "asterisk/stasis_channels.h" +#include "asterisk/time.h" +#include "asterisk/utils.h" + +#define AST_API_MODULE /* Mark this as the module providing the API */ +#include "asterisk/stasis_app_broadcast.h" + +#define BROADCAST_BUCKETS 37 + +/*! \brief Maximum length for app_filter regex pattern */ +#define MAX_REGEX_LENGTH 256 + +/*! \brief Maximum depth for regex group nesting */ +#define MAX_GROUP_DEPTH 10 + +/*! \brief Maximum number of nested quantifiers in regex */ +#define MAX_NESTED_QUANTIFIERS 3 + +/*! \brief Maximum value for brace quantifier bounds {m,n} */ +#define MAX_QUANTIFIER_BOUND 100 + +/*! \brief Maximum alternations allowed in deeply nested groups */ +#define MAX_ALTERNATIONS 20 + +/*! \brief Group depth threshold for alternation limits */ +#define ALTERNATION_DEPTH_THRESHOLD 2 + +/*! \brief Maximum broadcast timeout in milliseconds (24 hours) */ +#define MAX_BROADCAST_TIMEOUT_MS (24 * 60 * 60 * 1000) + +/*! \brief Interval in ms between hangup checks while waiting for a claim */ +#define BROADCAST_POLL_INTERVAL_MS 200 + +/*! \brief Broadcast context stored on channel */ +struct stasis_broadcast_ctx { + /*! The unique ID of the channel */ + char channel_id[AST_MAX_PUBLIC_UNIQUEID]; + /*! Name of the winning application (dynamically allocated, NULL until claimed) */ + char *winner_app; + /*! Regex pattern used to filter broadcast recipients */ + char app_filter[MAX_REGEX_LENGTH + 1]; + /*! Compiled regex for app_filter (valid only when filter_compiled is set) */ + regex_t compiled_filter; + /*! Flag indicating if channel was claimed */ + unsigned int claimed:1; + /*! Whether compiled_filter is valid and must be freed */ + unsigned int filter_compiled:1; + /*! Set when the PBX thread retrieves the winner; prevents late claims */ + unsigned int finished:1; + /*! Broadcast behaviour flags (STASIS_BROADCAST_FLAG_*) */ + unsigned int flags; + /*! Reference to the global container (prevents use-after-free during module unload) */ + struct ao2_container *container; + /*! Condition variable for claim notification */ + ast_cond_t cond; +}; + +/*! \brief Container for all active broadcast contexts */ +static struct ao2_container *broadcast_contexts; + +/*! \brief Destructor for broadcast datastore + * + * Called when the channel is destroyed. Ensures the broadcast context + * is unlinked from the global container even if the caller never + * reached stasis_app_broadcast_cleanup (e.g. abnormal channel teardown). + */ +static void broadcast_datastore_destroy(void *data) +{ + struct stasis_broadcast_ctx *ctx = data; + + if (ctx->container) { + ao2_unlink(ctx->container, ctx); + } + ao2_cleanup(ctx); +} + +/*! \brief Datastore information for broadcast context */ +static const struct ast_datastore_info broadcast_datastore_info = { + .type = "stasis_broadcast_context", + .destroy = broadcast_datastore_destroy, +}; + +AO2_STRING_FIELD_HASH_FN(stasis_broadcast_ctx, channel_id) +AO2_STRING_FIELD_CMP_FN(stasis_broadcast_ctx, channel_id) + +/*! \brief Destructor for broadcast context */ +static void broadcast_ctx_destructor(void *obj) +{ + struct stasis_broadcast_ctx *ctx = obj; + ast_free(ctx->winner_app); + if (ctx->filter_compiled) { + regfree(&ctx->compiled_filter); + } + ao2_cleanup(ctx->container); + ast_cond_destroy(&ctx->cond); +} + +static int validate_regex_pattern(const char *pattern); + +/*! \brief Create a new broadcast context + * + * Validates and compiles the app_filter regex if provided. On regex + * failure the context is still created but broadcasts will be sent + * to all applications (i.e. no filtering). + */ +static struct stasis_broadcast_ctx *broadcast_ctx_create( + const char *channel_id, const char *app_filter, unsigned int flags) +{ + struct stasis_broadcast_ctx *ctx; + + ctx = ao2_alloc(sizeof(*ctx), broadcast_ctx_destructor); + if (!ctx) { + return NULL; + } + + /* ao2_alloc zeroes the struct; only set non-zero fields explicitly */ + ast_copy_string(ctx->channel_id, channel_id, sizeof(ctx->channel_id)); + ctx->flags = flags; + ctx->container = ao2_bump(broadcast_contexts); + ast_cond_init(&ctx->cond, NULL); + + /* Validate and compile app_filter regex if provided */ + if (!ast_strlen_zero(app_filter)) { + ast_copy_string(ctx->app_filter, app_filter, sizeof(ctx->app_filter)); + if (validate_regex_pattern(app_filter) != 0) { + ast_log(LOG_WARNING, + "Channel %s: rejecting app_filter regex as potentially dangerous: %s\n", + channel_id, app_filter); + } else if (regcomp(&ctx->compiled_filter, app_filter, + REG_EXTENDED | REG_NOSUB) != 0) { + ast_log(LOG_WARNING, + "Channel %s: failed to compile app_filter regex '%s'\n", + channel_id, app_filter); + } else { + ctx->filter_compiled = 1; + } + + if (!ctx->filter_compiled) { + ast_log(LOG_WARNING, + "Channel %s: proceeding without application filtering due to invalid regex\n", + channel_id); + } + } + + ast_debug(1, "Created broadcast context for channel %s (filter: %s, flags: 0x%x)\n", + ctx->channel_id, + ctx->filter_compiled ? ctx->app_filter : "none", + ctx->flags); + + return ctx; +} + +/*! + * \brief Validate a regex pattern for safety + * + * Checks that the regex pattern is within length limits and doesn't contain + * patterns that could cause excessive backtracking or denial of service. + * + * \param pattern The regex pattern to validate + * \return 0 if valid, -1 if invalid + */ +static int validate_regex_pattern(const char *pattern) +{ + size_t len; + int group_depth = 0; + int quantified_groups = 0; + int in_class = 0; /* Inside [...] */ + /* Track alternations per group depth. Index 0 is outside groups and unused. */ + int alternations_per_depth[MAX_GROUP_DEPTH + 1] = { 0 }; + const char *p; + + if (ast_strlen_zero(pattern)) { + return 0; /* Empty pattern is valid (will be skipped) */ + } + + /* Check maximum length to prevent excessive regex compilation time */ + len = strlen(pattern); + if (len > MAX_REGEX_LENGTH) { + ast_debug(3, "Regex pattern exceeds maximum length of %d characters (got %zu)\n", + MAX_REGEX_LENGTH, len); + return -1; + } + + /* + * Check for potentially dangerous patterns that could cause + * excessive regex compilation or matching time. Look for: + * - Excessive group nesting depth + * - Too many quantified groups (groups followed by +, *, or ?) + * + * Note: This is a heuristic approach that catches common dangerous + * patterns. Combined with the length limit, it provides reasonable + * protection against ReDoS while allowing legitimate regex usage. + */ + for (p = pattern; *p; p++) { + /* Handle character classes: enter on unescaped '[' and exit on unescaped ']' */ + if (!in_class && *p == '[' && (p == pattern || *(p - 1) != '\\')) { + in_class = 1; + /* In POSIX ERE, ']' immediately after '[' or '[^' is a + * literal, not the end of the class. Advance past the + * optional negation caret and the literal ']' so the + * main loop does not leave in_class prematurely. */ + if (*(p + 1) == '^') { + p++; + } + if (*(p + 1) == ']') { + p++; + } + continue; + } else if (in_class) { + if (*p == '\\') { + /* Skip the next escaped character inside character class */ + if (*(p + 1)) { + p++; + } + continue; + } + if (*p == ']') { + in_class = 0; + } + /* Ignore everything inside character classes for heuristics */ + continue; + } + switch (*p) { + case '(': + group_depth++; + if (group_depth > MAX_GROUP_DEPTH) { + ast_debug(3, "Regex pattern has too many nested groups (max %d)\n", + MAX_GROUP_DEPTH); + return -1; + } + /* Reset alternation counter for newly entered group depth */ + alternations_per_depth[group_depth] = 0; + break; + case ')': + if (group_depth > 0) { + /* Clear alternations count for this depth before leaving */ + alternations_per_depth[group_depth] = 0; + group_depth--; + } + break; + case '+': + case '*': + case '?': + /* + * Count quantified groups - patterns like (...)+ or (...)* + * Too many of these can cause slow matching on certain inputs. + */ + if (p > pattern && *(p - 1) == ')') { + quantified_groups++; + } + break; + case '{': { + /* Parse POSIX quantifier {m}, {m,}, {m,n} with overflow and bound checks */ + const char *q = p + 1; + long m = 0, n = -1; /* n=-1 means open upper bound */ + int valid = 0; + int digit; + int overflow = 0; + + if (*q >= '0' && *q <= '9') { + /* Parse m safely */ + while (*q >= '0' && *q <= '9') { + digit = (*q - '0'); + if (m > (LONG_MAX - digit) / 10) { /* overflow on next step */ + overflow = 1; + break; + } + m = (m * 10) + digit; + if (m > MAX_QUANTIFIER_BOUND) { /* early bound exceed */ + overflow = 1; + break; + } + q++; + } + if (!overflow && *q == ',') { + q++; + if (*q >= '0' && *q <= '9') { + long nn = 0; + while (*q >= '0' && *q <= '9') { + digit = (*q - '0'); + if (nn > (LONG_MAX - digit) / 10) { + overflow = 1; + break; + } + nn = (nn * 10) + digit; + if (nn > MAX_QUANTIFIER_BOUND) { + overflow = 1; + break; + } + q++; + } + n = nn; + } else { + n = -1; /* open upper bound */ + } + } else if (!overflow) { + n = m; /* {m} */ + } + if (!overflow && *q == '}') { + valid = 1; + } + } + if (overflow) { + ast_debug(3, "Regex quantifier overflow or exceeds max bound (max %d)\n", MAX_QUANTIFIER_BOUND); + return -1; + } + if (valid) { + /* Additional bounds check (defensive) */ + if (m > MAX_QUANTIFIER_BOUND || (n != -1 && n > MAX_QUANTIFIER_BOUND)) { + ast_debug(3, "Regex quantifier bounds too large (max %d)\n", MAX_QUANTIFIER_BOUND); + return -1; + } + if (p > pattern && *(p - 1) == ')') { + quantified_groups++; + } + p = q; /* q currently points to '}' */ + } + break; + } + case '|': + if (group_depth > 0) { + alternations_per_depth[group_depth]++; + if (group_depth > ALTERNATION_DEPTH_THRESHOLD && + alternations_per_depth[group_depth] > MAX_ALTERNATIONS) { + ast_debug(3, + "Regex has too many alternations in deep group (depth %d, count %d, max %d)\n", + group_depth, + alternations_per_depth[group_depth], + MAX_ALTERNATIONS); + return -1; + } + } + break; + case '\\': + /* + * Skip the next character entirely from heuristic processing. + * This ensures escaped characters (metacharacters in BRE or literals + * in ERE like \(, \), \+, \*, \?, etc.) do not affect group depth + * or quantified group counts. + */ + if (*(p + 1)) { + p++; + } + /* Continue to next loop iteration without evaluating the escaped char */ + continue; + } + } + + /* + * Reject patterns with too many quantified groups, as these are + * often indicators of potentially slow patterns that could be + * exploited for denial of service. + */ + if (quantified_groups > MAX_NESTED_QUANTIFIERS) { + ast_debug(3, "Regex pattern has too many quantified groups (max %d)\n", + MAX_NESTED_QUANTIFIERS); + return -1; + } + + return 0; +} + +/*! \brief Create and send broadcast event to all applications + * + * Uses the compiled regex cached in \a ctx for application filtering. + */ +static int send_broadcast_event(struct ast_channel *chan, + struct stasis_broadcast_ctx *ctx) +{ + RAII_VAR(struct ast_json *, event, NULL, ast_json_unref); + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup); + struct ao2_iterator iter; + char *app_name; + const char *caller = NULL; + const char *called = NULL; + + /* Get snapshot and caller/called info under a single channel lock */ + ast_channel_lock(chan); + snapshot = ao2_bump(ast_channel_snapshot(chan)); + caller = ast_strdupa(S_OR(ast_channel_caller(chan)->id.number.str, "")); + called = ast_strdupa(S_OR(ast_channel_exten(chan), "")); + ast_channel_unlock(chan); + + /* Build the broadcast event. Channel variables configured in + * ari.conf "channelvars" are already included in the channel + * snapshot produced by ast_channel_snapshot_to_json(). */ + event = ast_json_pack("{s: s, s: o, s: o, s: s?, s: s?}", + "type", "CallBroadcast", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), + "channel", ast_channel_snapshot_to_json(snapshot, NULL), + "caller", caller, + "called", called); + + if (!event) { + ast_log(LOG_ERROR, "Channel %s: failed to create broadcast event\n", + ast_channel_name(chan)); + return -1; + } + + /* Get all registered applications */ + apps = stasis_app_get_all(); + if (!apps) { + ast_log(LOG_ERROR, "Channel %s: failed to get stasis applications\n", + ast_channel_name(chan)); + return -1; + } + + ast_debug(2, "Broadcasting to %d registered Stasis applications\n", + ao2_container_count(apps)); + + /* + * Broadcast to all matching applications. + * + * We collect matching apps into a plain array, Fisher-Yates shuffle it, + * then call stasis_app_send() for each. stasis_app_send() writes + * directly to each app's WebSocket socket synchronously on the calling + * thread. The shuffle ensures no single ARI application is consistently + * first to receive the event — every app gets a fair chance to claim the + * channel regardless of its position in the ao2 hash container. + */ + { + int app_count; + char **matching_arr; + int n = 0; + int i; + + app_count = ao2_container_count(apps); + if (app_count == 0) { + ast_debug(2, "Channel %s: no Stasis applications registered\n", + ast_channel_uniqueid(chan)); + return 0; + } + + matching_arr = ast_malloc(app_count * sizeof(*matching_arr)); + if (!matching_arr) { + ast_log(LOG_ERROR, "Channel %s: failed to allocate matching apps array\n", + ast_channel_name(chan)); + return -1; + } + + /* First pass: collect all matching app names (transfer refs to array) */ + iter = ao2_iterator_init(apps, 0); + while ((app_name = ao2_iterator_next(&iter)) && n < app_count) { + if (ctx->filter_compiled && + regexec(&ctx->compiled_filter, app_name, 0, NULL, 0) == REG_NOMATCH) { + ast_debug(3, "App '%s' does not match filter, skipping\n", app_name); + ao2_ref(app_name, -1); + continue; + } + matching_arr[n++] = app_name; /* ref transferred to array */ + } + ao2_iterator_destroy(&iter); + + ast_debug(2, "Broadcasting channel %s to %d matching applications\n", + ast_channel_uniqueid(chan), n); + + /* Fisher-Yates shuffle: randomise delivery order so no app is + * consistently first to receive the broadcast event. */ + for (i = n - 1; i > 0; i--) { + int j = ast_random() % (i + 1); + char *tmp = matching_arr[i]; + matching_arr[i] = matching_arr[j]; + matching_arr[j] = tmp; + } + + /* + * Second pass: send to each matching app. A deep copy of the event + * is required for each call because stasis_app_send() mutates the + * message in-place (adds "asterisk_id" via ast_json_object_set). + */ + for (i = 0; i < n; i++) { + char *match_name = matching_arr[i]; + struct ast_json *event_copy; + + ast_debug(3, "Sending broadcast to app '%s'\n", match_name); + + event_copy = ast_json_deep_copy(event); + if (!event_copy) { + ast_log(LOG_ERROR, + "Channel %s: failed to deep-copy event for app '%s'\n", + ast_channel_uniqueid(chan), match_name); + ao2_ref(match_name, -1); + continue; + } + + stasis_app_send(match_name, event_copy); + ast_json_unref(event_copy); + ao2_ref(match_name, -1); + } + + ast_free(matching_arr); + } + + return 0; +} + +/*! + * \brief Start a broadcast for a channel + * \param chan The channel to broadcast + * \param timeout_ms Timeout in milliseconds + * \param app_filter Optional regex filter for applications + * \return 0 on success, -1 on error + */ +int AST_OPTIONAL_API_NAME(stasis_app_broadcast_channel)(struct ast_channel *chan, int timeout_ms, + const char *app_filter, unsigned int flags) +{ + RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup); + struct ast_datastore *datastore; + + if (!chan) { + return -1; + } + + if (!broadcast_contexts) { + return -1; + } + + /* Remove any previous broadcast datastore from a prior attempt. + * This supports failover scenarios where StasisBroadcast() is + * called multiple times for the same channel. The datastore + * destructor unlinks the old context from the container. */ + { + struct ast_datastore *old_ds; + ast_channel_lock(chan); + old_ds = ast_channel_datastore_find(chan, &broadcast_datastore_info, NULL); + if (old_ds) { + ast_channel_datastore_remove(chan, old_ds); + } + ast_channel_unlock(chan); + if (old_ds) { + ast_datastore_free(old_ds); + } + } + + /* Create broadcast context (validates and compiles app_filter regex) */ + ctx = broadcast_ctx_create(ast_channel_uniqueid(chan), app_filter, flags); + if (!ctx) { + ast_log(LOG_ERROR, "Channel %s: failed to create broadcast context\n", + ast_channel_uniqueid(chan)); + return -1; + } + + /* Store context in container */ + ao2_link(broadcast_contexts, ctx); + + /* Create and attach datastore to channel */ + datastore = ast_datastore_alloc(&broadcast_datastore_info, ast_channel_uniqueid(chan)); + if (!datastore) { + ast_log(LOG_ERROR, "Channel %s: failed to allocate broadcast datastore\n", + ast_channel_uniqueid(chan)); + ao2_unlink(broadcast_contexts, ctx); + return -1; + } + + datastore->data = ao2_bump(ctx); + ast_channel_lock(chan); + if (ast_channel_datastore_add(chan, datastore)) { + ast_channel_unlock(chan); + ast_log(LOG_ERROR, "Channel %s: failed to attach broadcast datastore\n", + ast_channel_uniqueid(chan)); + ast_datastore_free(datastore); + ao2_unlink(broadcast_contexts, ctx); + return -1; + } + ast_channel_unlock(chan); + + ast_debug(1, "Starting broadcast for channel %s (timeout: %dms, filter: %s)\n", + ast_channel_uniqueid(chan), timeout_ms, app_filter ? app_filter : "none"); + + /* Send broadcast event to all matching applications */ + if (send_broadcast_event(chan, ctx) != 0) { + ast_log(LOG_ERROR, "Channel %s: failed to send broadcast event\n", + ast_channel_uniqueid(chan)); + ast_channel_lock(chan); + ast_channel_datastore_remove(chan, datastore); + ast_channel_unlock(chan); + ast_datastore_free(datastore); + ao2_unlink(broadcast_contexts, ctx); + return -1; + } + + return 0; +} + +/*! + * \brief Attempt to claim a broadcast channel + * \param channel_id The unique ID of the channel + * \param app_name The name of the application claiming the channel + * \return 0 if claim successful, -1 if channel not found, -2 if already claimed + */ +int AST_OPTIONAL_API_NAME(stasis_app_claim_channel)(const char *channel_id, const char *app_name) +{ + RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup); + + if (ast_strlen_zero(channel_id) || ast_strlen_zero(app_name)) { + return -1; + } + + if (!broadcast_contexts) { + return -1; + } + + /* Find broadcast context */ + ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY); + if (!ctx) { + ast_debug(1, "No broadcast context found for channel %s\n", channel_id); + return -1; + } + + /* Atomically check and set claimed flag. + * Check claimed before finished: if the channel was claimed and then the + * broadcast finished, a late claim should return -2 (409 Conflict) rather + * than -1 (404) so callers can distinguish "already taken" from "not found". */ + ao2_lock(ctx); + if (ctx->claimed) { + ast_debug(1, "Channel %s already claimed by %s (attempt by %s denied)\n", + channel_id, ctx->winner_app ? ctx->winner_app : "(unknown)", app_name); + ao2_unlock(ctx); + return -2; + } + if (ctx->finished) { + ast_debug(1, "Channel %s broadcast already finished (late claim by %s rejected)\n", + channel_id, app_name); + ao2_unlock(ctx); + return -1; + } + ctx->winner_app = ast_strdup(app_name); + if (!ctx->winner_app) { + ast_log(LOG_ERROR, + "Failed to allocate winner app name for channel %s\n", + channel_id); + ao2_unlock(ctx); + return -1; + } + ctx->claimed = 1; + ast_verb(3, "Channel %s claimed by application %s\n", + channel_id, app_name); + /* Signal waiting thread that channel was claimed */ + ast_cond_signal(&ctx->cond); + ao2_unlock(ctx); + + /* Send CallClaimed event to matching apps */ + if (!(ctx->flags & STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED)) { + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, event, NULL, ast_json_unref); + RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup); + struct ao2_iterator iter; + char *app_name_iter; + + snapshot = ast_channel_snapshot_get_latest(channel_id); + if (snapshot) { + event = ast_json_pack("{s: s, s: o, s: o, s: s}", + "type", "CallClaimed", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), + "channel", ast_channel_snapshot_to_json(snapshot, NULL), + "winner_app", app_name); + } + if (event) { + apps = stasis_app_get_all(); + } + if (apps) { + iter = ao2_iterator_init(apps, 0); + while ((app_name_iter = ao2_iterator_next(&iter))) { + struct ast_json *event_copy; + + /* Only send to apps that matched the original broadcast filter */ + if (ctx->filter_compiled && + regexec(&ctx->compiled_filter, app_name_iter, + 0, NULL, 0) == REG_NOMATCH) { + ao2_ref(app_name_iter, -1); + continue; + } + + event_copy = ast_json_deep_copy(event); + if (!event_copy) { + ao2_ref(app_name_iter, -1); + continue; + } + + stasis_app_send(app_name_iter, event_copy); + ast_json_unref(event_copy); + ao2_ref(app_name_iter, -1); + } + ao2_iterator_destroy(&iter); + } + } + + return 0; +} + +/*! + * \brief Get the winner app name for a broadcast channel + * \param channel_id The unique ID of the channel + * \return A copy of the winner app name (caller must free with ast_free), + * or NULL if not claimed or not found + */ +char *AST_OPTIONAL_API_NAME(stasis_app_broadcast_winner)(const char *channel_id) +{ + RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup); + char *winner = NULL; + + if (ast_strlen_zero(channel_id)) { + return NULL; + } + + ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY); + if (!ctx) { + return NULL; + } + + ao2_lock(ctx); + if (ctx->claimed) { + winner = ast_strdup(ctx->winner_app); + } + /* Mark the broadcast as finished so no new claims can succeed. + * This closes the race window between reading the winner and + * the subsequent broadcast_cleanup call. */ + ctx->finished = 1; + ao2_unlock(ctx); + + return winner; +} + +/*! + * \brief Wait for a broadcast channel to be claimed + * + * Blocks until the channel is claimed, the timeout expires, or the + * channel hangs up. The hangup check runs every + * #BROADCAST_POLL_INTERVAL_MS so that a dead channel does not tie up + * a PBX thread for the full timeout period. + * + * \param chan The channel + * \param timeout_ms Maximum time to wait in milliseconds + * \return 0 if claimed within timeout, -1 otherwise + */ +int AST_OPTIONAL_API_NAME(stasis_app_broadcast_wait)(struct ast_channel *chan, int timeout_ms) +{ + RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup); + const char *channel_id; + struct timeval deadline; + int result = -1; + + if (!chan) { + return -1; + } + + channel_id = ast_channel_uniqueid(chan); + + if (!broadcast_contexts) { + return -1; + } + + ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY); + if (!ctx) { + ast_log(LOG_WARNING, "No broadcast context for channel %s\n", channel_id); + return -1; + } + + /* Cap excessive timeouts to prevent arithmetic overflow */ + if (timeout_ms < 0) { + timeout_ms = 0; + } else if (timeout_ms > MAX_BROADCAST_TIMEOUT_MS) { + timeout_ms = MAX_BROADCAST_TIMEOUT_MS; + } + + /* Calculate absolute deadline */ + deadline = ast_tvadd(ast_tvnow(), + ast_tv(timeout_ms / 1000, (timeout_ms % 1000) * 1000)); + + ao2_lock(ctx); + while (!ctx->claimed) { + struct timeval now; + struct timespec poll_spec; + long remaining_ms; + long poll_ms; + int wait_result; + + /* Check for hangup so we don't block on a dead channel */ + if (ast_check_hangup(chan)) { + ast_debug(3, "Channel %s hung up during broadcast wait\n", + channel_id); + break; + } + + /* Check if we've passed the overall deadline */ + now = ast_tvnow(); + remaining_ms = ast_tvdiff_ms(deadline, now); + if (remaining_ms <= 0) { + ast_debug(3, "Broadcast timeout for channel %s after %dms\n", + channel_id, timeout_ms); + break; + } + + /* Sleep for the shorter of the remaining time and the poll interval */ + poll_ms = remaining_ms; + if (poll_ms > BROADCAST_POLL_INTERVAL_MS) { + poll_ms = BROADCAST_POLL_INTERVAL_MS; + } + + poll_spec.tv_sec = now.tv_sec + (poll_ms / 1000); + poll_spec.tv_nsec = (long)(now.tv_usec) * 1000L + + (long)(poll_ms % 1000) * 1000000L; + while (poll_spec.tv_nsec >= 1000000000) { + poll_spec.tv_sec++; + poll_spec.tv_nsec -= 1000000000; + } + + wait_result = ast_cond_timedwait(&ctx->cond, ao2_object_get_lockaddr(ctx), &poll_spec); + if (wait_result != 0 && wait_result != ETIMEDOUT) { + ast_log(LOG_WARNING, + "Channel %s: unexpected error waiting for claim: %s (%d)\n", + channel_id, strerror(wait_result), wait_result); + break; + } + /* Loop back: re-check claimed, then hangup, then deadline */ + } + + if (ctx->claimed) { + ast_debug(1, "Channel %s claimed by %s\n", + channel_id, ctx->winner_app); + result = 0; + } + ao2_unlock(ctx); + + return result; +} + +/*! + * \brief Clean up broadcast context for a channel + * + * This is the normal-path cleanup called by the dialplan application + * after the broadcast completes. The channel datastore destructor + * (broadcast_datastore_destroy) also unlinks the context as a safety + * net for abnormal teardown; ao2_unlink is idempotent so the double + * call is harmless. + * + * \param channel_id The unique ID of the channel + */ +void AST_OPTIONAL_API_NAME(stasis_app_broadcast_cleanup)(const char *channel_id) +{ + RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup); + + if (ast_strlen_zero(channel_id) || !broadcast_contexts) { + return; + } + + ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY | OBJ_UNLINK); + if (ctx) { + ast_debug(3, "Cleaning up broadcast context for %s\n", channel_id); + } +} + +static int load_module(void) +{ + broadcast_contexts = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, + BROADCAST_BUCKETS, stasis_broadcast_ctx_hash_fn, NULL, stasis_broadcast_ctx_cmp_fn); + + if (!broadcast_contexts) { + return AST_MODULE_LOAD_DECLINE; + } + + ast_debug(1, "Stasis broadcast module loaded\n"); + return AST_MODULE_LOAD_SUCCESS; +} + +static int unload_module(void) +{ + /* NULL the global pointer before releasing the reference so that + * concurrent lookups see NULL (safe) rather than a freed pointer. */ + { + struct ao2_container *old_contexts = broadcast_contexts; + broadcast_contexts = NULL; + ao2_cleanup(old_contexts); + } + + ast_debug(1, "Stasis broadcast module unloaded\n"); + return 0; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, + "Stasis application broadcast", + .support_level = AST_MODULE_SUPPORT_EXTENDED, + .load = load_module, + .unload = unload_module, + .requires = "res_stasis,res_ari,http", + .load_pri = AST_MODPRI_APP_DEPEND - 1, +); diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json index b7d14ef194..63d344056a 100644 --- a/rest-api/api-docs/events.json +++ b/rest-api/api-docs/events.json @@ -110,6 +110,52 @@ ] } ] + }, + { + "path": "/events/claim", + "description": "Broadcast channel claim operations", + "operations": [ + { + "httpMethod": "POST", + "since": [ + "20.17.0", + "22.7.0", + "23.1.0" + ], + "summary": "Claim a broadcast channel for this application.", + "notes": "Atomically claims a channel that is in broadcast state. Only the first claim succeeds.", + "nickname": "claimChannel", + "responseClass": "void", + "parameters": [ + { + "name": "channelId", + "description": "The ID of the channel to claim", + "paramType": "query", + "required": true, + "allowMultiple": false, + "dataType": "string" + }, + { + "name": "application", + "description": "The name of the application claiming the channel", + "paramType": "query", + "required": true, + "allowMultiple": false, + "dataType": "string" + } + ], + "errorResponses": [ + { + "code": 404, + "reason": "Channel not found or not in broadcast state." + }, + { + "code": 409, + "reason": "Channel has already been claimed by another application." + } + ] + } + ] } ], "models": { @@ -202,7 +248,9 @@ "ChannelConnectedLine", "PeerStatusChange", "ChannelTransfer", - "RESTResponse" + "RESTResponse", + "CallBroadcast", + "CallClaimed" ] }, "ContactInfo": { @@ -1167,6 +1215,43 @@ "description": "Response message body" } } + }, + "CallBroadcast": { + "id": "CallBroadcast", + "description": "Notification that a channel is being broadcast to ARI applications for claiming.", + "properties": { + "channel": { + "required": true, + "type": "Channel", + "description": "The channel being broadcast." + }, + "caller": { + "required": false, + "type": "string", + "description": "The caller ID number." + }, + "called": { + "required": false, + "type": "string", + "description": "The called number." + } + } + }, + "CallClaimed": { + "id": "CallClaimed", + "description": "Notification that a broadcast channel has been successfully claimed by an ARI application.", + "properties": { + "channel": { + "required": true, + "type": "Channel", + "description": "The channel that was claimed." + }, + "winner_app": { + "required": true, + "type": "string", + "description": "The name of the ARI application that claimed the channel." + } + } } } }