mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-04-17 01:02:12 +00:00
FS-7621
This commit is contained in:
parent
f8853ac620
commit
b3d2c92101
@ -117,7 +117,6 @@ struct shout_context {
|
|||||||
int dlen;
|
int dlen;
|
||||||
FILE *fp;
|
FILE *fp;
|
||||||
size_t samplerate;
|
size_t samplerate;
|
||||||
uint8_t thread_running;
|
|
||||||
uint8_t shout_init;
|
uint8_t shout_init;
|
||||||
uint32_t prebuf;
|
uint32_t prebuf;
|
||||||
int lame_ready;
|
int lame_ready;
|
||||||
@ -128,6 +127,9 @@ struct shout_context {
|
|||||||
switch_size_t mp3buflen;
|
switch_size_t mp3buflen;
|
||||||
switch_thread_rwlock_t *rwlock;
|
switch_thread_rwlock_t *rwlock;
|
||||||
int buffer_seconds;
|
int buffer_seconds;
|
||||||
|
switch_thread_t *read_stream_thread;
|
||||||
|
switch_thread_t *write_stream_thread;
|
||||||
|
curl_socket_t curlfd;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct shout_context shout_context_t;
|
typedef struct shout_context shout_context_t;
|
||||||
@ -137,6 +139,7 @@ static void decode_fd(shout_context_t *context, void *data, size_t bytes);
|
|||||||
static inline void free_context(shout_context_t *context)
|
static inline void free_context(shout_context_t *context)
|
||||||
{
|
{
|
||||||
size_t ret;
|
size_t ret;
|
||||||
|
switch_status_t st;
|
||||||
|
|
||||||
if (context) {
|
if (context) {
|
||||||
switch_mutex_lock(context->audio_mutex);
|
switch_mutex_lock(context->audio_mutex);
|
||||||
@ -144,16 +147,17 @@ static inline void free_context(shout_context_t *context)
|
|||||||
switch_mutex_unlock(context->audio_mutex);
|
switch_mutex_unlock(context->audio_mutex);
|
||||||
|
|
||||||
if (context->stream_url) {
|
if (context->stream_url) {
|
||||||
int sanity = 0;
|
if (context->curlfd > -1) {
|
||||||
|
shutdown(context->curlfd, 2);
|
||||||
while (context->thread_running) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
|
|
||||||
switch_yield(500000);
|
|
||||||
if (++sanity > 10) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Giving up waiting for stream to terminate: %s\n", context->stream_url);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
|
||||||
|
if (context->read_stream_thread) {
|
||||||
|
switch_thread_join(&st, context->read_stream_thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (context->write_stream_thread) {
|
||||||
|
switch_thread_join(&st, context->write_stream_thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_thread_rwlock_wrlock(context->rwlock);
|
switch_thread_rwlock_wrlock(context->rwlock);
|
||||||
@ -368,6 +372,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
|
|||||||
uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */
|
uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */
|
||||||
switch_size_t used;
|
switch_size_t used;
|
||||||
|
|
||||||
|
if (context->err) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
if (!context->stream_channels) {
|
if (!context->stream_channels) {
|
||||||
long rate = 0;
|
long rate = 0;
|
||||||
int channels = 0;
|
int channels = 0;
|
||||||
@ -398,6 +406,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
|
|||||||
switch_yield(500000);
|
switch_yield(500000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (context->err) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) {
|
if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
@ -442,6 +454,22 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int progress_callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow)
|
||||||
|
{
|
||||||
|
shout_context_t *context = (shout_context_t *) clientp;
|
||||||
|
return context->err;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int sockopt_callback(void *clientp, curl_socket_t curlfd,
|
||||||
|
curlsocktype purpose)
|
||||||
|
{
|
||||||
|
shout_context_t *context = (shout_context_t *) clientp;
|
||||||
|
|
||||||
|
context->curlfd = curlfd;
|
||||||
|
|
||||||
|
return CURL_SOCKOPT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
#define MY_BUF_LEN 1024 * 32
|
#define MY_BUF_LEN 1024 * 32
|
||||||
#define MY_BLOCK_SIZE MY_BUF_LEN
|
#define MY_BLOCK_SIZE MY_BUF_LEN
|
||||||
@ -452,9 +480,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
|
|||||||
shout_context_t *context = (shout_context_t *) obj;
|
shout_context_t *context = (shout_context_t *) obj;
|
||||||
|
|
||||||
switch_thread_rwlock_rdlock(context->rwlock);
|
switch_thread_rwlock_rdlock(context->rwlock);
|
||||||
|
context->curlfd = -1;
|
||||||
curl_handle = switch_curl_easy_init();
|
curl_handle = switch_curl_easy_init();
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_PROGRESSFUNCTION, progress_callback);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_PROGRESSDATA, (void *)context);
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1);
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10);
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback);
|
||||||
@ -465,7 +495,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
|
|||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100); /* handle trickle connections */
|
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100); /* handle trickle connections */
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30);
|
||||||
switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff);
|
switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTDATA, (void *)context);
|
||||||
|
|
||||||
cc = switch_curl_easy_perform(curl_handle);
|
cc = switch_curl_easy_perform(curl_handle);
|
||||||
|
|
||||||
if (cc && cc != CURLE_WRITE_ERROR) { /* write error is ok, we just exited from callback early */
|
if (cc && cc != CURLE_WRITE_ERROR) { /* write error is ok, we just exited from callback early */
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc),
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc),
|
||||||
context->curl_error_buff, context->stream_url);
|
context->curl_error_buff, context->stream_url);
|
||||||
@ -474,21 +508,17 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
|
|||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
|
||||||
|
|
||||||
context->eof++;
|
context->eof++;
|
||||||
context->thread_running = 0;
|
|
||||||
switch_thread_rwlock_unlock(context->rwlock);
|
switch_thread_rwlock_unlock(context->rwlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void launch_read_stream_thread(shout_context_t *context)
|
static void launch_read_stream_thread(shout_context_t *context)
|
||||||
{
|
{
|
||||||
switch_thread_t *thread;
|
|
||||||
switch_threadattr_t *thd_attr = NULL;
|
switch_threadattr_t *thd_attr = NULL;
|
||||||
|
|
||||||
context->thread_running = 1;
|
|
||||||
switch_threadattr_create(&thd_attr, context->memory_pool);
|
switch_threadattr_create(&thd_attr, context->memory_pool);
|
||||||
switch_threadattr_detach_set(thd_attr, 1);
|
|
||||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool);
|
switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->memory_pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define error_check() if (context->err) goto error;
|
#define error_check() if (context->err) goto error;
|
||||||
@ -499,20 +529,13 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
|
|||||||
|
|
||||||
switch_thread_rwlock_rdlock(context->rwlock);
|
switch_thread_rwlock_rdlock(context->rwlock);
|
||||||
|
|
||||||
if (context->thread_running) {
|
|
||||||
context->thread_running++;
|
|
||||||
} else {
|
|
||||||
switch_thread_rwlock_unlock(context->rwlock);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!context->lame_ready) {
|
if (!context->lame_ready) {
|
||||||
lame_init_params(context->gfp);
|
lame_init_params(context->gfp);
|
||||||
lame_print_config(context->gfp);
|
lame_print_config(context->gfp);
|
||||||
context->lame_ready = 1;
|
context->lame_ready = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!context->err && context->thread_running) {
|
while (!context->err) {
|
||||||
unsigned char mp3buf[20480] = "";
|
unsigned char mp3buf[20480] = "";
|
||||||
int16_t audio[9600] = { 0 };
|
int16_t audio[9600] = { 0 };
|
||||||
switch_size_t audio_read = 0;
|
switch_size_t audio_read = 0;
|
||||||
@ -575,31 +598,21 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
|
|||||||
error:
|
error:
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n");
|
||||||
switch_thread_rwlock_unlock(context->rwlock);
|
switch_thread_rwlock_unlock(context->rwlock);
|
||||||
context->thread_running = 0;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void launch_write_stream_thread(shout_context_t *context)
|
static void launch_write_stream_thread(shout_context_t *context)
|
||||||
{
|
{
|
||||||
switch_thread_t *thread;
|
|
||||||
switch_threadattr_t *thd_attr = NULL;
|
switch_threadattr_t *thd_attr = NULL;
|
||||||
int sanity = 10;
|
|
||||||
|
|
||||||
if (context->err) {
|
if (context->err) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
context->thread_running = 1;
|
|
||||||
switch_threadattr_create(&thd_attr, context->memory_pool);
|
switch_threadattr_create(&thd_attr, context->memory_pool);
|
||||||
switch_threadattr_detach_set(thd_attr, 1);
|
|
||||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool);
|
switch_thread_create(&context->write_stream_thread, thd_attr, write_stream_thread, context, context->memory_pool);
|
||||||
|
|
||||||
while (context->thread_running && context->thread_running != 2) {
|
|
||||||
switch_yield(100000);
|
|
||||||
if (!--sanity)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#define TC_BUFFER_SIZE 1024 * 32
|
#define TC_BUFFER_SIZE 1024 * 32
|
||||||
|
Loading…
x
Reference in New Issue
Block a user