Merge "res_pjsip.c: Split ast_sip_push_task_synchronous() to fit expectations."

This commit is contained in:
George Joseph
2018-04-16 11:12:30 -05:00
committed by Gerrit Code Review
13 changed files with 185 additions and 71 deletions

View File

@@ -718,7 +718,7 @@ static int chan_pjsip_answer(struct ast_channel *ast)
can occur between this thread and bridging (specifically when native bridging
attempts to do direct media) */
ast_channel_unlock(ast);
res = ast_sip_push_task_synchronous(session->serializer, answer, session);
res = ast_sip_push_task_wait_serializer(session->serializer, answer, session);
if (res) {
if (res == -1) {
ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n",
@@ -2502,10 +2502,10 @@ static struct ast_channel *chan_pjsip_request_with_stream_topology(const char *t
req_data.topology = topology;
req_data.dest = data;
/* Default failure value in case ast_sip_push_task_synchronous() itself fails. */
/* Default failure value in case ast_sip_push_task_wait_servant() itself fails. */
req_data.cause = AST_CAUSE_FAILURE;
if (ast_sip_push_task_synchronous(NULL, request, &req_data)) {
if (ast_sip_push_task_wait_servant(NULL, request, &req_data)) {
*cause = req_data.cause;
return NULL;
}

View File

@@ -897,7 +897,7 @@ int pjsip_acf_channel_read(struct ast_channel *chan, const char *cmd, char *data
func_args.field = args.field;
func_args.buf = buf;
func_args.len = len;
if (ast_sip_push_task_synchronous(func_args.session->serializer, read_pjsip, &func_args)) {
if (ast_sip_push_task_wait_serializer(func_args.session->serializer, read_pjsip, &func_args)) {
ast_log(LOG_WARNING, "Unable to read properties of channel %s: failed to push task\n", ast_channel_name(chan));
ao2_ref(func_args.session, -1);
return -1;
@@ -1219,7 +1219,7 @@ int pjsip_acf_media_offer_write(struct ast_channel *chan, const char *cmd, char
mdata.media_type = AST_MEDIA_TYPE_VIDEO;
}
return ast_sip_push_task_synchronous(channel->session->serializer, media_offer_write_av, &mdata);
return ast_sip_push_task_wait_serializer(channel->session->serializer, media_offer_write_av, &mdata);
}
int pjsip_acf_dtmf_mode_read(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
@@ -1390,7 +1390,7 @@ int pjsip_acf_dtmf_mode_write(struct ast_channel *chan, const char *cmd, char *d
ast_channel_unlock(chan);
return ast_sip_push_task_synchronous(channel->session->serializer, dtmf_mode_refresh_cb, &rdata);
return ast_sip_push_task_wait_serializer(channel->session->serializer, dtmf_mode_refresh_cb, &rdata);
}
static int refresh_write_cb(void *obj)
@@ -1438,5 +1438,5 @@ int pjsip_acf_session_refresh_write(struct ast_channel *chan, const char *cmd, c
rdata.method = AST_SIP_SESSION_REFRESH_METHOD_UPDATE;
}
return ast_sip_push_task_synchronous(channel->session->serializer, refresh_write_cb, &rdata);
return ast_sip_push_task_wait_serializer(channel->session->serializer, refresh_write_cb, &rdata);
}

View File

@@ -1543,27 +1543,91 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg);
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
/*!
* \brief Push a task to SIP servants and wait for it to complete
* \brief Push a task to SIP servants and wait for it to complete.
*
* Like \ref ast_sip_push_task except that it blocks until the task completes.
* Like \ref ast_sip_push_task except that it blocks until the task
* completes. If the current thread is a SIP servant thread then the
* task executes immediately. Otherwise, the specified serializer
* executes the task and the current thread waits for it to complete.
*
* \warning \b Never use this function in a SIP servant thread. This can potentially
* cause a deadlock. If you are in a SIP servant thread, just call your function
* in-line.
* \note PJPROJECT callbacks tend to have locks already held when
* called.
*
* \warning \b Never hold locks that may be acquired by a SIP servant thread when
* calling this function. Doing so may cause a deadlock if all SIP servant threads
* are blocked waiting to acquire the lock while the thread holding the lock is
* waiting for a free SIP servant thread.
* \warning \b Never hold locks that may be acquired by a SIP servant
* thread when calling this function. Doing so may cause a deadlock
* if all SIP servant threads are blocked waiting to acquire the lock
* while the thread holding the lock is waiting for a free SIP servant
* thread.
*
* \param serializer The SIP serializer to which the task belongs. May be NULL.
* \warning \b Use of this function in an ao2 destructor callback is a
* bad idea. You don't have control over which thread executes the
* destructor. Attempting to shift execution to another thread with
* this function is likely to cause deadlock.
*
* \param serializer The SIP serializer to execute the task if the
* current thread is not a SIP servant. NULL if any of the default
* serializers can be used.
* \param sip_task The task to execute
* \param task_data The parameter to pass to the task when it executes
* \retval 0 Success
* \retval -1 Failure
*
* \note The sip_task() return value may need to be distinguished from
* the failure to push the task.
*
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
/*!
* \brief Push a task to SIP servants and wait for it to complete.
* \deprecated Replaced with ast_sip_push_task_wait_servant().
*/
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
/*!
* \brief Push a task to the serializer and wait for it to complete.
*
* Like \ref ast_sip_push_task except that it blocks until the task is
* completed by the specified serializer. If the specified serializer
* is the current thread then the task executes immediately.
*
* \note PJPROJECT callbacks tend to have locks already held when
* called.
*
* \warning \b Never hold locks that may be acquired by a SIP servant
* thread when calling this function. Doing so may cause a deadlock
* if all SIP servant threads are blocked waiting to acquire the lock
* while the thread holding the lock is waiting for a free SIP servant
* thread for the serializer to execute in.
*
* \warning \b Never hold locks that may be acquired by the serializer
* when calling this function. Doing so will cause a deadlock.
*
* \warning \b Never use this function in the pjsip monitor thread (It
* is a SIP servant thread). This is likely to cause a deadlock.
*
* \warning \b Use of this function in an ao2 destructor callback is a
* bad idea. You don't have control over which thread executes the
* destructor. Attempting to shift execution to another thread with
* this function is likely to cause deadlock.
*
* \param serializer The SIP serializer to execute the task. NULL if
* any of the default serializers can be used.
* \param sip_task The task to execute
* \param task_data The parameter to pass to the task when it executes
*
* \note It is generally better to call
* ast_sip_push_task_wait_servant() if you pass NULL for the
* serializer parameter.
*
* \note The sip_task() return value may need to be distinguished from
* the failure to push the task.
*
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
/*!
* \brief Determine if the current thread is a SIP servant thread
*

View File

@@ -2743,7 +2743,7 @@ static int register_service(void *data)
int ast_sip_register_service(pjsip_module *module)
{
return ast_sip_push_task_synchronous(NULL, register_service, &module);
return ast_sip_push_task_wait_servant(NULL, register_service, &module);
}
static int unregister_service(void *data)
@@ -2759,7 +2759,7 @@ static int unregister_service(void *data)
void ast_sip_unregister_service(pjsip_module *module)
{
ast_sip_push_task_synchronous(NULL, unregister_service, &module);
ast_sip_push_task_wait_servant(NULL, unregister_service, &module);
}
static struct ast_sip_authenticator *registered_authenticator;
@@ -3009,7 +3009,7 @@ static char *cli_dump_endpt(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SHOWUSAGE;
}
ast_sip_push_task_synchronous(NULL, do_cli_dump_endpt, a);
ast_sip_push_task_wait_servant(NULL, do_cli_dump_endpt, a);
return CLI_SUCCESS;
}
@@ -4484,21 +4484,30 @@ static int serializer_pool_setup(void)
return 0;
}
static struct ast_taskprocessor *serializer_pool_pick(void)
{
struct ast_taskprocessor *serializer;
unsigned int pos;
/*
* Pick a serializer to use from the pool.
*
* Note: We don't care about any reentrancy behavior
* when incrementing serializer_pool_pos. If it gets
* incorrectly incremented it doesn't matter.
*/
pos = serializer_pool_pos++;
pos %= SERIALIZER_POOL_SIZE;
serializer = serializer_pool[pos];
return serializer;
}
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (!serializer) {
unsigned int pos;
/*
* Pick a serializer to use from the pool.
*
* Note: We don't care about any reentrancy behavior
* when incrementing serializer_pool_pos. If it gets
* incorrectly incremented it doesn't matter.
*/
pos = serializer_pool_pos++;
pos %= SERIALIZER_POOL_SIZE;
serializer = serializer_pool[pos];
serializer = serializer_pool_pick();
}
return ast_taskprocessor_push(serializer, sip_task, task_data);
@@ -4522,9 +4531,8 @@ static int sync_task(void *data)
/*
* Once we unlock std->lock after signaling, we cannot access
* std again. The thread waiting within
* ast_sip_push_task_synchronous() is free to continue and
* release its local variable (std).
* std again. The thread waiting within ast_sip_push_task_wait()
* is free to continue and release its local variable (std).
*/
ast_mutex_lock(&std->lock);
std->complete = 1;
@@ -4534,15 +4542,11 @@ static int sync_task(void *data)
return ret;
}
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
/* This method is an onion */
struct sync_task_data std;
if (ast_sip_thread_is_servant()) {
return sip_task(task_data);
}
memset(&std, 0, sizeof(std));
ast_mutex_init(&std.lock);
ast_cond_init(&std.cond, NULL);
@@ -4566,6 +4570,42 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
return std.fail;
}
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (ast_sip_thread_is_servant()) {
return sip_task(task_data);
}
return ast_sip_push_task_wait(serializer, sip_task, task_data);
}
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
return ast_sip_push_task_wait_servant(serializer, sip_task, task_data);
}
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (!serializer) {
/* Caller doesn't care which PJSIP serializer the task executes under. */
serializer = serializer_pool_pick();
if (!serializer) {
/* No serializer picked to execute the task */
return -1;
}
}
if (ast_taskprocessor_is_task(serializer)) {
/*
* We are the requested serializer so we must execute
* the task now or deadlock waiting on ourself to
* execute it.
*/
return sip_task(task_data);
}
return ast_sip_push_task_wait(serializer, sip_task, task_data);
}
void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)
{
size_t chars_to_copy = MIN(size - 1, pj_strlen(src));
@@ -5191,7 +5231,7 @@ static int reload_module(void)
* We must wait for the reload to complete so multiple
* reloads cannot happen at the same time.
*/
if (ast_sip_push_task_synchronous(NULL, reload_configuration_task, NULL)) {
if (ast_sip_push_task_wait_servant(NULL, reload_configuration_task, NULL)) {
ast_log(LOG_WARNING, "Failed to reload PJSIP\n");
return -1;
}
@@ -5208,7 +5248,7 @@ static int unload_module(void)
/* The thread this is called from cannot call PJSIP/PJLIB functions,
* so we have to push the work to the threadpool to handle
*/
ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL);
ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
ast_sip_destroy_scheduler();
serializer_pool_shutdown();
ast_threadpool_shutdown(sip_threadpool);

View File

@@ -282,5 +282,5 @@ static int system_create_resolver_and_set_nameservers(void *data)
void ast_sip_initialize_dns(void)
{
ast_sip_push_task_synchronous(NULL, system_create_resolver_and_set_nameservers, NULL);
ast_sip_push_task_wait_servant(NULL, system_create_resolver_and_set_nameservers, NULL);
}

View File

@@ -267,7 +267,7 @@ static void sip_transport_state_destroy(void *obj)
{
struct ast_sip_transport_state *state = obj;
ast_sip_push_task_synchronous(NULL, destroy_sip_transport_state, state);
ast_sip_push_task_wait_servant(NULL, destroy_sip_transport_state, state);
}
/*! \brief Destructor for ast_sip_transport state information */

View File

@@ -153,7 +153,7 @@ static const struct ast_datastore_info header_datastore = {
.type = "header_datastore",
};
/*! \brief Data structure used for ast_sip_push_task_synchronous */
/*! \brief Data structure used for ast_sip_push_task_wait_serializer */
struct header_data {
struct ast_sip_channel_pvt *channel;
char *header_name;
@@ -480,11 +480,11 @@ static int func_read_header(struct ast_channel *chan, const char *function, char
header_data.len = len;
if (!strcasecmp(args.action, "read")) {
return ast_sip_push_task_synchronous(channel->session->serializer, read_header,
&header_data);
return ast_sip_push_task_wait_serializer(channel->session->serializer,
read_header, &header_data);
} else if (!strcasecmp(args.action, "remove")) {
return ast_sip_push_task_synchronous(channel->session->serializer, remove_header,
&header_data);
return ast_sip_push_task_wait_serializer(channel->session->serializer,
remove_header, &header_data);
} else {
ast_log(AST_LOG_ERROR,
"Unknown action '%s' is not valid, must be 'read' or 'remove'.\n",
@@ -539,14 +539,14 @@ static int func_write_header(struct ast_channel *chan, const char *cmd, char *da
header_data.len = 0;
if (!strcasecmp(args.action, "add")) {
return ast_sip_push_task_synchronous(channel->session->serializer, add_header,
&header_data);
return ast_sip_push_task_wait_serializer(channel->session->serializer,
add_header, &header_data);
} else if (!strcasecmp(args.action, "update")) {
return ast_sip_push_task_synchronous(channel->session->serializer, update_header,
&header_data);
return ast_sip_push_task_wait_serializer(channel->session->serializer,
update_header, &header_data);
} else if (!strcasecmp(args.action, "remove")) {
return ast_sip_push_task_synchronous(channel->session->serializer, remove_header,
&header_data);
return ast_sip_push_task_wait_serializer(channel->session->serializer,
remove_header, &header_data);
} else {
ast_log(AST_LOG_ERROR,
"Unknown action '%s' is not valid, must be 'add', 'update', or 'remove'.\n",

View File

@@ -1385,7 +1385,7 @@ static int unload_module(void)
ast_cli_unregister_multiple(cli_pjsip, ARRAY_LEN(cli_pjsip));
ast_sip_unregister_service(&logging_module);
ast_sip_push_task_synchronous(NULL, clear_history_entries, NULL);
ast_sip_push_task_wait_servant(NULL, clear_history_entries, NULL);
AST_VECTOR_FREE(&vector_history);
ast_pjproject_caching_pool_destroy(&cachingpool);

View File

@@ -1070,7 +1070,7 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
return NULL;
}
if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_init, publisher)) {
ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
ao2_ref(publisher, -1);
@@ -1514,8 +1514,8 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish,
*/
old_publish = current_state->client->publish;
current_state->client->publish = publish;
if (ast_sip_push_task_synchronous(
NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_reinit_all,
current_state->client->publishers)) {
/*
* If the state object fails to re-initialize then swap
* the old publish info back in.

View File

@@ -1480,7 +1480,7 @@ static int sip_outbound_registration_apply(const struct ast_sorcery *sorcery, vo
return -1;
}
if (ast_sip_push_task_synchronous(new_state->client_state->serializer,
if (ast_sip_push_task_wait_serializer(new_state->client_state->serializer,
sip_outbound_registration_regc_alloc, new_state)) {
return -1;
}
@@ -1850,8 +1850,7 @@ static int ami_outbound_registration_detail(void *obj, void *arg, int flags)
struct sip_ami_outbound *ami = arg;
ami->registration = obj;
return ast_sip_push_task_synchronous(
NULL, ami_outbound_registration_task, ami);
return ast_sip_push_task_wait_servant(NULL, ami_outbound_registration_task, ami);
}
static int ami_show_outbound_registrations(struct mansession *s,

View File

@@ -1318,7 +1318,8 @@ static void subscription_tree_destructor(void *obj)
destroy_subscriptions(sub_tree->root);
if (sub_tree->dlg) {
ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
ast_sip_push_task_wait_servant(sub_tree->serializer,
subscription_unreference_dialog, sub_tree);
}
ao2_cleanup(sub_tree->endpoint);
@@ -1665,7 +1666,8 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags)
}
recreate_data.persistence = persistence;
recreate_data.rdata = &rdata;
if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
if (ast_sip_push_task_wait_serializer(serializer, sub_persistence_recreate,
&recreate_data)) {
ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
persistence->endpoint);
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);

View File

@@ -316,7 +316,15 @@ static void refer_progress_on_evsub_state(pjsip_evsub *sub, pjsip_event *event)
/* It's possible that a task is waiting to remove us already, so bump the refcount of progress so it doesn't get destroyed */
ao2_ref(progress, +1);
pjsip_dlg_dec_lock(progress->dlg);
ast_sip_push_task_synchronous(progress->serializer, refer_progress_terminate, progress);
/*
* XXX We are always going to execute this inline rather than
* in the serializer because this function is a PJPROJECT
* callback and thus has to be a SIP servant thread.
*
* The likely remedy is to push most of this function into
* refer_progress_terminate() with ast_sip_push_task().
*/
ast_sip_push_task_wait_servant(progress->serializer, refer_progress_terminate, progress);
pjsip_dlg_inc_lock(progress->dlg);
ao2_ref(progress, -1);
@@ -960,7 +968,8 @@ static int refer_incoming_invite_request(struct ast_sip_session *session, struct
invite.session = other_session;
if (ast_sip_push_task_synchronous(other_session->serializer, invite_replaces, &invite)) {
if (ast_sip_push_task_wait_serializer(other_session->serializer, invite_replaces,
&invite)) {
response = 481;
goto inv_replace_failed;
}

View File

@@ -377,7 +377,7 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par
create_data.ws_session = session;
if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
if (ast_sip_push_task_wait_serializer(serializer, transport_create, &create_data)) {
ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
ast_taskprocessor_unreference(serializer);
ast_websocket_unref(session);
@@ -396,13 +396,13 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par
}
if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
ast_sip_push_task_synchronous(serializer, transport_read, &read_data);
ast_sip_push_task_wait_serializer(serializer, transport_read, &read_data);
} else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
break;
}
}
ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
ast_sip_push_task_wait_serializer(serializer, transport_shutdown, transport);
ast_taskprocessor_unreference(serializer);
ast_websocket_unref(session);