extension_state: Add new extension state API.

Extension state to this point has been an API implemented
inside the PBX core resulting in its state being intermingled
with that of the dialplan. This increased the complexity of
the PBX core and made it difficult to enact improvements.

This change adds a new separate extension state API
which receives updates from the PBX core as configuration
changes but maintains its own separate state. The API is
also written to fully take advantage of modern APIs in a
more selective manner by subscribing each extension state to
only the devices it is interested in, ultimately reducing
resource consumption during updates. Presence state updates
being infrequently done use a single shared subscription that
goes through the extension states to find and update ones
that the update is applicable to.

Legacy API support is provided by reimplementing the API
as wrappers over the new extension state API. This improves
the legacy API by making it multithreaded, with each callback
being individually subscribed.

Autohints support is maintained but has been separated out
into a self contained new implementation.

Synchronous subscription support has also been added to
Stasis to remove the overhead of asynchronous publishing when
the handling of published messages is small and fast.
This commit is contained in:
Joshua C. Colp
2026-02-10 17:42:50 -04:00
parent e45da15315
commit a1ffcfe37d
11 changed files with 2660 additions and 2144 deletions
+3
View File
@@ -59,6 +59,9 @@ void ast_msg_shutdown(void); /*!< Provided by message.c */
int aco_init(void); /*!< Provided by config_options.c */
int dns_core_init(void); /*!< Provided by dns_core.c */
int ast_refer_init(void); /*!< Provided by refer.c */
int ast_extension_state_init(void); /*!< Provided by extension_state.c */
int ast_extension_state_legacy_init(void); /*!< Provided by extension_state_legacy.c */
int ast_extension_state_autohints_init(void); /*!< Provided by extension_state_autohints.c */
/*!
* \brief Initialize malloc debug phase 1.
+210
View File
@@ -0,0 +1,210 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2026, Sangoma Technologies Corporation
*
* Joshua C. Colp <jcolp@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
* \ref ExtensionState
*
* \page ExtensionState API providing extension state management.
Before we talk about extension state let's talk about the fundamentals that
drive it. Extension state is driven based on three things: hints, device state,
and presence state. Hints are the stateless configuration that map a dialplan location,
context and extension, to zero or more devices and/or zero or more presence state
providers. Device state provides information about the associated device(s).
Presence state provides information about the associated presence state provider(s).
The extension state API itself acts as an aggregator of the device state and presence
state information using the hint configuration to determine both the individual
identifier as well as the aggregation sources. The API provides the ability to query
the current state of an extension, as well as subscribe to be notified when the state
of an extension changes.
When hint configuration changes this is reconciled and the extension state is updated
accordingly. If a hint has been added the corresponding extension state is created. If
a hint has been removed the corresponding extension state is removed. For cases where
the hint has been added or updated the sources of information are updated on the
extension state and its state is recalculated. This may involve subscribing to new
device state topics or unsubscribing from old ones.
Extension state uses synchronous per-device subscriptions to receive device state
updates. Synchronous is used as the overhead of asynchronous delivery is not worth
the added overhead and CPU for the small amount of work done when device states change.
On receipt of a device state update the extension device state is recalculated and if
the state has changed the extension device state is updated and any subscribers are
notified.
Presence state uses a single global subscription to receive presence state updates as
no per-presence state provider topic is available and also due to the extremely small
number of presence state updates that occur on a system. Just like device state
updates the extension presence state is recalculated and if it has changed it is
updated and any subscribers are notified.
To minimize querying of other APIs in Asterisk extension state keeps an internal
cache of device states and presence state on each extension state. This cache is
updated when device state or presence state changes and is used to determine the
aggregated state of an extension. The aggregated state is also cached on the
extension state for quick access by API users who do not subscribe to receive
updates.
*/
#ifndef _ASTERISK_EXTENSION_STATE_H
#define _ASTERISK_EXTENSION_STATE_H
#include "asterisk/pbx.h"
/*! \brief Individual device states that contributed to snapshot */
struct ast_extension_state_device_state_info {
/*! \brief The state of the device */
enum ast_device_state state;
/*! \brief The name of the device */
char device[0];
};
/*! \brief Device snapshot for an extension state*/
struct ast_extension_state_device_snapshot {
/*! \brief The state of the extension */
enum ast_extension_states state;
/*! \brief The device that caused this update */
struct ast_extension_state_device_state_info *causing_device;
/*! \brief Vector of additional device states that contributed to update */
AST_VECTOR(, struct ast_extension_state_device_state_info *) additional_devices;
};
/*! \brief Presence snapshot for an extension state */
struct ast_extension_state_presence_snapshot {
/*! \brief The presence state of the extension */
enum ast_presence_state presence_state;
/*! \brief The subtype of the presence state */
char *presence_subtype;
/*! \brief An optional message for the presence */
char *presence_message;
};
/*! \brief Stasis message for extension state update message */
struct ast_extension_state_update_message {
/*! \brief The old device snapshot */
struct ast_extension_state_device_snapshot *old_device_snapshot;
/*! \brief The new device snapshot - will be pointer equivalent to old if unchanged */
struct ast_extension_state_device_snapshot *new_device_snapshot;
/*! \brief The old presence snapshot */
struct ast_extension_state_presence_snapshot *old_presence_snapshot;
/*! \brief The new presence snapshot - will be pointer equivalent to old if unchanged */
struct ast_extension_state_presence_snapshot *new_presence_snapshot;
/*! \brief The dialplan context */
char *context;
/*! \brief The dialplan extension */
char extension[0];
};
/*! \brief Stasis message for extension state removal message */
struct ast_extension_state_remove_message {
/*! \brief The dialplan context */
char *context;
/*! \brief The dialplan extension */
char extension[0];
};
/*!
* \brief Get the Stasis topic to receive all extension state messages
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \return The topic for extension state messages
* \retval NULL if it has not been allocated
*/
struct stasis_topic *ast_extension_state_topic_all(void);
/*!
* \brief Get the Stasis topic to receive extension state messages for a specific extension
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \param exten The extension to receive extension state messages for
* \param context The context of the extension
* \return The topic for extension state messages
* \retval NULL if it has not been allocated
*/
struct stasis_topic *ast_extension_state_topic(const char *exten, const char *context);
/*!
* \brief Get the latest device state message for an extension
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \param chan The optional channel to get the underlying hint from, if it needs to be created
* \param exten The extension to get the device state message for
* \param context The context of the extension
* \return The latest device snapshot for the extension
* \retval NULL if the extension does not have a configured hint
*/
struct ast_extension_state_device_snapshot *ast_extension_state_get_latest_device_snapshot(struct ast_channel *chan,
const char *exten, const char *context);
/*!
* \brief Get the latest presence state message for an extension
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \param chan The optional channel to get the underlying hint from, if it needs to be created
* \param exten The extension to get the presence state message for
* \param context The context of the extension
* \return The latest presence snapshot for the extension
* \retval NULL if the extension does not have a configured hint
*/
struct ast_extension_state_presence_snapshot *ast_extension_state_get_latest_presence_snapshot(struct ast_channel *chan,
const char *exten, const char *context);
/*!
* \brief Get the channel that is causing the device to be in the given state, if any
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \param device The device itself
* \param device_state The state of the device
* \return The channel that is causing the device to be in the given state
* \retval NULL if there is no channel causing the device to be in the given state
*/
struct ast_channel *ast_extension_state_get_device_causing_channel(const char *device, enum ast_device_state device_state);
/*!
* \brief Get extension state update message type
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \retval Stasis message type for extension state update messages
*/
struct stasis_message_type *ast_extension_state_update_message_type(void);
/*!
* \brief Get extension state remove message type
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* \retval Stasis message type for extension state remove messages
*/
struct stasis_message_type *ast_extension_state_remove_message_type(void);
#endif /* ASTERISK_EXTENSION_STATE_H */
+31
View File
@@ -679,6 +679,37 @@ struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
#define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Create a subscription whose callbacks occur synchronously on message publishing
* \since 23.5.0
* \since 22.11.0
* \since 20.21.0
*
* In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
* up this reference), the subscription must be explicitly unsubscribed from its
* topic using stasis_unsubscribe().
*
* The invocations of the callback are serialized, but will almost certainly not
* always happen on the same thread. The invocation order of different subscriptions
* is unspecified.
*
* This subscription will be invoked on the same thread that is publishing the message.
*
* \param topic Topic to subscribe to.
* \param callback Callback function for subscription messages.
* \param data Data to be passed to the callback, in addition to the message.
* \param file, lineno, func
* \return New \ref stasis_subscription object.
* \retval NULL on error.
*
* \note This callback will receive a callback with a message indicating it
* has been subscribed. This occurs immediately before accepted message
* types can be set and the callback must expect to receive it.
*/
struct stasis_subscription *__stasis_subscribe_synchronous(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
#define stasis_subscribe_synchronous(topic, callback, data) __stasis_subscribe_synchronous(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Indicate to a subscription that we are interested in a message type.
*
+3
View File
@@ -4342,6 +4342,9 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou
check_init(load_pbx_switch(), "PBX Switch Support");
check_init(load_pbx_app(), "PBX Application Support");
check_init(load_pbx_hangup_handler(), "PBX Hangup Handler Support");
check_init(ast_extension_state_init(), "Extension State Support");
check_init(ast_extension_state_legacy_init(), "Extension State Legacy Support");
check_init(ast_extension_state_autohints_init(), "Extension State Autohints Support");
check_init(ast_local_init(), "Local Proxy Channel Driver");
check_init(ast_refer_init(), "Refer API");
File diff suppressed because it is too large Load Diff
+183
View File
@@ -0,0 +1,183 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2026, Sangoma Technologies Corporation
*
* Joshua Colp <jcolp@sangoma.com>
*
* See https://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/pbx.h"
#include "asterisk/stasis.h"
#include "asterisk/vector.h"
#include "pbx_private.h"
/*! \brief Lock to protect the autohints vector */
AST_MUTEX_DEFINE_STATIC(extension_state_autohints_lock);
/*! \brief Contexts which have autohints enabled */
static AST_VECTOR(, struct ast_context *) extension_state_autohints;
/*! \brief Subscription to receive updates so we can create hints as needed on autohint enabled contexts */
static struct stasis_subscription *autohints_subscription;
/*! \brief The static registrar for the added dialplan hints */
static const char registrar[] = "autohints";
/*!
* \internal
* \brief Callback for device state updates to create hints for autohint enabled contexts.
*
* \param userdata The user data passed to the subscription.
* \param sub The subscription that received the message.
* \param msg The message received by the subscription.
*/
static void extension_state_autohints_device_state_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg)
{
struct ast_device_state_message *device_state;
char *virtual_device, *type, *device_name;
int i;
if (stasis_message_type(msg) != ast_device_state_message_type()) {
return;
}
device_state = stasis_message_data(msg);
/* We only care about the aggregate state */
if (device_state->eid) {
return;
}
type = ast_strdupa(device_state->device);
if (ast_strlen_zero(type)) {
return;
}
/* Determine if this is a virtual/custom device or a real device */
virtual_device = strchr(type, ':');
device_name = strchr(type, '/');
if (virtual_device && (!device_name || (virtual_device < device_name))) {
device_name = virtual_device;
}
/* Invalid device state name - not a virtual/custom device and not a real device */
if (ast_strlen_zero(device_name)) {
return;
}
*device_name++ = '\0';
ast_wrlock_contexts();
ast_mutex_lock(&extension_state_autohints_lock);
for (i = 0; i < AST_VECTOR_SIZE(&extension_state_autohints); i++) {
struct ast_context *context = AST_VECTOR_GET(&extension_state_autohints, i);
struct pbx_find_info q = { .stacklen = 0 };
if (pbx_find_extension(NULL, NULL, &q, ast_get_context_name(context), device_name,
PRIORITY_HINT, NULL, "", E_MATCH)) {
continue;
}
/* The device has no hint in the context referenced by this autohint so create one */
ast_add_extension(ast_get_context_name(context), 0, device_name, PRIORITY_HINT, NULL, NULL,
device_state->device, ast_strdup(device_state->device), ast_free_ptr, registrar);
}
ast_mutex_unlock(&extension_state_autohints_lock);
ast_unlock_contexts();
}
/*!
* \internal
* \brief Compare an autohint's context name to a provided context name for searching the autohints vector
*/
#define AUTOHINT_CMP_CONTEXT_NAME(elem, name) (!strcmp(ast_get_context_name(elem), name))
void pbx_extension_state_autohint_set(struct ast_context *context)
{
ast_rdlock_contexts();
ast_mutex_lock(&extension_state_autohints_lock);
/* Since we store a pointer to the context we remove the old one by name and append the new one, easy */
AST_VECTOR_REMOVE_CMP_UNORDERED(&extension_state_autohints, ast_get_context_name(context), AUTOHINT_CMP_CONTEXT_NAME,
AST_VECTOR_ELEM_CLEANUP_NOOP);
AST_VECTOR_APPEND(&extension_state_autohints, context);
if (AST_VECTOR_SIZE(&extension_state_autohints) && !autohints_subscription) {
/* We have at least one context enabled with autohints and no subscription, so subscribe */
autohints_subscription = stasis_subscribe(ast_device_state_topic_all(), extension_state_autohints_device_state_cb, NULL);
}
ast_mutex_unlock(&extension_state_autohints_lock);
ast_unlock_contexts();
}
void pbx_extension_state_autohint_remove(struct ast_context *context, unsigned int forced)
{
int removed;
ast_wrlock_contexts();
ast_mutex_lock(&extension_state_autohints_lock);
if (!forced) {
removed = AST_VECTOR_REMOVE_CMP_UNORDERED(&extension_state_autohints, context, AST_VECTOR_ELEM_DEFAULT_CMP,
AST_VECTOR_ELEM_CLEANUP_NOOP);
} else {
removed = AST_VECTOR_REMOVE_CMP_UNORDERED(&extension_state_autohints, ast_get_context_name(context), AUTOHINT_CMP_CONTEXT_NAME,
AST_VECTOR_ELEM_CLEANUP_NOOP);
}
if (removed) {
ast_context_destroy_by_name(ast_get_context_name(context), registrar);
}
if (!AST_VECTOR_SIZE(&extension_state_autohints) && autohints_subscription) {
/* There's no more autohint enabled contexts, so unsubscribe */
autohints_subscription = stasis_unsubscribe(autohints_subscription);
}
ast_mutex_unlock(&extension_state_autohints_lock);
ast_unlock_contexts();
}
/*!
* \internal
* \brief Clean up the autohints extension state system, called at shutdown.
*/
static void extension_state_autohints_cleanup(void)
{
ast_mutex_lock(&extension_state_autohints_lock);
if (autohints_subscription) {
autohints_subscription = stasis_unsubscribe(autohints_subscription);
}
/* The vector just contains pointers so no need to free the individual items */
AST_VECTOR_FREE(&extension_state_autohints);
ast_mutex_unlock(&extension_state_autohints_lock);
}
int ast_extension_state_autohints_init(void)
{
/* Most likely there will be at most one context configured for autohints, or zero */
if (AST_VECTOR_INIT(&extension_state_autohints, 1)) {
return -1;
}
ast_register_cleanup(extension_state_autohints_cleanup);
return 0;
}
+477
View File
@@ -0,0 +1,477 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2026, Sangoma Technologies Corporation
*
* Joshua Colp <jcolp@sangoma.com>
*
* See https://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/extension_state.h"
#include "asterisk/pbx.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/astobj2.h"
#include "asterisk/lock.h"
#include "asterisk/vector.h"
struct extension_state_legacy_state_cb {
/*! Watcher ID returned when registered. */
int id;
/*! Message router for the traffic to this callback */
struct stasis_message_router *router;
/*! Arbitrary data passed for callbacks. */
void *data;
/*! Flag if this callback is an extended callback containing detailed device status */
int extended;
/*! Callback when state changes. */
ast_state_cb_type change_cb;
/*! Callback when destroyed so any resources given by the registerer can be freed. */
ast_state_cb_destroy_type destroy_cb;
};
/*! \brief Lock to protect the callbacks vector */
AST_MUTEX_DEFINE_STATIC(extension_state_legacy_callbacks_lock);
/*! \brief Legacy callbacks, the index of it in the vector is the id given to the API user for per-extension */
static AST_VECTOR(, struct extension_state_legacy_state_cb *) extension_state_legacy_callbacks;
int ast_extension_state(struct ast_channel *c, const char *context, const char *exten)
{
struct ast_extension_state_device_snapshot *device_snapshot;
enum ast_extension_states device_state;
device_snapshot = ast_extension_state_get_latest_device_snapshot(c, exten, context);
if (!device_snapshot) {
return -1;
}
device_state = device_snapshot->state;
ao2_ref(device_snapshot, -1);
return device_state;
}
int ast_hint_presence_state(struct ast_channel *c, const char *context, const char *exten, char **subtype, char **message)
{
struct ast_extension_state_presence_snapshot *presence_snapshot;
enum ast_presence_state presence_state;
presence_snapshot = ast_extension_state_get_latest_presence_snapshot(c, exten, context);
if (!presence_snapshot) {
return -1;
}
presence_state = presence_snapshot->presence_state;
if (presence_snapshot->presence_subtype) {
*subtype = ast_strdup(presence_snapshot->presence_subtype);
}
if (presence_snapshot->presence_message) {
*message = ast_strdup(presence_snapshot->presence_message);
}
ao2_ref(presence_snapshot, -1);
return presence_state;
}
/*!
* \internal
* \brief Destroy function for device state info objects.
*
* \param obj The device state info object to destroy.
*/
static void device_state_info_destroy(void *obj)
{
struct ast_device_state_info *info = obj;
ao2_cleanup(info->causing_channel);
}
/*!
* \internal
* \brief Create a container of device state info objects from an extension device state message.
*
* \param device_state_message The extension device state message to create device state info from.
* \return A container of device state info objects, or NULL on failure.
*/
static struct ao2_container *extension_state_legacy_create_device_state_info(struct ast_extension_state_device_snapshot *device_snapshot)
{
struct ao2_container *device_state_info = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
int i;
if (!device_state_info) {
return NULL;
}
for (i = 0; i < AST_VECTOR_SIZE(&device_snapshot->additional_devices); i++) {
struct ast_extension_state_device_state_info *source_info = AST_VECTOR_GET(&device_snapshot->additional_devices, i);
struct ast_device_state_info *obj;
obj = ao2_alloc_options(sizeof(*obj) + strlen(source_info->device) + 1, device_state_info_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!obj) {
ao2_ref(device_state_info, -1);
return NULL;
}
obj->device_state = source_info->state;
strcpy(obj->device_name, source_info->device); /* Safe */
obj->causing_channel = ast_extension_state_get_device_causing_channel(source_info->device, source_info->state);
ao2_link(device_state_info, obj);
ao2_ref(obj, -1);
}
return device_state_info;
}
int ast_extension_state_extended(struct ast_channel *c, const char *context, const char *exten,
struct ao2_container **device_state_info)
{
struct ast_extension_state_device_snapshot *device_snapshot;
enum ast_extension_states device_state;
device_snapshot = ast_extension_state_get_latest_device_snapshot(c, exten, context);
if (!device_snapshot) {
return -1;
}
device_state = device_snapshot->state;
/* The caller wants device state info, so allocate a container and populate it */
if (device_state_info) {
*device_state_info = extension_state_legacy_create_device_state_info(device_snapshot);
}
ao2_ref(device_snapshot, -1);
return device_state;
}
/*!
* \internal
* \brief Callback for subscription state change, used for reference handling.
*
* \param userdata The user data passed to the subscription.
* \param sub The subscription that received the message.
* \param msg The message received by the subscription.
*/
static void extension_state_legacy_subscription_change_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg)
{
if (stasis_subscription_final_message(sub, msg)) {
ao2_cleanup(userdata);
}
}
/*!
* \internal
* \brief Destructor for \ref extension_state_legacy_state_cb.
*
* \param obj The extension state legacy state callback object to destroy.
*/
static void extension_state_legacy_state_cb_destroy(void *obj)
{
struct extension_state_legacy_state_cb *cb = obj;
if (cb->destroy_cb) {
cb->destroy_cb(cb->id, cb->data);
}
}
/*!
* \internal
* \brief Callback for extension state updates.
*
* \param userdata The user data passed to the subscription.
* \param sub The subscription that received the message.
* \param msg The message received by the subscription.
*/
static void extension_state_legacy_update_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg)
{
struct extension_state_legacy_state_cb *cb = userdata;
struct ast_extension_state_update_message *update_message = stasis_message_data(msg);
struct ast_state_cb_info info = {
.exten_state = update_message->new_device_snapshot->state,
.presence_state = update_message->new_presence_snapshot->presence_state,
.presence_subtype = S_OR(update_message->new_presence_snapshot->presence_subtype, ""),
.presence_message = S_OR(update_message->new_presence_snapshot->presence_message, ""),
};
/* If the presence has changed, notify the callback */
if (update_message->new_presence_snapshot != update_message->old_presence_snapshot) {
info.reason = AST_HINT_UPDATE_PRESENCE;
cb->change_cb(update_message->context, update_message->extension, &info, cb->data);
}
/* If the device state has changed, notify the callback */
if (update_message->new_device_snapshot != update_message->old_device_snapshot) {
info.reason = AST_HINT_UPDATE_DEVICE;
/* If they want extended information we need to provide the channels */
if (cb->extended) {
info.device_state_info = extension_state_legacy_create_device_state_info(update_message->new_device_snapshot);
}
cb->change_cb(update_message->context, update_message->extension, &info, cb->data);
ao2_cleanup(info.device_state_info);
}
}
/*!
* \internal
* \brief Callback for extension state removal updates.
*
* \param userdata The user data passed to the subscription.
* \param sub The subscription that received the message.
* \param msg The message received by the subscription.
*/
static void extension_state_legacy_remove_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg)
{
struct extension_state_legacy_state_cb *cb = userdata;
struct ast_extension_state_remove_message *remove_message = stasis_message_data(msg);
struct ast_state_cb_info info = {
.reason = AST_HINT_UPDATE_DEVICE,
.exten_state = AST_EXTENSION_REMOVED,
};
cb->change_cb(remove_message->context, remove_message->extension, &info, cb->data);
}
/*!
* \internal
* \brief Add a legacy extension state callback.
*
* \param context The context to monitor.
* \param exten The extension to monitor.
* \param change_cb The callback to call when the extension state changes.
* \param destroy_cb The callback to call when the callback is removed.
* \param data The data to pass to the callback.
* \param extended Whether to include extended information in the callback.
* \return The ID of the callback, or -1 on failure.
*/
static int extension_state_legacy_add_destroy(const char *context, const char *exten,
ast_state_cb_type change_cb, ast_state_cb_destroy_type destroy_cb, void *data, int extended)
{
struct extension_state_legacy_state_cb *state_cb;
int id;
state_cb = ao2_alloc_options(sizeof(*state_cb), extension_state_legacy_state_cb_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!state_cb) {
return -1;
}
state_cb->change_cb = change_cb;
state_cb->destroy_cb = destroy_cb;
state_cb->data = data;
state_cb->extended = extended;
ast_mutex_lock(&extension_state_legacy_callbacks_lock);
/*
* Callbacks for both per-extension and all are stored in a single vector which may have gaps in it.
* When adding a new callback, we look for the first gap in the vector and insert the callback there.
* If there are no gaps, we append it to the end of the vector.
* For per-extension the ID of a callback is its index in the vector + 1, since 0 is reserved for "all" callbacks.
*/
for (id = 0; id < AST_VECTOR_SIZE(&extension_state_legacy_callbacks); id++) {
if (AST_VECTOR_GET(&extension_state_legacy_callbacks, id)) {
continue;
}
state_cb->id = id + 1;
/* This can't fail since the vector would have already resized */
AST_VECTOR_REPLACE(&extension_state_legacy_callbacks, id, state_cb);
break;
}
if (!state_cb->id) {
/* The vector will resize when we append, which can fail, so handle it */
if (AST_VECTOR_APPEND(&extension_state_legacy_callbacks, state_cb)) {
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
ao2_ref(state_cb, -1);
return -1;
}
state_cb->id = AST_VECTOR_SIZE(&extension_state_legacy_callbacks);
}
/* At this point it is guaranteed that the callback has been inserted so we can setup
* the message router to accept messages from the appropriate topic and translate into
* the legacy callback.
*/
if (!context && !exten) {
/*
* The all topic will receive all extension state updates which can end up being quite
* a lot, so we use a dedicated thread for each legacy callback to ensure that the
* pool of stasis threads does not become overloaded.
*/
state_cb->router = stasis_message_router_create(ast_extension_state_topic_all());
} else {
struct stasis_topic *topic = ast_extension_state_topic(exten, context);
/*
* Per-extension on the other hand will have comparatively few extension state updates
* so we use the pool for it instead. Additionally the creation of the message router will
* fail if topic is NULL, so we don't do an explicit check and just let it try.
*/
state_cb->router = stasis_message_router_create_pool(topic);
ao2_cleanup(topic);
}
/* If there is no message router allocated this callback is useless, so bail */
if (!state_cb->router) {
AST_VECTOR_REPLACE(&extension_state_legacy_callbacks, state_cb->id - 1, NULL);
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
ao2_ref(state_cb, -1);
return -1;
}
/*
* Each of the message router callbacks translates the extension state messages into
* the legacy callback format and then calls the legacy callback with the appropriate data.
*/
stasis_message_router_add(state_cb->router, stasis_subscription_change_type(),
extension_state_legacy_subscription_change_cb, ao2_bump(state_cb));
stasis_message_router_add(state_cb->router, ast_extension_state_update_message_type(),
extension_state_legacy_update_cb, state_cb);
stasis_message_router_add(state_cb->router, ast_extension_state_remove_message_type(),
extension_state_legacy_remove_cb, state_cb);
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
/*
* We don't hold a reference directly but the vector does and since we haven't given the ID back
* there's no way for the caller to remove it, thus it has to be valid even now.
*/
return (!context && !exten) ? 0 : state_cb->id;
}
int ast_extension_state_add_destroy(const char *context, const char *exten,
ast_state_cb_type change_cb, ast_state_cb_destroy_type destroy_cb, void *data)
{
return extension_state_legacy_add_destroy(context, exten, change_cb, destroy_cb, data, 0);
}
int ast_extension_state_add(const char *context, const char *exten,
ast_state_cb_type change_cb, void *data)
{
return extension_state_legacy_add_destroy(context, exten, change_cb, NULL, data, 0);
}
int ast_extension_state_add_destroy_extended(const char *context, const char *exten,
ast_state_cb_type change_cb, ast_state_cb_destroy_type destroy_cb, void *data)
{
return extension_state_legacy_add_destroy(context, exten, change_cb, destroy_cb, data, 1);
}
int ast_extension_state_add_extended(const char *context, const char *exten,
ast_state_cb_type change_cb, void *data)
{
return extension_state_legacy_add_destroy(context, exten, change_cb, NULL, data, 1);
}
int ast_extension_state_del(int id, ast_state_cb_type change_cb)
{
struct extension_state_legacy_state_cb *cb;
/* A negative id is considered invalid */
if (id < 0) {
return -1;
}
if (!id) { /* id == 0 is a callback without extension */
if (!change_cb) {
return -1;
}
/*
* Global callbacks all have the ID of 0 so we need to find the actual index
* for them in the vector for removal based on callback.
*/
ast_mutex_lock(&extension_state_legacy_callbacks_lock);
for (id = 0; id < AST_VECTOR_SIZE(&extension_state_legacy_callbacks); id++) {
cb = AST_VECTOR_GET(&extension_state_legacy_callbacks, id);
if (cb && cb->change_cb == change_cb) {
AST_VECTOR_REPLACE(&extension_state_legacy_callbacks, id, NULL);
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
stasis_message_router_unsubscribe(cb->router);
ao2_ref(cb, -1);
return 0;
}
}
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
return -1;
}
ast_mutex_lock(&extension_state_legacy_callbacks_lock);
if (id <= AST_VECTOR_SIZE(&extension_state_legacy_callbacks)) {
cb = AST_VECTOR_GET(&extension_state_legacy_callbacks, id - 1);
if (cb) {
AST_VECTOR_REPLACE(&extension_state_legacy_callbacks, id - 1, NULL);
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
stasis_message_router_unsubscribe(cb->router);
ao2_ref(cb, -1);
return 0;
}
}
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
return -1;
}
/*!
* \internal
* \brief Clean up the legacy extension state system, called at shutdown.
*
* This function unregisters all legacy extension state callbacks and cleans up
* the associated resources.
*/
static void extension_state_legacy_cleanup(void)
{
int i;
ast_mutex_lock(&extension_state_legacy_callbacks_lock);
for (i = 0; i < AST_VECTOR_SIZE(&extension_state_legacy_callbacks); i++) {
struct extension_state_legacy_state_cb *cb = AST_VECTOR_GET(&extension_state_legacy_callbacks, i);
if (!cb) {
continue;
}
AST_VECTOR_REPLACE(&extension_state_legacy_callbacks, i, NULL);
stasis_message_router_unsubscribe_and_join(cb->router);
ao2_ref(cb, -1);
}
ast_mutex_unlock(&extension_state_legacy_callbacks_lock);
AST_VECTOR_FREE(&extension_state_legacy_callbacks);
}
int ast_extension_state_legacy_init(void)
{
/* Since we're not pre-allocating for any callbacks this can't fail */
AST_VECTOR_INIT(&extension_state_legacy_callbacks, 0);
ast_register_cleanup(extension_state_legacy_cleanup);
return 0;
}
+127 -2144
View File
File diff suppressed because it is too large Load Diff
+8
View File
@@ -65,6 +65,14 @@ struct ast_switch *pbx_findswitch(const char *sw);
/*! pbx_app.c functions needed by pbx.c */
const char *app_name(struct ast_app *app);
/*! extension_state.c functions needed by pbx.c */
void pbx_extension_state_hint_set(struct ast_exten *exten, struct ast_context *context);
void pbx_extension_state_hint_remove(struct ast_exten *exten, struct ast_context *context);
/*! extension_state_autohints.c functions needed by pbx.c */
void pbx_extension_state_autohint_set(struct ast_context *context);
void pbx_extension_state_autohint_remove(struct ast_context *context, unsigned int forced);
#define VAR_BUF_SIZE 4096
#endif /* _PBX_PRIVATE_H */
+11
View File
@@ -1028,6 +1028,17 @@ struct stasis_subscription *__stasis_subscribe_pool(
return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
}
struct stasis_subscription *__stasis_subscribe_synchronous(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
const char *file,
int lineno,
const char *func)
{
return internal_stasis_subscribe(topic, callback, data, 0, 0, file, lineno, func);
}
static int sub_cleanup(void *data)
{
struct stasis_subscription *sub = data;
+10
View File
@@ -1699,6 +1699,11 @@ static int pbx_load_config(const char *config_file)
const char *newpm, *ovsw;
struct ast_flags config_flags = { 0 };
char lastextension[256];
struct timeval begin_time;
double parsing_time;
begin_time = ast_tvnow();
cfg = ast_config_load(config_file, config_flags);
if (!cfg || cfg == CONFIG_STATUS_FILEINVALID)
return 0;
@@ -1969,6 +1974,11 @@ process_extension:
}
}
ast_config_destroy(cfg);
parsing_time = ast_tvdiff_us(ast_tvnow(), begin_time);
parsing_time /= 1000000.0;
ast_verb(5, "Time to parse %s dialplan configuration: %8.6f sec\n", config, parsing_time);
return 1;
}