add/sub done but untested, mod remains. this commit also adds a message handler inside the rtp endpoint so it can properly flush streams, setup jitter buffering, and honor uuid_debug_audio requests

This commit is contained in:
Mathieu Rene 2012-07-19 17:51:25 -04:00
parent c6a75aa4ee
commit 909d464abd
4 changed files with 353 additions and 37 deletions

View File

@ -112,7 +112,7 @@ static switch_call_cause_t channel_outgoing_channel(switch_core_session_t *sessi
ctdm_private_t *tech_pvt = NULL;
if (zstr(szchanid) || zstr(szspanid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Both "kSPAN_ID" and "kCHAN_ID" have to be set.\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Both ["kSPAN_ID"] and ["kCHAN_ID"] have to be set.\n");
goto fail;
}

View File

@ -55,6 +55,8 @@ megaco_profile_t* megaco_get_profile_by_suId(SuId suId)
const void *var;
/*iterate through profile list to get requested suID profile */
switch_thread_rwlock_rdlock(megaco_globals.profile_rwlock);
for (hi = switch_hash_first(NULL, megaco_globals.profile_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
profile = (megaco_profile_t *) val;
@ -66,11 +68,170 @@ megaco_profile_t* megaco_get_profile_by_suId(SuId suId)
}
if(!found){
profile = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, " Not able to find profile associated with suId[%d]\n",suId);
}
switch_thread_rwlock_unlock(megaco_globals.profile_rwlock);
return profile;
}
/*
* Creates a freeswitch channel for the specified termination.
* The channel will be parked until future actions are taken
*/
switch_status_t megaco_activate_termination(mg_termination_t *term)
{
switch_event_t *var_event = NULL;
switch_core_session_t *session = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
char dialstring[100];
switch_call_cause_t cause;
if (!zstr(term->uuid)) {
/* A UUID is present, check if the channel still exists */
switch_core_session_t *session;
if ((session = switch_core_session_locate(term->uuid))) {
switch_core_session_rwunlock(session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Channel [%s] already exists for termination [%s]\n", term->uuid, term->name);
return SWITCH_STATUS_SUCCESS;
}
/* The referenced channel doesn't exist anymore, clear it */
term->uuid = NULL;
}
switch_event_create(&var_event, SWITCH_EVENT_CLONE);
if (term->type == MG_TERM_RTP) {
switch_snprintf(dialstring, sizeof dialstring, "rtp/%s", term->name);
switch_event_add_header_string(var_event, SWITCH_STACK_BOTTOM, kLOCALADDR, term->u.rtp.local_addr);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kLOCALPORT, "%d", term->u.rtp.local_port);
switch_event_add_header_string(var_event, SWITCH_STACK_BOTTOM, kREMOTEADDR, term->u.rtp.remote_addr);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kREMOTEPORT, "%d", term->u.rtp.remote_port);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kPTIME, "%d", term->u.rtp.ptime);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kPT, "%d", term->u.rtp.pt);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kRFC2833PT, "%d", term->u.rtp.rfc2833_pt);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kRATE, "%d", term->u.rtp.rate);
switch_event_add_header_string(var_event, SWITCH_STACK_BOTTOM, kCODEC, term->u.rtp.codec);
} else if (term->type == MG_TERM_TDM) {
switch_snprintf(dialstring, sizeof dialstring, "tdm/%s", term->name);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kSPAN_ID, "%d", term->u.tdm.span);
switch_event_add_header(var_event, SWITCH_STACK_BOTTOM, kCHAN_ID, "%d", term->u.tdm.channel);
}
/* Set common variables on the channel */
switch_event_add_header_string(var_event, SWITCH_STACK_BOTTOM, SWITCH_PARK_AFTER_BRIDGE_VARIABLE, "true");
if (switch_ivr_originate(NULL, &session, &cause, dialstring, 0, NULL, NULL, NULL, NULL, var_event, 0, NULL) != SWITCH_CAUSE_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to instanciate termination [%s]: %s\n", term->name, switch_channel_cause2str(cause));
status = SWITCH_STATUS_FALSE;
goto done;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Termination [%s] successfully instanciated as [%s] [%s]\n", term->name, dialstring, switch_core_session_get_uuid(session));
done:
if (session) {
switch_core_session_rwunlock(session);
}
switch_event_destroy(&var_event);
}
mg_termination_t *megaco_choose_termination(megaco_profile_t *profile, const char *prefix)
{
mg_termination_type_t termtype;
/* Check the termination type by prefix */
if (strncasecmp(prefix, profile->rtp_termination_id_prefix, strlen(profile->rtp_termination_id_prefix)) == 0) {
termtype = MG_TERM_RTP;
} else {
/* TODO Math: look through TDM channels */
return NULL;
}
return profile;
}
mg_termination_t *megaco_find_termination(megaco_profile_t *profile, const char *name)
{
mg_termination_t *term = switch_core_hash_find_rdlock(profile->terminations, name, profile->terminations_rwlock);
return term;
}
void megaco_termination_destroy(mg_termination_t *term)
{
/* Lookup the FS session and hang it up */
switch_core_session_t *session;
switch_channel_t *channel;
if ((session = switch_core_session_locate(term->uuid))) {
channel = switch_core_session_get_channel(session);
switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
switch_core_session_rwunlock(session);
term->uuid = NULL;
}
switch_core_hash_delete_wrlock(term->profile->terminations, term->name, term->profile->terminations_rwlock);
}
switch_status_t megaco_context_add_termination(mg_context_t *ctx, mg_termination_t *term)
{
switch_assert(ctx != NULL);
switch_assert(term != NULL);
/* Check if the current context has existing terminations */
if (ctx->terminations[0] && ctx->terminations[1]) {
/* Context is full */
return SWITCH_STATUS_FALSE;
}
if (ctx->terminations[0]) {
ctx->terminations[1] = term;
} else if (ctx->terminations[1]) {
ctx->terminations[0] = term;
} else {
ctx->terminations[0] = term;
}
if (ctx->terminations[0] && ctx->terminations[1]) {
if (zstr(ctx->terminations[0]->uuid)) {
megaco_activate_termination(ctx->terminations[0]);
}
if (zstr(ctx->terminations[1]->uuid)) {
megaco_activate_termination(ctx->terminations[1]);
}
switch_ivr_uuid_bridge(ctx->terminations[0]->uuid, ctx->terminations[1]->uuid);
}
}
switch_status_t megaco_context_sub_termination(mg_context_t *ctx, mg_termination_t *term)
{
switch_assert(ctx != NULL);
switch_assert(term != NULL);
/* Channels will automatically go to park once the bridge ends */
if (ctx->terminations[0] == term) {
ctx->terminations[0] = NULL;
} else if (ctx->terminations[1] == term) {
ctx->terminations[1] = NULL;
}
megaco_termination_destroy(term);
}
switch_status_t megaco_context_move_termination(mg_context_t *dst, mg_termination_t *term)
{
}
mg_context_t *megaco_find_context_by_suid(SuId suId, uint32_t context_id)
@ -87,20 +248,7 @@ mg_context_t *megaco_find_context_by_suid(SuId suId, uint32_t context_id)
return NULL;
}
switch_thread_rwlock_rdlock(profile->contexts_rwlock);
/* Context exists */
if (profile->contexts_bitmap[context_id % 8] & (1 << (context_id / 8))) {
for (result = profile->contexts[context_id % MG_CONTEXT_MODULO]; result; result = result->next) {
if (result->context_id == context_id) {
break;
}
}
}
switch_thread_rwlock_unlock(profile->contexts_rwlock);
return result;
return megaco_get_context(profile, context_id);
}
mg_context_t *megaco_get_context(megaco_profile_t *profile, uint32_t context_id)
@ -213,8 +361,9 @@ switch_status_t megaco_profile_start(const char *profilename)
switch_thread_rwlock_create(&profile->rwlock, pool);
switch_thread_rwlock_create(&profile->contexts_rwlock, pool);
switch_thread_rwlock_create(&profile->terminations_rwlock, pool);
// switch_core_hash_init(&profile->contexts_hash, pool);
switch_core_hash_init(&profile->terminations, pool);
if (SWITCH_STATUS_SUCCESS != config_profile(profile, SWITCH_FALSE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error configuring profile %s\n", profile->name);

View File

@ -64,25 +64,46 @@ typedef enum {
typedef struct megaco_profile_s megaco_profile_t;
typedef struct mg_context_s mg_context_t;
/* RTP parameters understood by the controllable channel */
#define kLOCALADDR "local_addr"
#define kLOCALPORT "local_port"
#define kREMOTEADDR "remote_addr"
#define kREMOTEPORT "remote_port"
#define kCODEC "codec"
#define kPTIME "ptime"
#define kPT "pt"
#define kRFC2833PT "rfc2833_pt"
#define kMODE "mode"
#define kRATE "rate"
/* TDM parameters understood by the controllable channel */
#define kSPAN_ID "span"
#define kCHAN_ID "chan"
typedef struct mg_termination_s {
mg_termination_type_t type;
const char *uuid;
mg_context_t *context;
const char *name; /*!< Megaco Name */
const char *uuid; /*!< UUID of the associated FS channel, or NULL if it's not activated */
mg_context_t *context; /*!< Context in which this termination is connected, or NULL */
megaco_profile_t *profile; /*!< Parent MG profile */
union {
struct {
const char *codec;
int ptime;
const char *remote_address;
switch_port_t remote_port;
/* The RTP termination will automatically operate as "sendonly" or "recvonly" as soon as
* one of the network addresses are NULL */
const char *local_addr;
switch_port_t local_port;
CmSdpInfoSet *local_sdp;
CmSdpInfoSet *remote_sdp;
const char *remote_addr;
switch_port_t remote_port;
unsigned mode:2;
unsigned :0;
int ptime;
int pt;
int rfc2833_pt;
int rate;
const char *codec;
} rtp;
struct {
int span;
int channel;
@ -93,7 +114,7 @@ typedef struct mg_termination_s {
struct mg_context_s {
uint32_t context_id;
mg_termination_t terminations[MG_CONTEXT_MAX_TERMS];
mg_termination_t *terminations[MG_CONTEXT_MAX_TERMS];
megaco_profile_t *profile;
mg_context_t *next;
switch_memory_pool_t *pool;
@ -102,6 +123,7 @@ struct mg_context_s {
#define MG_CONTEXT_MODULO 16
#define MG_MAX_CONTEXTS 32768
struct megaco_profile_s {
char *name;
switch_memory_pool_t *pool;
@ -125,6 +147,9 @@ struct megaco_profile_s {
uint32_t next_context_id;
uint8_t contexts_bitmap[MG_MAX_CONTEXTS/8]; /* Availability matrix, enough bits for a 32768 bitmap */
mg_context_t *contexts[MG_CONTEXT_MODULO];
switch_hash_t *terminations;
switch_thread_rwlock_t *terminations_rwlock;
};

View File

@ -75,7 +75,11 @@ typedef struct {
switch_port_t remote_port;
switch_payload_t agreed_pt; /*XXX*/
sofia_dtmf_t dtmf_type;
enum {
RTP_SENDONLY,
RTP_RECVONLY,
RTP_SENDRECV
} mode;
} crtp_private_t;
static switch_status_t channel_on_init(switch_core_session_t *session);
@ -128,9 +132,11 @@ static switch_call_cause_t channel_outgoing_channel(switch_core_session_t *sessi
switch_channel_t *channel;
char name[128];
crtp_private_t *tech_pvt = NULL;
switch_caller_profile_t *caller_profile;
const char *err;
const char *local_addr = switch_event_get_header_nil(var_event, kLOCALADDR),
*szlocal_port = switch_event_get_header_nil(var_event, kLOCALPORT),
*remote_addr = switch_event_get_header_nil(var_event, kREMOTEADDR),
@ -151,6 +157,14 @@ static switch_call_cause_t channel_outgoing_channel(switch_core_session_t *sessi
rate = !zstr(szrate) ? atoi(szrate) : 8000,
pt = !zstr(szpt) ? atoi(szpt) : 0;
if (
((zstr(remote_addr) || remote_port == 0) && (zstr(local_addr) || local_port == 0)) ||
zstr(codec) ||
zstr(szpt)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing required arguments\n");
goto fail;
}
if (!(*new_session = switch_core_session_request(crtp.endpoint_interface, SWITCH_CALL_DIRECTION_OUTBOUND, 0, pool))) {
@ -171,10 +185,21 @@ static switch_call_cause_t channel_outgoing_channel(switch_core_session_t *sessi
tech_pvt->agreed_pt = pt;
tech_pvt->dtmf_type = DTMF_2833; /* XXX */
if (zstr(local_addr) || local_port == 0) {
tech_pvt->mode = RTP_SENDONLY;
} else if (zstr(remote_addr) || remote_port == 0) {
tech_pvt->mode = RTP_SENDRECV;
} else {
}
switch_core_session_set_private(*new_session, tech_pvt);
caller_profile = switch_caller_profile_clone(*new_session, outbound_profile);
switch_channel_set_caller_profile(channel, caller_profile);
snprintf(name, sizeof(name), "rtp/ctrl");
snprintf(name, sizeof(name), "rtp/%s", outbound_profile->destination_number);
switch_channel_set_name(channel, name);
switch_channel_set_state(channel, CS_INIT);
@ -251,7 +276,7 @@ static switch_status_t channel_on_init(switch_core_session_t *session)
switch_channel_t *channel = switch_core_session_get_channel(session);
switch_channel_set_state(channel, CS_ROUTING);
switch_channel_set_state(channel, CS_CONSUME_MEDIA);
return SWITCH_STATUS_SUCCESS;
}
@ -319,7 +344,7 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc
{
crtp_private_t *tech_pvt;
switch_channel_t *channel;
int frames = 0, bytes = 0, samples = 0;
//int frames = 0, bytes = 0, samples = 0;
channel = switch_core_session_get_channel(session);
assert(channel != NULL);
@ -328,6 +353,7 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc
assert(tech_pvt != NULL);
#if 0
if (!switch_test_flag(frame, SFF_CNG) && !switch_test_flag(frame, SFF_PROXY_PACKET)) {
if (tech_pvt->read_codec.implementation->encoded_bytes_per_packet) {
bytes = tech_pvt->read_codec.implementation->encoded_bytes_per_packet;
@ -339,6 +365,8 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc
}
tech_pvt->timestamp_send += samples;
#endif
switch_rtp_write_frame(tech_pvt->rtp_session, frame);
return SWITCH_STATUS_SUCCESS;
@ -369,6 +397,120 @@ static switch_status_t channel_send_dtmf(switch_core_session_t *session, const s
static switch_status_t channel_receive_message(switch_core_session_t *session, switch_core_session_message_t *msg)
{
crtp_private_t *tech_pvt = NULL;
tech_pvt = switch_core_session_get_private(session);
assert(tech_pvt != NULL);
switch (msg->message_id) {
case SWITCH_MESSAGE_INDICATE_DEBUG_AUDIO:
{
if (switch_rtp_ready(tech_pvt->rtp_session) && !zstr(msg->string_array_arg[0]) && !zstr(msg->string_array_arg[1])) {
int32_t flags = 0;
if (!strcasecmp(msg->string_array_arg[0], "read")) {
flags |= SWITCH_RTP_FLAG_DEBUG_RTP_READ;
} else if (!strcasecmp(msg->string_array_arg[0], "write")) {
flags |= SWITCH_RTP_FLAG_DEBUG_RTP_WRITE;
} else if (!strcasecmp(msg->string_array_arg[0], "both")) {
flags |= SWITCH_RTP_FLAG_DEBUG_RTP_READ | SWITCH_RTP_FLAG_DEBUG_RTP_WRITE;
}
if (flags) {
if (switch_true(msg->string_array_arg[1])) {
switch_rtp_set_flag(tech_pvt->rtp_session, flags);
} else {
switch_rtp_clear_flag(tech_pvt->rtp_session, flags);
}
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Invalid Options\n");
}
}
break;
}
case SWITCH_MESSAGE_INDICATE_AUDIO_SYNC:
if (switch_rtp_ready(tech_pvt->rtp_session)) {
rtp_flush_read_buffer(tech_pvt->rtp_session, SWITCH_RTP_FLUSH_ONCE);
}
break;
case SWITCH_MESSAGE_INDICATE_JITTER_BUFFER:
{
if (switch_rtp_ready(tech_pvt->rtp_session)) {
int len = 0, maxlen = 0, qlen = 0, maxqlen = 50, max_drift = 0;
if (msg->string_arg) {
char *p, *q;
const char *s;
if (!strcasecmp(msg->string_arg, "pause")) {
switch_rtp_pause_jitter_buffer(tech_pvt->rtp_session, SWITCH_TRUE);
goto end;
} else if (!strcasecmp(msg->string_arg, "resume")) {
switch_rtp_pause_jitter_buffer(tech_pvt->rtp_session, SWITCH_FALSE);
goto end;
} else if (!strncasecmp(msg->string_arg, "debug:", 6)) {
s = msg->string_arg + 6;
if (s && !strcmp(s, "off")) {
s = NULL;
}
switch_rtp_debug_jitter_buffer(tech_pvt->rtp_session, s);
goto end;
}
if ((len = atoi(msg->string_arg))) {
qlen = len / (tech_pvt->read_codec.implementation->microseconds_per_packet / 1000);
if (qlen < 1) {
qlen = 3;
}
}
if (qlen) {
if ((p = strchr(msg->string_arg, ':'))) {
p++;
maxlen = atol(p);
if ((q = strchr(p, ':'))) {
q++;
max_drift = abs(atol(q));
}
}
}
if (maxlen) {
maxqlen = maxlen / (tech_pvt->read_codec.implementation->microseconds_per_packet / 1000);
}
}
if (qlen) {
if (maxqlen < qlen) {
maxqlen = qlen * 5;
}
if (switch_rtp_activate_jitter_buffer(tech_pvt->rtp_session, qlen, maxqlen,
tech_pvt->read_codec.implementation->samples_per_packet,
tech_pvt->read_codec.implementation->samples_per_second, max_drift) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(tech_pvt->session),
SWITCH_LOG_DEBUG, "Setting Jitterbuffer to %dms (%d frames) (%d max frames) (%d max drift)\n",
len, qlen, maxqlen, max_drift);
switch_channel_set_flag(tech_pvt->channel, CF_JITTERBUFFER);
if (!switch_false(switch_channel_get_variable(tech_pvt->channel, "sip_jitter_buffer_plc"))) {
switch_channel_set_flag(tech_pvt->channel, CF_JITTERBUFFER_PLC);
}
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(tech_pvt->session),
SWITCH_LOG_WARNING, "Error Setting Jitterbuffer to %dms (%d frames)\n", len, qlen);
}
} else {
switch_rtp_deactivate_jitter_buffer(tech_pvt->rtp_session);
}
}
}
break;
default:
break;
}
end:
return SWITCH_STATUS_SUCCESS;
}