Addition of scheduler engine and a few applications to use it.

This patch adds a scheduler thread to the core and moves the heartbeat
event to use the new scheduler as an example.

Also The following features are implemented that use this scheduler:

sched_hangup dialplan application:

<action application="sched_hangup" data="+10 normal_clearing bleg"/>

** The cause code is optional and the optional bleg keyword will only hangup the
   channel the current channel is bridged to if the call is in a bridge.

sched_transfer dialplan application:

<action application="sched_transfer" data="+10 1000 XML default"/>

** The last 2 args (dialplan and context) are optional

sched_broadcast dialplan application:

<action application="sched_broadcast" data="+10 playback:/tmp/foo.wav"/>
<action application="sched_broadcast" data="+10 playback!normal_clearing:/tmp/foo.wav"/>

** The optional !<cause_code> can be added to make the channel hangup after broadcasting the file.


sched_hangup api function:

sched_hangup +10 <uuid_string> normal_clearing

** The cause code is optional

sched_transfer api function:

sched_transfer +10 <uuid_string> 1000 XML default

** The last 2 args (dialplan and context) are optional

sched_broadcast api function:

sched_broadcast +10 <uuid_str> playback:/tmp/foo.wav
sched_broadcast +10 <uuid_str> playback!normal_clearing:/tmp/foo.wav

** The optional !<cause_code> can be added to make the channel hangup after broadcasting the file.

The new C functions in the core are documented in the doxeygen.

*NOTE* This commit should satisfy at least 2 bounties on the wiki



git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@4785 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2007-03-28 23:37:12 +00:00
parent 24b1a46ceb
commit 8a4406ece2
9 changed files with 855 additions and 51 deletions

View File

