trade a straw for a fire hose

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@7789 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2008-03-04 23:53:23 +00:00
parent bed25ed751
commit fcdb3925ac
6 changed files with 187 additions and 54 deletions

View File

@ -854,6 +854,9 @@ SWITCH_DECLARE(const char *) switch_dir_next_file(switch_dir_t *thedir, char *bu
//APR_DECLARE(apr_status_t) apr_threadattr_stacksize_set(apr_threadattr_t *attr, switch_size_t stacksize)
SWITCH_DECLARE(switch_status_t) switch_threadattr_stacksize_set(switch_threadattr_t * attr, switch_size_t stacksize);
SWITCH_DECLARE(switch_status_t) switch_threadattr_priority_increase(switch_threadattr_t *attr);
/**
* Create and initialize a new threadattr variable
* @param new_attr The newly created threadattr.

View File

@ -262,7 +262,7 @@ switch_status_t sofia_on_hangup(switch_core_session_t *session)
switch_core_hash_delete(tech_pvt->profile->chat_hash, tech_pvt->hash_key);
}
if (session) {
if (session && (tech_pvt->profile->pflags & PFLAG_PRESENCE)) {
char *sql = switch_mprintf("delete from sip_dialogs where call_id='%q'", tech_pvt->call_id);
switch_assert(sql);
sofia_glue_execute_sql(tech_pvt->profile, SWITCH_FALSE, sql, tech_pvt->profile->ireg_mutex);
@ -712,6 +712,13 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
goto end;
}
if (msg->message_id == SWITCH_MESSAGE_INDICATE_ANSWER || msg->message_id == SWITCH_MESSAGE_INDICATE_PROGRESS) {
const char *var;
if ((var = switch_channel_get_variable(channel, SOFIA_SECURE_MEDIA_VARIABLE)) && switch_true(var)) {
switch_set_flag_locked(tech_pvt, TFLAG_SECURE);
}
}
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_VIDEO_REFRESH_REQ:
{

View File

@ -88,6 +88,11 @@ typedef struct private_object private_object_t;
#include <sofia-sip/nea.h>
#include <sofia-sip/msg_addr.h>
typedef struct {
switch_bool_t master;
char *sql;
} sofia_sql_job_t;
typedef enum {
DTMF_2833,
@ -130,7 +135,8 @@ typedef enum {
PFLAG_TLS = (1 << 13),
PFLAG_CHECKUSER = (1 << 14),
PFLAG_SECURE = (1 << 15),
PFLAG_BLIND_AUTH = (1 << 16)
PFLAG_BLIND_AUTH = (1 << 16),
PFLAG_WORKER_RUNNING = (1 << 17),
} PFLAGS;
typedef enum {
@ -298,6 +304,7 @@ struct sofia_profile {
char *odbc_user;
char *odbc_pass;
switch_odbc_handle_t *master_odbc;
switch_queue_t *sql_queue;
};
struct private_object {
@ -503,6 +510,7 @@ void sofia_presence_handle_sip_i_subscribe(int status,
nua_t *nua, sofia_profile_t *profile, nua_handle_t *nh, sofia_private_t *sofia_private, sip_t const *sip, tagi_t tags[]);
void sofia_glue_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex);
void sofia_glue_actually_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex);
void sofia_reg_check_expire(sofia_profile_t *profile, time_t now);
void sofia_reg_check_gateway(sofia_profile_t *profile, time_t now);
void sofia_reg_unregister(sofia_profile_t *profile);

View File

@ -419,13 +419,71 @@ void event_handler(switch_event_t *event)
}
}
void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
{
sofia_profile_t *profile = (sofia_profile_t *) obj;
uint32_t ireg_loops = 0;
uint32_t gateway_loops = 0;
void *pop;
sofia_sql_job_t *job;
int loops = 0;
ireg_loops = IREG_SECONDS;
gateway_loops = GATEWAY_SECONDS;
sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
switch_queue_create(&profile->sql_queue, 500000, profile->pool);
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || switch_queue_size(profile->sql_queue)) {
while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
job = (sofia_sql_job_t *) pop;
sofia_glue_actually_execute_sql(profile, job->master, job->sql, NULL);
free(job->sql);
free(job);
job = NULL;
}
if (++loops >= 100) {
if (++ireg_loops >= IREG_SECONDS) {
sofia_reg_check_expire(profile, switch_timestamp(NULL));
ireg_loops = 0;
}
if (++gateway_loops >= GATEWAY_SECONDS) {
sofia_reg_check_gateway(profile, switch_timestamp(NULL));
gateway_loops = 0;
}
loops = 0;
}
switch_yield(10000);
}
sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING);
return NULL;
}
void launch_sofia_worker_thread(sofia_profile_t *profile)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&thread, thd_attr, sofia_profile_worker_thread_run, profile, profile->pool);
switch_yield(1000000);
}
void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void *obj)
{
sofia_profile_t *profile = (sofia_profile_t *) obj;
switch_memory_pool_t *pool;
sip_alias_node_t *node;
uint32_t ireg_loops = 0;
uint32_t gateway_loops = 0;
switch_event_t *s_event;
int tportlog = 0;
@ -516,9 +574,6 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
ireg_loops = IREG_SECONDS;
gateway_loops = GATEWAY_SECONDS;
if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
(sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
@ -547,18 +602,9 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_yield(1000000);
sofia_set_pflag_locked(profile, PFLAG_RUNNING);
launch_sofia_worker_thread(profile);
while (mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) {
if (++ireg_loops >= IREG_SECONDS) {
sofia_reg_check_expire(profile, switch_timestamp(NULL));
ireg_loops = 0;
}
if (++gateway_loops >= GATEWAY_SECONDS) {
sofia_reg_check_gateway(profile, switch_timestamp(NULL));
gateway_loops = 0;
}
while (mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING) && sofia_test_pflag(profile, PFLAG_WORKER_RUNNING)) {
su_root_step(profile->s_root, 1000);
}
@ -568,6 +614,13 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
nua_shutdown(profile->nua);
su_root_run(profile->s_root);
sofia_clear_pflag_locked(profile, PFLAG_RUNNING);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "waiting for worker thread\n");
while(sofia_test_pflag(profile, PFLAG_WORKER_RUNNING)) {
switch_yield(100000);
}
while(profile->inuse) {
switch_yield(100000);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "waiting for %d session(s)\n", profile->inuse);
@ -630,9 +683,12 @@ void launch_sofia_profile_thread(sofia_profile_t *profile)
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&thread, thd_attr, sofia_profile_thread_run, profile, profile->pool);
}
static void logger(void *logarg, char const *fmt, va_list ap)
{
char *data = NULL;
@ -1415,26 +1471,28 @@ static void sofia_handle_sip_r_invite(switch_core_session_t *session, int status
contact_host = switch_str_nil(contact->url_host);
}
sql = switch_mprintf(
"insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
call_id,
switch_core_session_get_uuid(session),
to_user,
to_host,
from_user,
from_host,
contact_user,
contact_host,
astate,
"outbound",
user_agent
);
if (profile->pflags & PFLAG_PRESENCE) {
sql = switch_mprintf(
"insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
call_id,
switch_core_session_get_uuid(session),
to_user,
to_host,
from_user,
from_host,
contact_user,
contact_host,
astate,
"outbound",
user_agent
);
switch_assert(sql);
sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex);
free(sql);
} else if (status == 200) {
switch_assert(sql);
sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex);
free(sql);
}
} else if (status == 200 && (profile->pflags & PFLAG_PRESENCE)) {
char *sql = NULL;
sql = switch_mprintf("update sip_dialogs set state='%s' where uuid='%s';\n", astate, switch_core_session_get_uuid(session));
switch_assert(sql);
@ -2364,6 +2422,7 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_
int is_auth = 0, calling_myself = 0;
su_addrinfo_t *my_addrinfo = msg_addrinfo(nua_current_request(nua));
if (sess_count >= sess_max || !(profile->pflags & PFLAG_RUNNING)) {
nua_respond(nh, 480, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
return;
@ -2795,25 +2854,29 @@ void sofia_handle_sip_i_invite(nua_t *nua, sofia_profile_t *profile, nua_handle_
contact_host = switch_str_nil(contact->url_host);
}
sql = switch_mprintf(
"insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
call_id,
tech_pvt->sofia_private->uuid,
to_user,
to_host,
dialog_from_user,
dialog_from_host,
contact_user,
contact_host,
"confirmed",
"inbound",
user_agent
);
switch_assert(sql);
if (profile->pflags & PFLAG_PRESENCE) {
sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex);
free(sql);
sql = switch_mprintf(
"insert into sip_dialogs values('%q','%q','%q','%q','%q','%q','%q','%q','%q','%q','%q')",
call_id,
tech_pvt->sofia_private->uuid,
to_user,
to_host,
dialog_from_user,
dialog_from_host,
contact_user,
contact_host,
"confirmed",
"inbound",
user_agent
);
switch_assert(sql);
sofia_glue_execute_sql(profile, SWITCH_FALSE, sql, profile->ireg_mutex);
free(sql);
}
return;
}

View File

@ -2444,6 +2444,31 @@ void sofia_glue_sql_close(sofia_profile_t *profile)
void sofia_glue_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex)
{
switch_status_t status = SWITCH_STATUS_FALSE;
sofia_sql_job_t *job = NULL;
if (profile->sql_queue) {
switch_zmalloc(job, sizeof(*job));
job->sql = strdup(sql);
switch_assert(job->sql);
job->master = master;
status = switch_queue_trypush(profile->sql_queue, job);
}
if (status != SWITCH_STATUS_SUCCESS) {
if (job) {
free(job->sql);
free(job);
}
sofia_glue_actually_execute_sql(profile, master, sql, mutex);
}
}
void sofia_glue_actually_execute_sql(sofia_profile_t *profile, switch_bool_t master, char *sql, switch_mutex_t *mutex)
{
switch_core_db_t *db;

View File

@ -486,6 +486,33 @@ SWITCH_DECLARE(switch_status_t) switch_threadattr_stacksize_set(switch_threadatt
return apr_threadattr_stacksize_set(attr, stacksize);
}
#ifndef WIN32
struct apr_threadattr_t {
apr_pool_t *pool;
pthread_attr_t attr;
};
#endif
SWITCH_DECLARE(switch_status_t) switch_threadattr_priority_increase(switch_threadattr_t *attr)
{
int stat = 0;
#ifndef WIN32
struct sched_param param;
struct apr_threadattr_t *myattr = attr;
pthread_attr_getschedparam(&myattr->attr, &param);
param.sched_priority = 50;
stat = pthread_attr_setschedparam(&myattr->attr, &param);
if (stat == 0) {
return SWITCH_STATUS_SUCCESS;
}
#endif
return stat;
}
SWITCH_DECLARE(switch_status_t) switch_thread_create(switch_thread_t ** new_thread, switch_threadattr_t * attr,
switch_thread_start_t func, void *data, switch_memory_pool_t *cont)
{