Extreme IAX2 trunking performance improvements

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@2783 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Spencer
2004-04-27 15:18:55 +00:00
parent 5385ca0a0e
commit 7596de9ac8

View File

@@ -230,10 +230,6 @@ struct iax2_peer {
int delme; /* I need to be deleted */
int temponly; /* I'm only a temp */
int trunk; /* Treat as an IAX trunking */
struct timeval txtrunktime; /* Transmit trunktime */
struct timeval rxtrunktime; /* Receive trunktime */
struct timeval lasttxtime; /* Last transmitted trunktime */
unsigned int lastsent; /* Last sent time */
/* Qualification */
int callno; /* Call number of POKE request */
@@ -246,6 +242,28 @@ struct iax2_peer {
int notransfer;
};
#define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
static struct iax2_trunk_peer {
ast_mutex_t lock;
struct sockaddr_in addr;
struct timeval txtrunktime; /* Transmit trunktime */
struct timeval rxtrunktime; /* Receive trunktime */
struct timeval lasttxtime; /* Last transmitted trunktime */
struct timeval trunkact; /* Last trunk activity */
unsigned int lastsent; /* Last sent time */
/* Trunk data and length */
unsigned char *trunkdata;
unsigned int trunkdatalen;
unsigned int trunkdataalloc;
struct iax2_trunk_peer *next;
int trunkerror;
int calls;
int firstcallno;
} *tpeers = NULL;
static ast_mutex_t tpeerlock = AST_MUTEX_INITIALIZER;
struct iax_firmware {
struct iax_firmware *next;
int fd;
@@ -290,7 +308,8 @@ static struct iax2_registry *registrations;
#define MAX_RETRY_TIME 10000
#define MAX_JITTER_BUFFER 50
#define MAX_TRUNKDATA 640 /* 40ms, uncompressed linear */
#define DEFAULT_TRUNKDATA 640 * 10 /* 40ms, uncompressed linear * 10 channels */
#define MAX_TRUNKDATA 640 * 200 /* 40ms, uncompressed linear * 200 channels */
/* If we have more than this much excess real jitter buffer, srhink it. */
static int max_jitter_buffer = MAX_JITTER_BUFFER;
@@ -426,10 +445,6 @@ struct chan_iax2_pvt {
int amaflags;
/* This is part of a trunk interface */
int trunk;
/* Trunk data and length */
unsigned char trunkdata[MAX_TRUNKDATA];
unsigned int trunkdatalen;
int trunkerror;
struct iax2_dpcache *dpentries;
int notransfer; /* do we want native bridging */
};
@@ -2530,39 +2545,38 @@ static struct ast_channel *ast_iax2_new(struct chan_iax2_pvt *i, int state, int
return tmp;
}
static unsigned int calc_txpeerstamp(struct iax2_peer *peer, int sampms)
static unsigned int calc_txpeerstamp(struct iax2_trunk_peer *tpeer, int sampms, struct timeval *tv)
{
struct timeval tv;
long int mssincetx;
long int ms, pred;
gettimeofday(&tv, NULL);
mssincetx = (tv.tv_sec - peer->lasttxtime.tv_sec) * 1000 + (tv.tv_usec - peer->lasttxtime.tv_usec) / 1000;
tpeer->trunkact = *tv;
mssincetx = (tv->tv_sec - tpeer->lasttxtime.tv_sec) * 1000 + (tv->tv_usec - tpeer->lasttxtime.tv_usec) / 1000;
if (mssincetx > 5000) {
/* If it's been at least 5 seconds since the last time we transmitted on this trunk, reset our timers */
peer->txtrunktime.tv_sec = tv.tv_sec;
peer->txtrunktime.tv_usec = tv.tv_usec;
peer->lastsent = 999999;
tpeer->txtrunktime.tv_sec = tv->tv_sec;
tpeer->txtrunktime.tv_usec = tv->tv_usec;
tpeer->lastsent = 999999;
}
/* Update last transmit time now */
peer->lasttxtime.tv_sec = tv.tv_sec;
peer->lasttxtime.tv_usec = tv.tv_usec;
tpeer->lasttxtime.tv_sec = tv->tv_sec;
tpeer->lasttxtime.tv_usec = tv->tv_usec;
/* Calculate ms offset */
ms = (tv.tv_sec - peer->txtrunktime.tv_sec) * 1000 + (tv.tv_usec - peer->txtrunktime.tv_usec) / 1000;
ms = (tv->tv_sec - tpeer->txtrunktime.tv_sec) * 1000 + (tv->tv_usec - tpeer->txtrunktime.tv_usec) / 1000;
/* Predict from last value */
pred = peer->lastsent + sampms;
pred = tpeer->lastsent + sampms;
if (abs(ms - pred) < 640)
ms = pred;
/* We never send the same timestamp twice, so fudge a little if we must */
if (ms == peer->lastsent)
ms = peer->lastsent + 1;
peer->lastsent = ms;
if (ms == tpeer->lastsent)
ms = tpeer->lastsent + 1;
tpeer->lastsent = ms;
return ms;
}
static unsigned int fix_peerts(struct iax2_peer *peer, int callno, unsigned int ts)
static unsigned int fix_peerts(struct iax2_trunk_peer *peer, int callno, unsigned int ts)
{
long ms; /* NOT unsigned */
if (!iaxs[callno]->rxcore.tv_sec && !iaxs[callno]->rxcore.tv_usec) {
@@ -2677,6 +2691,85 @@ static unsigned int calc_rxstamp(struct chan_iax2_pvt *p)
return ms;
}
struct iax2_trunk_peer *find_tpeer(struct sockaddr_in *sin)
{
struct iax2_trunk_peer *tpeer;
/* Finds and locks trunk peer */
ast_mutex_lock(&tpeerlock);
tpeer = tpeers;
while(tpeer) {
/* We don't lock here because tpeer->addr *never* changes */
if (!inaddrcmp(&tpeer->addr, sin)) {
ast_mutex_lock(&tpeer->lock);
break;
}
tpeer = tpeer->next;
}
if (!tpeer) {
tpeer = malloc(sizeof(struct iax2_trunk_peer));
if (tpeer) {
memset(tpeer, 0, sizeof(struct iax2_trunk_peer));
ast_mutex_init(&tpeer->lock);
tpeer->lastsent = 9999;
memcpy(&tpeer->addr, sin, sizeof(tpeer->addr));
gettimeofday(&tpeer->trunkact, NULL);
ast_mutex_lock(&tpeer->lock);
tpeer->next = tpeers;
tpeers = tpeer;
ast_log(LOG_DEBUG, "Created trunk peer for '%s:%d'\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
}
}
ast_mutex_unlock(&tpeerlock);
return tpeer;
}
static int iax2_trunk_queue(struct chan_iax2_pvt *pvt, struct ast_frame *f)
{
struct iax2_trunk_peer *tpeer;
void *tmp, *ptr;
struct ast_iax2_meta_trunk_entry *met;
tpeer = find_tpeer(&pvt->addr);
if (tpeer) {
if (tpeer->trunkdatalen + f->datalen + 4 >= tpeer->trunkdataalloc) {
/* Need to reallocate space */
if (tpeer->trunkdataalloc < MAX_TRUNKDATA) {
tmp = realloc(tpeer->trunkdata, tpeer->trunkdataalloc + DEFAULT_TRUNKDATA + IAX2_TRUNK_PREFACE);
if (tmp) {
tpeer->trunkdataalloc += DEFAULT_TRUNKDATA;
tpeer->trunkdata = tmp;
ast_log(LOG_DEBUG, "Expanded trunk '%s:%d' to %d bytes\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), tpeer->trunkdataalloc);
} else {
ast_log(LOG_WARNING, "Insufficient memory to expand trunk data to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
ast_mutex_unlock(&tpeer->lock);
return -1;
}
} else {
ast_log(LOG_WARNING, "Maximum trunk data space exceeded to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
ast_mutex_unlock(&tpeer->lock);
return -1;
}
}
/* Append to meta frame */
ptr = tpeer->trunkdata + IAX2_TRUNK_PREFACE + tpeer->trunkdatalen;
met = (struct ast_iax2_meta_trunk_entry *)ptr;
/* Store call number and length in meta header */
met->callno = htons(pvt->callno);
met->len = htons(f->datalen);
/* Advance pointers/decrease length past trunk entry header */
ptr += sizeof(struct ast_iax2_meta_trunk_entry);
tpeer->trunkdatalen += sizeof(struct ast_iax2_meta_trunk_entry);
/* Copy actual trunk data */
memcpy(ptr, f->data, f->datalen);
tpeer->trunkdatalen += f->datalen;
if (!tpeer->firstcallno)
tpeer->firstcallno = pvt->callno;
tpeer->calls++;
ast_mutex_unlock(&tpeer->lock);
}
return 0;
}
static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned int ts, int seqno, int now, int transfer, int final)
{
/* Queue a packet for delivery on a given private structure. Use "ts" for
@@ -2794,17 +2887,7 @@ static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned in
res = iax2_transmit(fr);
} else {
if (pvt->trunk) {
/* Queue for transmission in a meta frame */
if ((sizeof(pvt->trunkdata) - pvt->trunkdatalen) >= fr->af.datalen) {
memcpy(pvt->trunkdata + pvt->trunkdatalen, fr->af.data, fr->af.datalen);
pvt->trunkdatalen += fr->af.datalen;
res = 0;
pvt->trunkerror = 0;
} else {
if (!pvt->trunkerror)
ast_log(LOG_WARNING, "Out of trunk data space on call number %d, dropping\n", pvt->callno);
pvt->trunkerror = 1;
}
iax2_trunk_queue(pvt, &fr->af);
res = 0;
} else if (fr->af.frametype == AST_FRAME_VIDEO) {
/* Video frame have no sequence number */
@@ -4192,96 +4275,70 @@ static int iax2_poke_peer_s(void *data)
return 0;
}
static int send_trunk(struct iax2_peer *peer)
static int send_trunk(struct iax2_trunk_peer *tpeer, struct timeval *now)
{
int x;
int calls = 0;
int res = 0;
int firstcall = 0;
unsigned char buf[65536 + sizeof(struct iax_frame)], *ptr;
int len = 65536;
struct iax_frame *fr;
struct ast_iax2_meta_hdr *meta;
struct ast_iax2_meta_trunk_hdr *mth;
struct ast_iax2_meta_trunk_entry *met;
int calls = 0;
/* Point to frame */
fr = (struct iax_frame *)buf;
fr = (struct iax_frame *)tpeer->trunkdata;
/* Point to meta data */
meta = (struct ast_iax2_meta_hdr *)fr->afdata;
mth = (struct ast_iax2_meta_trunk_hdr *)meta->data;
/* Point past meta data for first meta trunk entry */
ptr = fr->afdata + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
len -= sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
/* Search through trunked calls for a match with this peer */
for (x=TRUNK_CALL_START;x<maxtrunkcall; x++) {
ast_mutex_lock(&iaxsl[x]);
#if 0
if (iaxtrunkdebug)
ast_verbose("Call %d is at %s:%d (%d)\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port), iaxs[x]->addr.sin_family);
#endif
if (iaxs[x] && iaxs[x]->trunk && iaxs[x]->trunkdatalen && !inaddrcmp(&iaxs[x]->addr, &peer->addr)) {
if (iaxtrunkdebug)
ast_verbose(" -- Sending call %d via trunk to %s:%d\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port));
if (len >= iaxs[x]->trunkdatalen + sizeof(struct ast_iax2_meta_trunk_entry)) {
met = (struct ast_iax2_meta_trunk_entry *)ptr;
/* Store call number and length in meta header */
met->callno = htons(x);
met->len = htons(iaxs[x]->trunkdatalen);
/* Advance pointers/decrease length past trunk entry header */
ptr += sizeof(struct ast_iax2_meta_trunk_entry);
len -= sizeof(struct ast_iax2_meta_trunk_entry);
/* Copy actual trunk data */
memcpy(ptr, iaxs[x]->trunkdata, iaxs[x]->trunkdatalen);
/* Advance pointeres/decrease length for actual data */
ptr += iaxs[x]->trunkdatalen;
len -= iaxs[x]->trunkdatalen;
} else
ast_log(LOG_WARNING, "Out of space in frame for trunking call %d\n", x);
iaxs[x]->trunkdatalen = 0;
calls++;
if (!firstcall)
firstcall = x;
}
ast_mutex_unlock(&iaxsl[x]);
}
if (calls) {
if (tpeer->trunkdatalen) {
/* We're actually sending a frame, so fill the meta trunk header and meta header */
meta->zeros = 0;
meta->metacmd = IAX_META_TRUNK;
meta->cmddata = 0;
mth->ts = htonl(calc_txpeerstamp(peer, trunkfreq));
mth->ts = htonl(calc_txpeerstamp(tpeer, trunkfreq, now));
/* And the rest of the ast_iax2 header */
fr->direction = DIRECTION_OUTGRESS;
fr->retrans = -1;
fr->transfer = 0;
/* Any appropriate call will do */
fr->callno = firstcall;
fr->callno = tpeer->firstcallno;
fr->data = fr->afdata;
fr->datalen = 65536 - len;
fr->datalen = tpeer->trunkdatalen + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
#if 0
ast_log(LOG_DEBUG, "Trunking %d calls in %d bytes, ts=%d\n", calls, fr->datalen, ntohl(mth->ts));
#endif
res = send_packet(fr);
calls = tpeer->calls;
/* Reset transmit trunk side data */
tpeer->trunkdatalen = 0;
tpeer->calls = 0;
tpeer->firstcallno = 0;
}
if (res < 0)
return res;
return calls;
}
static inline int iax2_trunk_expired(struct iax2_trunk_peer *tpeer, struct timeval *now)
{
/* Drop when trunk is about 5 seconds idle */
if (now->tv_sec > tpeer->trunkact.tv_sec + 5)
return 1;
return 0;
}
static int timing_read(int *id, int fd, short events, void *cbdata)
{
char buf[1024];
int res;
struct iax2_peer *peer;
struct iax2_trunk_peer *tpeer, *prev = NULL, *drop=NULL;
int processed = 0;
int totalcalls = 0;
#ifdef ZT_TIMERACK
int x = 1;
#endif
struct timeval now;
if (iaxtrunkdebug)
ast_verbose("Beginning trunk processing\n");
gettimeofday(&now, NULL);
if (events & AST_IO_PRI) {
#ifdef ZT_TIMERACK
/* Great, this is a timing interface, just call the ioctl */
@@ -4299,20 +4356,43 @@ static int timing_read(int *id, int fd, short events, void *cbdata)
}
}
/* For each peer that supports trunking... */
ast_mutex_lock(&peerl.lock);
peer = peerl.peers;
while(peer) {
if (peer->trunk) {
processed++;
res = send_trunk(peer);
ast_mutex_lock(&tpeerlock);
tpeer = tpeers;
while(tpeer) {
processed++;
ast_mutex_lock(&tpeer->lock);
/* We can drop a single tpeer per pass. That makes all this logic
substantially easier */
if (!drop && iax2_trunk_expired(tpeer, &now)) {
/* Take it out of the list, but don't free it yet, because it
could be in use */
if (prev)
prev->next = tpeer->next;
else
tpeers = tpeer->next;
drop = tpeer;
} else {
res = send_trunk(tpeer, &now);
if (iaxtrunkdebug)
ast_verbose("Processed trunk peer '%s' (%s:%d) with %d call(s)\n", peer->name, inet_ntoa(peer->addr.sin_addr), ntohs(peer->addr.sin_port), res);
totalcalls += res;
res = 0;
}
peer = peer->next;
ast_verbose("Processed trunk peer (%s:%d) with %d call(s)\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), res);
}
totalcalls += res;
res = 0;
ast_mutex_unlock(&tpeer->lock);
prev = tpeer;
tpeer = tpeer->next;
}
ast_mutex_unlock(&tpeerlock);
if (drop) {
ast_mutex_lock(&drop->lock);
/* Once we have this lock, we're sure nobody else is using it or could use it once we release it,
because by the time they could get tpeerlock, we've already grabbed it */
ast_log(LOG_DEBUG, "Dropping unused iax2 trunk peer '%s:%d'\n", inet_ntoa(drop->addr.sin_addr), ntohs(drop->addr.sin_port));
free(drop->trunkdata);
ast_mutex_unlock(&drop->lock);
free(drop);
}
ast_mutex_unlock(&peerl.lock);
if (iaxtrunkdebug)
ast_verbose("Ending trunk processing with %d peers and %d calls processed\n", processed, totalcalls);
iaxtrunkdebug =0;
@@ -4466,6 +4546,7 @@ static int iax_park(struct ast_channel *chan1, struct ast_channel *chan2)
return -1;
}
static int socket_read(int *id, int fd, short events, void *cbdata)
{
struct sockaddr_in sin;
@@ -4488,6 +4569,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
struct ast_channel *c;
struct iax2_dpcache *dp;
struct iax2_peer *peer;
struct iax2_trunk_peer *tpeer;
struct iax_ies ies;
struct iax_ie_data ied0, ied1;
int format;
@@ -4526,21 +4608,16 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
ts = ntohl(mth->ts);
res -= (sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr));
ptr = mth->data;
ast_mutex_lock(&peerl.lock);
peer = peerl.peers;
while(peer) {
if (!inaddrcmp(&peer->addr, &sin))
break;
peer = peer->next;
}
ast_mutex_unlock(&peerl.lock);
if (!peer) {
tpeer = find_tpeer(&sin);
if (!tpeer) {
ast_log(LOG_WARNING, "Unable to accept trunked packet from '%s:%d': No matching peer\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
return 1;
}
if (!ts || (!peer->rxtrunktime.tv_sec && !peer->rxtrunktime.tv_usec)) {
gettimeofday(&peer->rxtrunktime, NULL);
}
if (!ts || (!tpeer->rxtrunktime.tv_sec && !tpeer->rxtrunktime.tv_usec)) {
gettimeofday(&tpeer->rxtrunktime, NULL);
tpeer->trunkact = tpeer->rxtrunktime;
} else
gettimeofday(&tpeer->trunkact, NULL);
while(res >= sizeof(struct ast_iax2_meta_trunk_entry)) {
/* Process channels */
mte = (struct ast_iax2_meta_trunk_entry *)ptr;
@@ -4566,7 +4643,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
f.data = ptr;
else
f.data = NULL;
fr.ts = fix_peerts(peer, fr.callno, ts);
fr.ts = fix_peerts(tpeer, fr.callno, ts);
/* Don't pass any packets until we're started */
if ((iaxs[fr.callno]->state & IAX_STATE_STARTED)) {
/* Common things */
@@ -4602,6 +4679,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
ptr += len;
res -= len;
}
ast_mutex_unlock(&tpeer->lock);
}
return 1;
@@ -5746,7 +5824,6 @@ static struct iax2_peer *build_peer(char *name, struct ast_variable *v)
memset(peer, 0, sizeof(struct iax2_peer));
peer->expire = -1;
peer->pokeexpire = -1;
peer->lastsent = 999999;
}
if (peer) {
if (!found) {