@ -104,17 +104,68 @@ struct switch_core_session;
struct switch_core_runtime;
struct switch_core_port_allocator;
struct switch_core_scheduler_task {
time_t created;
time_t runtime;
uint32_t cmd_id;
char *group;
void *cmd_arg;
uint32_t task_id;
};
/*!
\defgroup core1 Core Library
\ingroup FREESWITCH
\{
*/
///\defgroup mb1 Media Bugs
///\ingroup core1
///\{
///\defgroup sched1 Scheduler
///\ingroup core1
///\{
/*!
\brief Schedule a task in the future
\param runtime the time in epoch seconds to execute the task.
\param func the callback function to execute when the task is executed.
\param desc an arbitrary description of the task.
\param group a group id tag to link multiple tasks to a single entity.
\param cmd_id an arbitrary index number be used in the callback.
\param cmd_arg user data to be passed to the callback.
\param flags flags to alter behaviour
\return the id of the task
*/
SWITCH_DECLARE(uint32_t) switch_core_scheduler_add_task(time_t task_runtime,
switch_core_scheduler_func_t func,
char *desc,
char *group,
uint32_t cmd_id,
void *cmd_arg,
switch_scheduler_flag_t flags);
/*!
\brief Delete a scheduled task
\param task_id the id of the task
\return SWITCH_STATUS_SUCCESS if the task was deleted.
*/
SWITCH_DECLARE(switch_status_t) switch_core_scheduler_del_task_id(uint32_t task_id);
/*!
\brief Delete a scheduled task based on the group name
\param group the group name
\return SWITCH_STATUS_SUCCESS if any tasks were deleted
*/
SWITCH_DECLARE(switch_status_t) switch_core_scheduler_del_task_group(char *group);
///\}
/*!
\brief Add a media bug to the session
\param session the session to add the bug to

View File

@ -331,6 +331,28 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_signal_bridge(switch_core_session_t *
*/
SWITCH_DECLARE(switch_status_t) switch_ivr_session_transfer(switch_core_session_t *session, char *extension, char *dialplan, char *context);
/*!
\brief Transfer an existing session to another location in the future
\param runtime the time (int epoch seconds) to transfer the call
\param uuid the uuid of the session to transfer
\param extension the new extension
\param dialplan the new dialplan (OPTIONAL, may be NULL)
\param context the new context (OPTIONAL, may be NULL)
\return the id of the task
*/
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_transfer(time_t runtime, char *uuid, char *extension, char *dialplan, char *context);
/*!
\brief Hangup an existing session in the future
\param runtime the time (int epoch seconds) to transfer the call
\param uuid the uuid of the session to hangup
\param cause the hanup cause code
\param bleg hangup up the B-Leg if possible
\return the id of the task
*/
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_hangup(time_t runtime, char *uuid, switch_call_cause_t cause, switch_bool_t bleg);
/*!
\brief Bridge two existing sessions
\param originator_uuid the uuid of the originator
@ -383,6 +405,16 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_hold(switch_core_session_t *session);
*/
SWITCH_DECLARE(switch_status_t) switch_ivr_unhold(switch_core_session_t *session);
/*!
\brief Signal the session to broadcast audio in the future
\param runtime when (in epoch time) to run the broadcast
\param uuid the uuid of the session to broadcast on
\param path the path data of the broadcast "/path/to/file.wav [<timer name>]" or "speak:<engine>|<voice>|<Text to say>"
\param flags flags to send to the request (SMF_ECHO_BRIDGED to send the broadcast to both sides of the call)
\return the id of the task
*/
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_broadcast(time_t runtime, char *uuid, char *path, switch_media_flag_t flags);
/*!
\brief Signal the session to broadcast audio
\param uuid the uuid of the session to broadcast on

View File

@ -167,6 +167,12 @@ typedef enum {
SMA_SET
} switch_management_action_t;
typedef enum {
SSHF_NONE = 0,
SSHF_OWN_THREAD = (1 << 0),
SSHF_FREE_ARG = (1 << 1)
} switch_scheduler_flag_t;
typedef enum {
SMF_NONE = 0,
SMF_REBRIDGE = (1 << 0),
@ -910,7 +916,8 @@ typedef enum {
SWITCH_CAUSE_LOSE_RACE = 502,
SWITCH_CAUSE_MANAGER_REQUEST = 503,
SWITCH_CAUSE_BLIND_TRANSFER = 600,
SWITCH_CAUSE_ATTENDED_TRANSFER = 601
SWITCH_CAUSE_ATTENDED_TRANSFER = 601,
SWITCH_CAUSE_ALLOTTED_TIMEOUT = 602
} switch_call_cause_t;
typedef enum {
@ -979,6 +986,8 @@ typedef switch_bool_t (*switch_media_bug_callback_t)(switch_media_bug_t *, void
typedef void (*switch_application_function_t)(switch_core_session_t *, char *);
typedef void (*switch_event_callback_t)(switch_event_t *);
typedef switch_caller_extension_t *(*switch_dialplan_hunt_function_t)(switch_core_session_t *, void *);
typedef struct switch_core_scheduler_task switch_core_scheduler_task_t;
typedef void (*switch_core_scheduler_func_t)(switch_core_scheduler_task_t *task);
typedef switch_status_t (*switch_state_handler_t)(switch_core_session_t *);
typedef switch_status_t (*switch_outgoing_channel_hook_t)(switch_core_session_t *, switch_caller_profile_t *, switch_core_session_t *);
typedef switch_status_t (*switch_answer_channel_hook_t)(switch_core_session_t *);

View File

@ -51,6 +51,9 @@ static switch_api_interface_t originate_api_interface;
static switch_api_interface_t media_api_interface;
static switch_api_interface_t hold_api_interface;
static switch_api_interface_t broadcast_api_interface;
static switch_api_interface_t sched_broadcast_api_interface;
static switch_api_interface_t sched_transfer_api_interface;
static switch_api_interface_t sched_hangup_api_interface;
static switch_status_t status_function(char *cmd, switch_core_session_t *session, switch_stream_handle_t *stream)
{
@ -245,6 +248,89 @@ static switch_status_t transfer_function(char *cmd, switch_core_session_t *isess
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t sched_transfer_function(char *cmd, switch_core_session_t *isession, switch_stream_handle_t *stream)
{
switch_core_session_t *session = NULL;
char *argv[6] = {0};
int argc = 0;
if (isession) {
return SWITCH_STATUS_FALSE;
}
argc = switch_separate_string(cmd, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
if (switch_strlen_zero(cmd) || argc < 2 || argc > 5) {
stream->write_function(stream, "USAGE: %s\n", sched_transfer_api_interface.syntax);
} else {
char *uuid = argv[1];
char *dest = argv[2];
char *dp = argv[3];
char *context = argv[4];
time_t when;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
if ((session = switch_core_session_locate(uuid))) {
switch_ivr_schedule_transfer(when, uuid, dest, dp, context);
stream->write_function(stream, "OK\n");
switch_core_session_rwunlock(session);
} else {
stream->write_function(stream, "No Such Channel!\n");
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t sched_hangup_function(char *cmd, switch_core_session_t *isession, switch_stream_handle_t *stream)
{
switch_core_session_t *session = NULL;
char *argv[4] = {0};
int argc = 0;
if (isession) {
return SWITCH_STATUS_FALSE;
}
argc = switch_separate_string(cmd, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
if (switch_strlen_zero(cmd) || argc < 1) {
stream->write_function(stream, "USAGE: %s\n", sched_hangup_api_interface.syntax);
} else {
char *uuid = argv[1];
char *cause_str = argv[2];
time_t when;
switch_call_cause_t cause = SWITCH_CAUSE_ALLOTTED_TIMEOUT;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
if (cause_str) {
cause = switch_channel_str2cause(cause_str);
}
if ((session = switch_core_session_locate(uuid))) {
switch_ivr_schedule_hangup(when, uuid, cause, SWITCH_FALSE);
stream->write_function(stream, "OK\n");
switch_core_session_rwunlock(session);
} else {
stream->write_function(stream, "No Such Channel!\n");
}
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t uuid_media_function(char *cmd, switch_core_session_t *isession, switch_stream_handle_t *stream)
{
char *argv[4] = {0};
@ -313,6 +399,50 @@ static switch_status_t uuid_broadcast_function(char *cmd, switch_core_session_t
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t sched_broadcast_function(char *cmd, switch_core_session_t *isession, switch_stream_handle_t *stream)
{
char *argv[4] = {0};
int argc = 0;
switch_status_t status = SWITCH_STATUS_FALSE;
if (isession) {
return status;
}
argc = switch_separate_string(cmd, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
if (switch_strlen_zero(cmd) || argc < 3) {
stream->write_function(stream, "USAGE: %s\n", sched_broadcast_api_interface.syntax);
} else {
switch_media_flag_t flags = SMF_NONE;
time_t when;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
if (argv[3]) {
if (!strcmp(argv[3], "both")) {
flags |= (SMF_ECHO_ALEG | SMF_ECHO_BLEG);
} else if (!strcmp(argv[3], "aleg")) {
flags |= SMF_ECHO_ALEG;
} else if (!strcmp(argv[3], "bleg")) {
flags |= SMF_ECHO_BLEG;
}
} else {
flags |= SMF_ECHO_ALEG;
}
status = switch_ivr_schedule_broadcast(when, argv[1], argv[2], flags);
stream->write_function(stream, "+OK Message Scheduled\n");
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t uuid_hold_function(char *cmd, switch_core_session_t *isession, switch_stream_handle_t *stream)
{
char *argv[4] = {0};
@ -707,12 +837,36 @@ static switch_status_t help_function(char *cmd, switch_core_session_t *session,
return SWITCH_STATUS_SUCCESS;
}
static switch_api_interface_t sched_transfer_api_interface = {
/*.interface_name */ "sched_transfer",
/*.desc */ "Schedule a broadcast event to a running call",
/*.function */ sched_transfer_function,
/*.syntax */ "[+]<time> <uuid> <extension> [<dialplan>] [<context>]",
/*.next */ NULL
};
static switch_api_interface_t sched_broadcast_api_interface = {
/*.interface_name */ "sched_broadcast",
/*.desc */ "Schedule a broadcast event to a running call",
/*.function */ sched_broadcast_function,
/*.syntax */ "[+]<time> <uuid> <path> [aleg|bleg|both]",
/*.next */ &sched_transfer_api_interface
};
static switch_api_interface_t sched_hangup_api_interface = {
/*.interface_name */ "sched_hangup",
/*.desc */ "Schedule a running call to hangup",
/*.function */ sched_hangup_function,
/*.syntax */ "[+]<time> <uuid> [<cause>]",
/*.next */ &sched_broadcast_api_interface
};
static switch_api_interface_t version_api_interface = {
/*.interface_name */ "version",
/*.desc */ "Show version of the switch",
/*.function */ version_function,
/*.syntax */ "",
/*.next */ NULL
/*.next */ &sched_hangup_api_interface
};
static switch_api_interface_t help_api_interface = {

View File

@ -96,6 +96,99 @@ static void transfer_function(switch_core_session_t *session, char *data)
}
}
static void sched_transfer_function(switch_core_session_t *session, char *data)
{
int argc;
char *argv[4] = {0};
char *mydata;
if (data && (mydata = switch_core_session_strdup(session, data))) {
if ((argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) >= 2) {
time_t when;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
switch_ivr_schedule_transfer(when, switch_core_session_get_uuid(session), argv[1], argv[2], argv[3]);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid Args\n");
}
}
}
static void sched_hangup_function(switch_core_session_t *session, char *data)
{
int argc;
char *argv[5] = {0};
char *mydata;
if (data && (mydata = switch_core_session_strdup(session, data))) {
if ((argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) >= 1) {
time_t when;
switch_call_cause_t cause = SWITCH_CAUSE_ALLOTTED_TIMEOUT;
switch_bool_t bleg = SWITCH_FALSE;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
if (argv[1]) {
cause = switch_channel_str2cause(argv[1]);
}
if (argv[2] && !strcasecmp(argv[2], "bleg")) {
bleg = SWITCH_TRUE;
}
switch_ivr_schedule_hangup(when, switch_core_session_get_uuid(session), cause, bleg);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No time specified.\n");
}
}
}
static void sched_broadcast_function(switch_core_session_t *session, char *data)
{
int argc;
char *argv[6] = {0};
char *mydata;
if (data && (mydata = switch_core_session_strdup(session, data))) {
if ((argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) >= 2) {
time_t when;
switch_media_flag_t flags = SMF_NONE;
if (*argv[0] == '+') {
when = time (NULL) + atol(argv[0] + 1);
} else {
when = atol(argv[0]);
}
if (argv[2]) {
if (!strcmp(argv[2], "both")) {
flags |= (SMF_ECHO_ALEG | SMF_ECHO_BLEG);
} else if (!strcmp(argv[2], "aleg")) {
flags |= SMF_ECHO_ALEG;
} else if (!strcmp(argv[2], "bleg")) {
flags |= SMF_ECHO_BLEG;
}
} else {
flags |= SMF_ECHO_ALEG;
}
switch_ivr_schedule_broadcast(when, switch_core_session_get_uuid(session), argv[1], flags);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid Args\n");
}
}
}
static void sleep_function(switch_core_session_t *session, char *data)
{
@ -509,6 +602,7 @@ static void ivr_application_function(switch_core_session_t *session, char *data)
}
}
static switch_api_interface_t strepoch_api_interface = {
/*.interface_name */ "strepoch",
/*.desc */ "Convert a date string into epoch time",
@ -541,6 +635,39 @@ static switch_api_interface_t presence_api_interface = {
/*.next */ &dptools_api_interface
};
static switch_application_interface_t sched_transfer_application_interface = {
/*.interface_name */ "sched_transfer",
/*.application_function */ sched_transfer_function,
/*.long_desc */ "Schedule a transfer in the future",
/*.short_desc */ "Schedule a transfer in the future",
/*.syntax */ "[+]<time> <extension> <dialplan> <context>",
/* flags */ SAF_SUPPORT_NOMEDIA,
/*.next */ NULL
};
static switch_application_interface_t sched_broadcast_application_interface = {
/*.interface_name */ "sched_broadcast",
/*.application_function */ sched_broadcast_function,
/*.long_desc */ "Schedule a broadcast in the future",
/*.short_desc */ "Schedule a broadcast in the future",
/*.syntax */ "[+]<time> <path> [aleg|bleg|both]",
/* flags */ SAF_SUPPORT_NOMEDIA,
/*.next */ &sched_transfer_application_interface
};
static switch_application_interface_t sched_hangup_application_interface = {
/*.interface_name */ "sched_hangup",
/*.application_function */ sched_hangup_function,
/*.long_desc */ "Schedule a hangup in the future",
/*.short_desc */ "Schedule a hangup in the future",
/*.syntax */ "[+]<time> [<cause>]",
/* flags */ SAF_SUPPORT_NOMEDIA,
/*.next */ &sched_broadcast_application_interface
};
static const switch_application_interface_t queuedtmf_application_interface = {
/*.interface_name */ "queue_dtmf",
/*.application_function */ queue_dtmf_function,
@ -548,7 +675,7 @@ static const switch_application_interface_t queuedtmf_application_interface = {
/* short_desc */ "Queue dtmf to be sent",
/* syntax */ "<dtmf_data>",
/* flags */ SAF_SUPPORT_NOMEDIA,
/*.next */ NULL
/*.next */ &sched_hangup_application_interface
};
static const switch_application_interface_t redirect_application_interface = {

View File

@ -233,6 +233,7 @@ static void tts_function(switch_core_session_t *session, char *data)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Done\n");
}
#ifdef BUGTEST
static switch_bool_t bug_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
switch_frame_t *frame;
@ -249,9 +250,11 @@ static switch_bool_t bug_callback(switch_media_bug_t *bug, void *user_data, swit
return SWITCH_TRUE;
}
#endif
static void bugtest_function(switch_core_session_t *session, char *data)
{
#ifdef BUGTEST
switch_media_bug_t *bug;
switch_channel_t *channel = switch_core_session_get_channel(session);
switch_status_t status;
@ -264,6 +267,11 @@ static void bugtest_function(switch_core_session_t *session, char *data)
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
return;
}
#endif
//switch_ivr_schedule_broadcast(time(NULL) + 10, switch_core_session_get_uuid(session), "/Users/anthm/sr8k.wav", SMF_ECHO_ALEG);
//switch_ivr_schedule_transfer(time(NULL) + 10, switch_core_session_get_uuid(session), "2000", NULL, NULL);
//switch_ivr_schedule_hangup(time(NULL) + 10, switch_core_session_get_uuid(session), SWITCH_CAUSE_ALLOTTED_TIMEOUT);
switch_ivr_play_file(session, NULL, data, NULL);
}

View File

@ -95,6 +95,7 @@ static struct switch_cause_table CAUSE_CHART[] = {
{ "MANAGER_REQUEST", SWITCH_CAUSE_MANAGER_REQUEST },
{ "BLIND_TRANSFER", SWITCH_CAUSE_BLIND_TRANSFER },
{ "ATTENDED_TRANSFER", SWITCH_CAUSE_ATTENDED_TRANSFER },
{ "ALLOTTED_TIMEOUT", SWITCH_CAUSE_ALLOTTED_TIMEOUT},
{ NULL, 0 }
};

View File

@ -147,6 +147,19 @@ struct switch_core_session {
SWITCH_DECLARE_DATA switch_directories SWITCH_GLOBAL_dirs = {0};
struct switch_core_scheduler_task_container {
switch_core_scheduler_task_t task;
time_t executed;
int in_thread;
int destroyed;
switch_core_scheduler_func_t func;
switch_memory_pool_t *pool;
uint32_t flags;
char *desc;
struct switch_core_scheduler_task_container *next;
};
typedef struct switch_core_scheduler_task_container switch_core_scheduler_task_container_t;
struct switch_core_runtime {
switch_time_t initiated;
uint32_t session_id;
@ -169,6 +182,10 @@ struct switch_core_runtime {
uint32_t shutting_down;
uint8_t running;
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_core_scheduler_task_container_t *task_list;
switch_mutex_t *task_mutex;
uint32_t task_id;
int task_thread_running;
};
/* Prototypes */
@ -196,6 +213,238 @@ static void db_pick_path(char *dbname, char *buf, switch_size_t size)
}
}
static void send_heartbeat(void)
{
switch_event_t *event;
switch_core_time_duration_t duration;
switch_core_measure_time(switch_core_uptime(), &duration);
if (switch_event_create(&event, SWITCH_EVENT_HEARTBEAT) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Event-Info", "System Ready");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Up-Time",
"%u year%s, "
"%u day%s, "
"%u hour%s, "
"%u minute%s, "
"%u second%s, "
"%u millisecond%s, "
"%u microsecond%s\n",
duration.yr, duration.yr == 1 ? "" : "s",
duration.day, duration.day == 1 ? "" : "s",
duration.hr, duration.hr == 1 ? "" : "s",
duration.min, duration.min == 1 ? "" : "s",
duration.sec, duration.sec == 1 ? "" : "s",
duration.ms, duration.ms == 1 ? "" : "s",
duration.mms, duration.mms == 1 ? "" : "s"
);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Session-Count", "%u", switch_core_session_count());
switch_event_fire(&event);
}
}
static void heartbeat_callback(switch_core_scheduler_task_t *task)
{
send_heartbeat();
/* reschedule this task */
task->runtime += 20;
}
static void switch_core_scheduler_execute(switch_core_scheduler_task_container_t *tp)
{
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Executing task %u %s (%s)\n", tp->task.task_id, tp->desc, switch_str_nil(tp->task.group));
tp->func(&tp->task);
if (tp->task.runtime > tp->executed) {
tp->executed = 0;
} else {
tp->destroyed = 1;
}
}
static void *SWITCH_THREAD_FUNC task_own_thread(switch_thread_t *thread, void *obj)
{
switch_core_scheduler_task_container_t *tp = (switch_core_scheduler_task_container_t *) obj;
switch_memory_pool_t *pool;
pool = tp->pool;
tp->pool = NULL;
switch_core_scheduler_execute(tp);
switch_core_destroy_memory_pool(&pool);
tp->in_thread = 0;
return NULL;
}
static int task_thread_loop(int done)
{
switch_core_scheduler_task_container_t *tofree, *tp, *last = NULL;
switch_mutex_lock(runtime.task_mutex);
for (tp = runtime.task_list; tp; tp = tp->next) {
if (done) {
tp->destroyed = 1;
} else {
time_t now = time(NULL);
if (now >= tp->task.runtime && !tp->in_thread) {
tp->executed = now;
if (switch_test_flag(tp, SSHF_OWN_THREAD)) {
switch_thread_t *thread;
switch_threadattr_t *thd_attr;
assert(switch_core_new_memory_pool(&tp->pool) == SWITCH_STATUS_SUCCESS);
switch_threadattr_create(&thd_attr, tp->pool);
switch_threadattr_detach_set(thd_attr, 1);
tp->in_thread = 1;
switch_thread_create(&thread, thd_attr, task_own_thread, tp, tp->pool);
} else {
switch_core_scheduler_execute(tp);
}
}
}
}
switch_mutex_unlock(runtime.task_mutex);
switch_mutex_lock(runtime.task_mutex);
for (tp = runtime.task_list; tp; ) {
if (tp->destroyed && !tp->in_thread) {
tofree = tp;
tp = tp->next;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting task %u %s (%s)\n",
tofree->task.task_id, tofree->desc, switch_str_nil(tofree->task.group));
if (last) {
last->next = tofree->next;
} else {
runtime.task_list = tofree->next;
}
switch_safe_free(tofree->task.group);
if (tofree->task.cmd_arg && switch_test_flag(tofree, SSHF_FREE_ARG)) {
free(tofree->task.cmd_arg);
}
switch_safe_free(tofree->desc);
free(tofree);
} else {
last = tp;
tp = tp->next;
}
}
switch_mutex_unlock(runtime.task_mutex);
return done;
}
static void *SWITCH_THREAD_FUNC switch_core_task_thread(switch_thread_t *thread, void *obj)
{
runtime.task_thread_running = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting task thread\n");
while(runtime.task_thread_running == 1) {
if (task_thread_loop(0)) {
break;
}
switch_yield(500000);
}
task_thread_loop(1);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Task thread ending\n");
runtime.task_thread_running = 0;
return NULL;
}
static void switch_core_task_thread_launch(void)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr;
switch_threadattr_create(&thd_attr, runtime.memory_pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_thread_create(&thread, thd_attr, switch_core_task_thread, NULL, runtime.memory_pool);
}
SWITCH_DECLARE(uint32_t) switch_core_scheduler_add_task(time_t task_runtime,
switch_core_scheduler_func_t func,
char *desc,
char *group,
uint32_t cmd_id,
void *cmd_arg,
switch_scheduler_flag_t flags)
{
switch_core_scheduler_task_container_t *container, *tp;
switch_mutex_lock(runtime.task_mutex);
assert((container = malloc(sizeof(*container))));
memset(container, 0, sizeof(*container));
assert(func);
container->func = func;
time(&container->task.created);
container->task.runtime = task_runtime;
container->task.group = strdup(group ? group : "none");
container->task.cmd_id = cmd_id;
container->task.cmd_arg = cmd_arg;
container->flags = flags;
container->desc = strdup(desc ? desc : "none");
for (tp = runtime.task_list; tp && tp->next; tp = tp->next);
if (tp) {
tp->next = container;
} else {
runtime.task_list = container;
}
for (container->task.task_id = 0; !container->task.task_id; container->task.task_id = ++runtime.task_id);
switch_mutex_unlock(runtime.task_mutex);
tp = container;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Added task %u %s (%s) to run at %ld\n",
tp->task.task_id, tp->desc, switch_str_nil(tp->task.group), task_runtime);
return container->task.task_id;
}
SWITCH_DECLARE(switch_status_t) switch_core_scheduler_del_task_id(uint32_t task_id)
{
switch_core_scheduler_task_container_t *tp;
switch_status_t status = SWITCH_STATUS_FALSE;
switch_mutex_lock(runtime.task_mutex);
for (tp = runtime.task_list; tp; tp = tp->next) {
if (tp->task.task_id == task_id) {
tp->destroyed++;
status = SWITCH_STATUS_SUCCESS;
break;
}
}
switch_mutex_unlock(runtime.task_mutex);
return status;
}
SWITCH_DECLARE(switch_status_t) switch_core_scheduler_del_task_group(char *group)
{
switch_core_scheduler_task_container_t *tp;
switch_status_t status = SWITCH_STATUS_FALSE;
switch_mutex_lock(runtime.task_mutex);
for (tp = runtime.task_list; tp; tp = tp->next) {
if (!strcmp(tp->task.group, group)) {
tp->destroyed++;
status = SWITCH_STATUS_SUCCESS;
}
}
switch_mutex_unlock(runtime.task_mutex);
return status;
}
static void switch_core_media_bug_destroy(switch_media_bug_t *bug)
{
switch_buffer_destroy(&bug->raw_read_buffer);
@ -2148,7 +2397,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi
}
if (!session->read_codec && (*frame)->codec) {
need_codec = TRUE;
status = SWITCH_STATUS_FALSE;
goto done;
}
if (session->bugs && !need_codec) {
@ -2422,7 +2672,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
}
if (!session->write_codec && frame->codec) {
need_codec = TRUE;
return SWITCH_STATUS_FALSE;
}
if (session->bugs && !need_codec) {
@ -3153,11 +3403,10 @@ static void switch_core_standard_on_execute(switch_core_session_t *session)
switch_event_t *event;
const switch_application_interface_t *application_interface;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Standard EXECUTE\n");
if ((extension = switch_channel_get_caller_extension(session->channel)) == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No Extension!\n");
switch_channel_hangup(session->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_channel_hangup(session->channel, SWITCH_CAUSE_NORMAL_CLEARING);
return;
}
@ -3732,6 +3981,8 @@ SWITCH_DECLARE(void) switch_core_session_destroy(switch_core_session_t **session
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Close Channel %s\n", switch_channel_get_name((*session)->channel));
switch_core_scheduler_del_task_group((*session)->uuid_str);
switch_mutex_lock(runtime.session_table_mutex);
switch_core_hash_delete(runtime.session_table, (*session)->uuid_str);
if (runtime.session_count) {
@ -4143,7 +4394,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
char *sqlbuf = (char *) malloc(sql_len);
char *sql;
switch_size_t newlen;
uint32_t loops = 0;
if (!runtime.event_db) {
runtime.event_db = switch_core_db_handle();
@ -4198,38 +4448,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
*sqlbuf = '\0';
}
if (loops++ >= 5000) {
switch_event_t *event;
switch_core_time_duration_t duration;
switch_core_measure_time(switch_core_uptime(), &duration);
if (switch_event_create(&event, SWITCH_EVENT_HEARTBEAT) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Event-Info", "System Ready");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Up-Time",
"%u year%s, "
"%u day%s, "
"%u hour%s, "
"%u minute%s, "
"%u second%s, "
"%u millisecond%s, "
"%u microsecond%s\n",
duration.yr, duration.yr == 1 ? "" : "s",
duration.day, duration.day == 1 ? "" : "s",
duration.hr, duration.hr == 1 ? "" : "s",
duration.min, duration.min == 1 ? "" : "s",
duration.sec, duration.sec == 1 ? "" : "s",
duration.ms, duration.ms == 1 ? "" : "s",
duration.mms, duration.mms == 1 ? "" : "s"
);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Session-Count", "%u", switch_core_session_count());
switch_event_fire(&event);
}
loops = 0;
}
if (nothing_in_queue) {
switch_yield(1000);
}
@ -4672,11 +4890,17 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(char *console, const char **err
runtime.running = 1;
switch_core_hash_init(&runtime.session_table, runtime.memory_pool);
switch_mutex_init(&runtime.session_table_mutex, SWITCH_MUTEX_NESTED, runtime.memory_pool);
switch_mutex_init(&runtime.task_mutex, SWITCH_MUTEX_NESTED, runtime.memory_pool);
#ifdef CRASH_PROT
switch_core_hash_init(&runtime.stack_table, runtime.memory_pool);
#endif
switch_core_task_thread_launch();
runtime.initiated = switch_time_now();
switch_core_scheduler_add_task(time(NULL), heartbeat_callback, "heartbeat", "core", 0, NULL, SSHF_NONE);
switch_uuid_get(&uuid);
switch_uuid_format(runtime.uuid_str, &uuid);
@ -4836,12 +5060,29 @@ SWITCH_DECLARE(switch_status_t) switch_core_destroy(void)
while (switch_queue_size(runtime.sql_queue) > 0) {
switch_yield(10000);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping Task Thread\n");
if (runtime.task_thread_running == 1) {
int sanity = 0;
runtime.task_thread_running = -1;
while(runtime.task_thread_running) {
switch_yield(100000);
if (++sanity > 10) {
break;
}
}
}
switch_core_db_close(runtime.db);
switch_core_db_close(runtime.event_db);
switch_xml_destroy();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Finalizing Shutdown.\n");
switch_log_shutdown();
if(runtime.console != stdout && runtime.console != stderr) {
fclose(runtime.console);
runtime.console = NULL;

View File

@ -3240,6 +3240,157 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_unhold_uuid(char *uuid)
return SWITCH_STATUS_SUCCESS;
}
struct hangup_helper {
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_bool_t bleg;
switch_call_cause_t cause;
};
static void sch_hangup_callback(switch_core_scheduler_task_t *task)
{
struct hangup_helper *helper;
switch_core_session_t *session, *other_session;
char *other_uuid;
assert(task);
helper = (struct hangup_helper *) task->cmd_arg;
if ((session = switch_core_session_locate(helper->uuid_str))) {
switch_channel_t *channel = switch_core_session_get_channel(session);
if (helper->bleg) {
if ((other_uuid = switch_channel_get_variable(channel, SWITCH_BRIDGE_VARIABLE)) &&
(other_session = switch_core_session_locate(other_uuid))) {
switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
switch_channel_hangup(other_channel, helper->cause);
switch_core_session_rwunlock(other_session);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No channel to hangup\n");
}
} else {
switch_channel_hangup(channel, helper->cause);
}
switch_core_session_rwunlock(session);
}
}
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_hangup(time_t runtime, char *uuid, switch_call_cause_t cause, switch_bool_t bleg)
{
struct hangup_helper *helper;
size_t len = sizeof(*helper);
assert((helper = malloc(len)));
memset(helper, 0, len);
switch_copy_string(helper->uuid_str, uuid, sizeof(helper->uuid_str));
helper->cause = cause;
helper->bleg = bleg;
return switch_core_scheduler_add_task(runtime, sch_hangup_callback, (char *)__SWITCH_FUNC__, uuid, 0, helper, SSHF_FREE_ARG);
}
struct transfer_helper {
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
char *extension;
char *dialplan;
char *context;
};
static void sch_transfer_callback(switch_core_scheduler_task_t *task)
{
struct transfer_helper *helper;
switch_core_session_t *session;
assert(task);
helper = (struct transfer_helper *) task->cmd_arg;
if ((session = switch_core_session_locate(helper->uuid_str))) {
switch_ivr_session_transfer(session, helper->extension, helper->dialplan, helper->context);
switch_core_session_rwunlock(session);
}
}
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_transfer(time_t runtime, char *uuid, char *extension, char *dialplan, char *context)
{
struct transfer_helper *helper;
size_t len = sizeof(*helper);
char *cur = NULL;
if (extension) {
len += strlen(extension) + 1;
}
if (dialplan) {
len += strlen(dialplan) + 1;
}
if (context) {
len += strlen(context) + 1;
}
assert((helper = malloc(len)));
memset(helper, 0, len);
switch_copy_string(helper->uuid_str, uuid, sizeof(helper->uuid_str));
cur = (char *) helper + sizeof(*helper);
if (extension) {
helper->extension = cur;
switch_copy_string(helper->extension, extension, strlen(extension) + 1);
cur += strlen(helper->extension) + 1;
}
if (dialplan) {
helper->dialplan = cur;
switch_copy_string(helper->dialplan, dialplan, strlen(dialplan) + 1);
cur += strlen(helper->dialplan) + 1;
}
if (context) {
helper->context = cur;
switch_copy_string(helper->context, context, strlen(context)+1);
}
return switch_core_scheduler_add_task(runtime, sch_transfer_callback, (char *)__SWITCH_FUNC__, uuid, 0, helper, SSHF_FREE_ARG);
}
struct broadcast_helper {
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
char *path;
switch_media_flag_t flags;
};
static void sch_broadcast_callback(switch_core_scheduler_task_t *task)
{
struct broadcast_helper *helper;
assert(task);
helper = (struct broadcast_helper *) task->cmd_arg;
switch_ivr_broadcast(helper->uuid_str, helper->path, helper->flags);
}
SWITCH_DECLARE(uint32_t) switch_ivr_schedule_broadcast(time_t runtime, char *uuid, char *path, switch_media_flag_t flags)
{
struct broadcast_helper *helper;
size_t len = sizeof(*helper) + strlen(path) + 1;
assert((helper = malloc(len)));
memset(helper, 0, len);
switch_copy_string(helper->uuid_str, uuid, sizeof(helper->uuid_str));
helper->flags = flags;
helper->path = (char *) helper + sizeof(*helper);
switch_copy_string(helper->path, path, len - sizeof(helper));
return switch_core_scheduler_add_task(runtime, sch_broadcast_callback, (char *)__SWITCH_FUNC__, uuid, 0, helper, SSHF_FREE_ARG);
}
SWITCH_DECLARE(switch_status_t) switch_ivr_broadcast(char *uuid, char *path, switch_media_flag_t flags)
{
@ -3249,9 +3400,15 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_broadcast(char *uuid, char *path, swi
switch_event_t *event;
switch_core_session_t *other_session = NULL;
char *other_uuid = NULL;
char *app = "playback";
assert(path);
if ((session = switch_core_session_locate(uuid))) {
char *app;
char *cause = NULL;
char *mypath = strdup(path);
char *p;
master = session;
channel = switch_core_session_get_channel(session);
@ -3261,11 +3418,17 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_broadcast(char *uuid, char *path, swi
switch_ivr_media(uuid, SMF_REBRIDGE);
}
if (!strncasecmp(path, "speak:", 6)) {
path += 6;
app = "speak";
} else {
app = "playback";
if ((p = strchr(mypath, ':'))) {
app = mypath;
*p++ = '\0';
path = p;
}
if ((cause = strchr(app, '!'))) {
*cause++ = '\0';
if (!cause) {
cause = "normal_clearing";
}
}
if ((flags & SMF_ECHO_BLEG) && (other_uuid = switch_channel_get_variable(channel, SWITCH_BRIDGE_VARIABLE))
@ -3299,8 +3462,21 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_broadcast(char *uuid, char *path, swi
}
}
if (cause) {
if (switch_event_create(&event, SWITCH_EVENT_MESSAGE) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "call-command", "execute");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "execute-app-name", "hangup");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "execute-app-arg", "%s", cause);
switch_core_session_queue_private_event(session, &event);
}
}
switch_core_session_rwunlock(session);
switch_safe_free(mypath);
}
return SWITCH_STATUS_SUCCESS;
}
@ -3449,7 +3625,12 @@ static switch_status_t signal_bridge_on_hangup(switch_core_session_t *session)
switch_channel_set_variable(channel, SWITCH_BRIDGE_VARIABLE, NULL);
switch_channel_set_variable(other_channel, SWITCH_BRIDGE_VARIABLE, NULL);
switch_channel_hangup(other_channel, switch_channel_get_cause(channel));
if (switch_channel_get_state(other_channel) < CS_HANGUP &&
switch_true(switch_channel_get_variable(other_channel, SWITCH_HANGUP_AFTER_BRIDGE_VARIABLE))) {
switch_channel_hangup(other_channel, switch_channel_get_cause(channel));
} else {
switch_channel_set_state(other_channel, CS_EXECUTE);
}
switch_core_session_rwunlock(other_session);
}