MODFORM-32 - mp3 improvements (seeking, cleanup)

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14223 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Rupa Schomaker 2009-07-13 19:01:03 +00:00
parent 4170be7261
commit 154d82399b
1 changed files with 104 additions and 232 deletions

View File

@ -25,6 +25,7 @@
* *
* Anthony Minessale II <anthm@freeswitch.org> * Anthony Minessale II <anthm@freeswitch.org>
* Rupa Schomaker <rupa@rupa.com> * Rupa Schomaker <rupa@rupa.com>
* John Wehle <john@feith.com>
* *
* mod_shout.c -- Icecast Module * mod_shout.c -- Icecast Module
* *
@ -109,7 +110,6 @@ struct shout_context {
int err; int err;
int mp3err; int mp3err;
int dlen; int dlen;
switch_file_t *fd;
FILE *fp; FILE *fp;
int samplerate; int samplerate;
uint8_t thread_running; uint8_t thread_running;
@ -129,21 +129,35 @@ struct shout_context {
typedef struct shout_context shout_context_t; typedef struct shout_context shout_context_t;
static size_t decode_fd(shout_context_t *context, void *data, size_t bytes); static void decode_fd(shout_context_t *context, void *data, size_t bytes);
static inline void free_context(shout_context_t *context) static inline void free_context(shout_context_t *context)
{ {
int ret; int ret;
if (context) { if (context) {
switch_mutex_lock(context->audio_mutex);
context->err++; context->err++;
switch_thread_rwlock_wrlock(context->rwlock); switch_mutex_unlock(context->audio_mutex);
if (context->fd) { if (context->stream_url) {
switch_file_close(context->fd); int sanity = 0;
context->fd = NULL;
while (context->thread_running) {
switch_yield(500000);
if (++sanity > 10) {
break;
}
}
} }
switch_thread_rwlock_wrlock(context->rwlock);
if (context->mh) {
mpg123_close(context->mh);
mpg123_delete(context->mh);
}
if (context->fp) { if (context->fp) {
unsigned char mp3buffer[8192]; unsigned char mp3buffer[8192];
int len; int len;
@ -173,12 +187,6 @@ static inline void free_context(shout_context_t *context)
context->fp = NULL; context->fp = NULL;
} }
if (context->audio_buffer) {
switch_mutex_lock(context->audio_mutex);
switch_buffer_destroy(&context->audio_buffer);
switch_mutex_unlock(context->audio_mutex);
}
if (context->shout) { if (context->shout) {
shout_close(context->shout); shout_close(context->shout);
context->shout = NULL; context->shout = NULL;
@ -189,23 +197,12 @@ static inline void free_context(shout_context_t *context)
context->gfp = NULL; context->gfp = NULL;
} }
if (context->stream_url) { if (context->audio_buffer) {
int sanity = 0; switch_buffer_destroy(&context->audio_buffer);
while (context->thread_running) {
switch_yield(500000);
if (++sanity > 10) {
break;
}
}
} }
if (context->mh) { switch_mutex_destroy(context->audio_mutex);
mpg123_delete(context->mh);
}
switch_thread_rwlock_unlock(context->rwlock); switch_thread_rwlock_unlock(context->rwlock);
switch_thread_rwlock_destroy(context->rwlock); switch_thread_rwlock_destroy(context->rwlock);
} }
@ -289,154 +286,60 @@ static void log_msg(char const *fmt, va_list ap)
} }
} }
static size_t decode_fd(shout_context_t *context, void *data, size_t bytes) static void decode_fd(shout_context_t *context, void *data, size_t bytes)
{ {
int decode_status = 0; int decode_status = 0;
size_t dlen = 0; size_t usedlen;
int x = 0;
unsigned char *in;
int inlen = 0;
unsigned char *out;
int outlen;
int usedlen;
unsigned char inbuf[MP3_SCACHE];
int done = 0;
size_t used;
size_t lp;
size_t rb = 0;
while (context->eof < 2 && switch_buffer_inuse(context->audio_buffer) < bytes) { while (!context->err && !context->eof && switch_buffer_inuse(context->audio_buffer) < bytes) {
lp = sizeof(inbuf);
if (!context->eof && ((switch_file_read(context->fd, inbuf, &lp) != SWITCH_STATUS_SUCCESS) || lp == 0)) {
context->eof++;
}
inlen = (int) lp;
in = inbuf;
out = context->decode_buf;
outlen = (int) sizeof(context->decode_buf);
usedlen = 0; usedlen = 0;
x = 0;
if (lp < bytes) { decode_status = mpg123_read(context->mh, context->decode_buf, sizeof(context->decode_buf), &usedlen);
done = 1;
}
do {
if (context->eof) {
decode_status = mpg123_read(context->mh, out, outlen, &dlen);
} else {
decode_status = mpg123_decode(context->mh, in, inlen, out, outlen, &dlen);
}
if (context->err) { if (decode_status == MPG123_NEW_FORMAT) {
continue;
} else if (decode_status == MPG123_OK) {
;
} else if (decode_status == MPG123_DONE || decode_status == MPG123_NEED_MORE) {
context->eof++;
} else if (decode_status == MPG123_ERR || decode_status > 0) {
if (++context->mp3err >= 5) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n");
context->eof++;
goto error; goto error;
} }
continue;
if (!x) {
in = NULL;
inlen = 0;
x++;
}
if (decode_status == MPG123_NEW_FORMAT) {
continue;
} else if (decode_status == MPG123_OK) {
usedlen = dlen;
break;
} else if (decode_status == MPG123_DONE || (context->eof && decode_status == MPG123_NEED_MORE)) {
context->eof++;
goto end;
} else if (decode_status == MPG123_ERR || decode_status > 0) {
if (++context->mp3err >= 5) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n");
context->eof++;
goto end;
}
continue;
}
context->mp3err = 0;
usedlen += dlen;
out += dlen;
outlen -= dlen;
dlen = 0;
} while (decode_status != MPG123_NEED_MORE);
if (context->audio_buffer) {
switch_buffer_write(context->audio_buffer, context->decode_buf, usedlen);
} else {
goto error;
} }
if (done) { context->mp3err = 0;
break;
} switch_buffer_write(context->audio_buffer, context->decode_buf, usedlen);
} }
end: return;
used = switch_buffer_inuse(context->audio_buffer);
if (context->eof || done || used >= bytes) {
if (!(rb = switch_buffer_read(context->audio_buffer, data, bytes))) {
goto error;
}
return rb;
}
return 0;
error: error:
switch_mutex_lock(context->audio_mutex); switch_mutex_lock(context->audio_mutex);
context->err++; context->err++;
switch_mutex_unlock(context->audio_mutex); switch_mutex_unlock(context->audio_mutex);
return 0;
} }
#define error_check() if (context->err) goto error;
static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
{ {
register unsigned int realsize = (unsigned int) (size * nmemb); register unsigned int realsize = (unsigned int) (size * nmemb);
shout_context_t *context = data; shout_context_t *context = data;
int decode_status = 0; int decode_status = 0;
size_t dlen = 0; size_t usedlen;
int x = 0;
unsigned char *in;
int inlen;
unsigned char *out;
int outlen;
int usedlen;
uint32_t used, buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after uint32_t used, buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after
first 64k buffer is dry */ first 64k buffer is dry */
in = ptr;
inlen = realsize;
out = context->decode_buf;
outlen = sizeof(context->decode_buf);
usedlen = 0;
error_check();
if (context->prebuf) { if (context->prebuf) {
buf_size = context->prebuf; buf_size = context->prebuf;
} }
/* make sure we aren't over zealous by slowing down the stream when the buffer is too full */ /* make sure we aren't over zealous by slowing down the stream when the buffer is too full */
for (;;) { while (!context->err) {
error_check();
switch_mutex_lock(context->audio_mutex); switch_mutex_lock(context->audio_mutex);
if (!context->audio_buffer) {
context->err++;
break;
}
used = switch_buffer_inuse(context->audio_buffer); used = switch_buffer_inuse(context->audio_buffer);
switch_mutex_unlock(context->audio_mutex); switch_mutex_unlock(context->audio_mutex);
@ -448,55 +351,36 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
switch_yield(500000); switch_yield(500000);
} }
error_check(); if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) {
goto error;
}
do { do {
decode_status = mpg123_decode(context->mh, in, inlen, out, outlen, &dlen); usedlen = 0;
error_check(); decode_status = mpg123_read(context->mh, context->decode_buf, sizeof(context->decode_buf), &usedlen);
if (!x) {
in = NULL;
inlen = 0;
x++;
}
if (decode_status == MPG123_NEW_FORMAT) { if (decode_status == MPG123_NEW_FORMAT) {
continue; continue;
} else if (decode_status == MPG123_OK) { } else if (decode_status == MPG123_OK || decode_status == MPG123_NEED_MORE) {
usedlen = dlen; ;
break; } else if (decode_status == MPG123_DONE) {
} else if (decode_status == MPG123_ERR) { context->eof++;
} else if (decode_status == MPG123_ERR || decode_status > 0) {
if (++context->mp3err >= 20) { if (++context->mp3err >= 5) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n");
context->eof++;
goto error; goto error;
} }
continue;
mpg123_delete(context->mh);
context->mh = our_mpg123_new(NULL, NULL);
mpg123_open_feed(context->mh);
mpg123_param(context->mh, MPG123_FORCE_RATE, context->samplerate, 0);
mpg123_param(context->mh, MPG123_FLAGS, MPG123_MONO_MIX, 0);
mpg123_param(context->mh, MPG123_FLAGS, MPG123_SEEKBUFFER|MPG123_MONO_MIX, 0);
return realsize;
} }
context->mp3err = 0; context->mp3err = 0;
usedlen += dlen;
out += dlen;
outlen -= dlen;
dlen = 0;
} while (decode_status != MPG123_NEED_MORE);
switch_mutex_lock(context->audio_mutex);
switch_mutex_lock(context->audio_mutex);
if (context->audio_buffer) {
switch_buffer_write(context->audio_buffer, context->decode_buf, usedlen); switch_buffer_write(context->audio_buffer, context->decode_buf, usedlen);
} else { switch_mutex_unlock(context->audio_mutex);
goto error; } while (!context->err && !context->eof && decode_status != MPG123_NEED_MORE);
}
switch_mutex_unlock(context->audio_mutex);
return realsize; return realsize;
@ -537,9 +421,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
curl_easy_cleanup(curl_handle); curl_easy_cleanup(curl_handle);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
switch_mutex_lock(context->audio_mutex); context->eof++;
context->err++;
switch_mutex_unlock(context->audio_mutex);
context->thread_running = 0; context->thread_running = 0;
switch_thread_rwlock_unlock(context->rwlock); switch_thread_rwlock_unlock(context->rwlock);
return NULL; return NULL;
@ -549,18 +431,31 @@ static void launch_read_stream_thread(shout_context_t *context)
{ {
switch_thread_t *thread; switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL; switch_threadattr_t *thd_attr = NULL;
int sanity = 10;
if (context->err) { size_t used;
return;
}
context->thread_running = 1; context->thread_running = 1;
switch_threadattr_create(&thd_attr, context->memory_pool); switch_threadattr_create(&thd_attr, context->memory_pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool); switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool);
while (context->thread_running && --sanity) {
/* at least 1s of audio and up to 5s initialize */
switch_mutex_lock(context->audio_mutex);
used = switch_buffer_inuse(context->audio_buffer);
switch_mutex_unlock(context->audio_mutex);
if (used >= (2 * context->samplerate)) {
break;
}
switch_yield(500000);
}
} }
#define error_check() if (context->err) goto error;
static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, void *obj)
{ {
shout_context_t *context = (shout_context_t *) obj; shout_context_t *context = (shout_context_t *) obj;
@ -677,7 +572,6 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
char *username, *password, *port; char *username, *password, *port;
char *err = NULL; char *err = NULL;
int portno = 0; int portno = 0;
int sanity = 0;
if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) { if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) {
return SWITCH_STATUS_MEMERR; return SWITCH_STATUS_MEMERR;
@ -695,35 +589,35 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
switch_thread_rwlock_rdlock(context->rwlock); switch_thread_rwlock_rdlock(context->rwlock);
switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->memory_pool);
if (switch_test_flag(handle, SWITCH_FILE_FLAG_READ)) { if (switch_test_flag(handle, SWITCH_FILE_FLAG_READ)) {
if (switch_buffer_create_dynamic(&context->audio_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) { if (switch_buffer_create_dynamic(&context->audio_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
goto error; goto error;
} }
switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->memory_pool);
context->mh = our_mpg123_new(NULL, NULL); context->mh = our_mpg123_new(NULL, NULL);
mpg123_open_feed(context->mh);
mpg123_format_all(context->mh); mpg123_format_all(context->mh);
mpg123_param(context->mh, MPG123_FORCE_RATE, context->samplerate, 0); mpg123_param(context->mh, MPG123_FORCE_RATE, context->samplerate, 0);
if (handle->handler) { if (handle->handler) {
mpg123_param(context->mh, MPG123_FLAGS, MPG123_SEEKBUFFER|MPG123_MONO_MIX, 0); mpg123_param(context->mh, MPG123_FLAGS, MPG123_SEEKBUFFER|MPG123_MONO_MIX, 0);
if (mpg123_open_feed(context->mh) != MPG123_OK) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening mpg feed\n");
goto error;
}
context->stream_url = switch_core_sprintf(context->memory_pool, "http://%s", path); context->stream_url = switch_core_sprintf(context->memory_pool, "http://%s", path);
context->prebuf = handle->prebuf; context->prebuf = handle->prebuf;
launch_read_stream_thread(context); launch_read_stream_thread(context);
while((switch_buffer_inuse(context->audio_buffer) < (switch_size_t)(2 * context->samplerate)) && ++sanity < 10) {
/* at least 1s of audio and up to 5s initialize */
switch_yield(500000);
}
} else { } else {
handle->seekable = 1;
mpg123_param(context->mh, MPG123_FLAGS, MPG123_MONO_MIX, 0); mpg123_param(context->mh, MPG123_FLAGS, MPG123_MONO_MIX, 0);
if (switch_file_open(&context->fd, path, SWITCH_FOPEN_READ, SWITCH_FPROT_UREAD | SWITCH_FPROT_UWRITE, handle->memory_pool) != if (mpg123_open(context->mh, path) != MPG123_OK) {
SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening %s\n", path); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening %s\n", path);
goto error; goto error;
} }
} }
} else if (switch_test_flag(handle, SWITCH_FILE_FLAG_WRITE)) { } else if (switch_test_flag(handle, SWITCH_FILE_FLAG_WRITE)) {
if (!(context->gfp = lame_init())) { if (!(context->gfp = lame_init())) {
@ -759,7 +653,6 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
goto error; goto error;
} }
switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->memory_pool);
lame_set_bWriteVbrTag(context->gfp, 0); lame_set_bWriteVbrTag(context->gfp, 0);
lame_mp3_tags_fid(context->gfp, NULL); lame_mp3_tags_fid(context->gfp, NULL);
@ -850,7 +743,6 @@ static switch_status_t shout_file_open(switch_file_handle_t *handle, const char
} }
} else { } else {
handle->seekable = 1;
/* lame being lame and all has FILE * coded into it's API for some functions so we gotta use it */ /* lame being lame and all has FILE * coded into it's API for some functions so we gotta use it */
if (!(context->fp = fopen(path, "wb+"))) { if (!(context->fp = fopen(path, "wb+"))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening %s\n", path); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error opening %s\n", path);
@ -891,29 +783,17 @@ static switch_status_t shout_file_seek(switch_file_handle_t *handle, unsigned in
{ {
shout_context_t *context = handle->private_info; shout_context_t *context = handle->private_info;
if (handle->handler) { if (handle->handler || switch_test_flag(handle, SWITCH_FILE_FLAG_WRITE)) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} else { } else {
switch_mutex_lock(context->audio_mutex); if (whence == SWITCH_SEEK_CUR) {
if (context->audio_buffer) { samples -= switch_buffer_inuse(context->audio_buffer) / sizeof(int16_t);
if (context->fd) {
switch_file_seek(context->fd, whence, &samples);
} else if (context->fp) {
*cur_sample = fseek(context->fp, *cur_sample, whence);
}
mpg123_delete(context->mh);
context->mh = our_mpg123_new(NULL, NULL);
mpg123_open_feed(context->mh);
mpg123_param(context->mh, MPG123_FORCE_RATE, context->samplerate, 0);
mpg123_param(context->mh, MPG123_FLAGS, MPG123_MONO_MIX, 0);
switch_buffer_zero(context->audio_buffer);
} else {
context->err++;
} }
switch_mutex_unlock(context->audio_mutex);
return SWITCH_STATUS_SUCCESS; switch_buffer_zero(context->audio_buffer);
*cur_sample = mpg123_seek (context->mh, samples, whence);
return *cur_sample >= 0 ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
} }
} }
@ -924,26 +804,18 @@ static switch_status_t shout_file_read(switch_file_handle_t *handle, void *data,
*len = 0; *len = 0;
if (!context || context->err) { if (!context) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (context->fd) { if (!handle->handler)
rb = decode_fd(context, data, bytes); decode_fd(context, data, bytes);
} else {
switch_mutex_lock(context->audio_mutex);
if (context->audio_buffer) {
rb = switch_buffer_read(context->audio_buffer, data, bytes);
} else {
switch_mutex_lock(context->audio_mutex);
context->err++;
switch_mutex_unlock(context->audio_mutex);
}
switch_mutex_unlock(context->audio_mutex);
}
if (context->err) { switch_mutex_lock(context->audio_mutex);
rb = switch_buffer_read(context->audio_buffer, data, bytes);
switch_mutex_unlock(context->audio_mutex);
if (!rb && (context->eof || context->err)) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }