add prefetch API to mod_http_cache

This commit is contained in:
Christopher Rienzo 2012-01-25 15:07:59 +00:00
parent 5783078401
commit b55a6b158f
1 changed files with 131 additions and 1 deletions

View File

@ -40,6 +40,7 @@ SWITCH_STANDARD_API(http_cache_get);
SWITCH_STANDARD_API(http_cache_put);
SWITCH_STANDARD_API(http_cache_tryget);
SWITCH_STANDARD_API(http_cache_clear);
SWITCH_STANDARD_API(http_cache_prefetch);
#define DOWNLOAD_NEEDED "download"
@ -148,6 +149,16 @@ struct url_cache {
int misses;
/** Number of cache errors */
int errors;
/** The prefetch queue */
switch_queue_t *prefetch_queue;
/** Max size of prefetch queue */
int prefetch_queue_size;
/** Size of prefetch thread pool */
int prefetch_thread_count;
/** Shutdown flag */
int shutdown;
/** Synchronizes shutdown of cache */
switch_thread_rwlock_t *shutdown_lock;
};
static url_cache_t gcache;
@ -778,6 +789,48 @@ static void setup_dir(url_cache_t *cache)
}
}
static int isUrl(const char *filename)
{
return !zstr(filename) && !strncmp("http://", filename, strlen("http://"));
}
#define HTTP_PREFETCH_SYNTAX "<url>"
SWITCH_STANDARD_API(http_cache_prefetch)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_memory_pool_t *lpool = NULL;
switch_memory_pool_t *pool = NULL;
char *url;
if (!isUrl(cmd)) {
stream->write_function(stream, "USAGE: %s\n", HTTP_PREFETCH_SYNTAX);
return SWITCH_STATUS_SUCCESS;
}
if (session) {
pool = switch_core_session_get_pool(session);
} else {
switch_core_new_memory_pool(&lpool);
pool = lpool;
}
/* send to thread pool */
url = strdup(cmd);
if (switch_queue_trypush(gcache.prefetch_queue, url) != SWITCH_STATUS_SUCCESS) {
switch_safe_free(url);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue prefetch request\n");
stream->write_function(stream, "-ERR\n");
} else {
stream->write_function(stream, "+OK\n");
}
if (lpool) {
switch_core_destroy_memory_pool(&lpool);
}
return status;
}
#define HTTP_GET_SYNTAX "<url>"
/**
* Get a file from the cache, download if it isn't cached
@ -789,7 +842,7 @@ SWITCH_STANDARD_API(http_cache_get)
switch_memory_pool_t *pool = NULL;
char *filename;
if (zstr(cmd) || strncmp("http://", cmd, strlen("http://"))) {
if (!isUrl(cmd)) {
stream->write_function(stream, "USAGE: %s\n", HTTP_GET_SYNTAX);
return SWITCH_STATUS_SUCCESS;
}
@ -911,6 +964,44 @@ SWITCH_STANDARD_API(http_cache_clear)
return SWITCH_STATUS_SUCCESS;
}
/**
* Thread to prefetch URLs
* @param thread the thread
* @param obj started flag
* @return NULL
*/
static void *SWITCH_THREAD_FUNC prefetch_thread(switch_thread_t *thread, void *obj)
{
int *started = obj;
void *url = NULL;
switch_thread_rwlock_rdlock(gcache.shutdown_lock);
*started = 1;
// process prefetch requests
while (!gcache.shutdown) {
if (switch_queue_pop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
switch_stream_handle_t stream = { 0 };
SWITCH_STANDARD_STREAM(stream);
switch_api_execute("http_get", url, NULL, &stream);
switch_safe_free(stream.data);
switch_safe_free(url);
}
url = NULL;
}
// shutting down- clear the queue
while (switch_queue_trypop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
switch_safe_free(url);
url = NULL;
}
switch_thread_rwlock_unlock(gcache.shutdown_lock);
return NULL;
}
/**
* Configure the module
* @param cache to configure
@ -933,6 +1024,8 @@ static switch_status_t do_config(url_cache_t *cache)
max_urls = 4000;
default_max_age_sec = 86400;
cache->location = SWITCH_PREFIX_DIR "/http_cache";
cache->prefetch_queue_size = 100;
cache->prefetch_thread_count = 8;
/* get params */
settings = switch_xml_child(cfg, "settings");
@ -949,6 +1042,12 @@ static switch_status_t do_config(url_cache_t *cache)
} else if (!strcasecmp(var, "default-max-age")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting default-max-age to %s\n", val);
default_max_age_sec = atoi(val);
} else if (!strcasecmp(var, "prefetch-queue-size")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-queue-size to %s\n", val);
cache->prefetch_queue_size = atoi(val);
} else if (!strcasecmp(var, "prefetch-thread-count")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-thread-count to %s\n", val);
cache->prefetch_thread_count = atoi(val);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
}
@ -971,6 +1070,16 @@ static switch_status_t do_config(url_cache_t *cache)
status = SWITCH_STATUS_TERM;
goto done;
}
if (cache->prefetch_queue_size <= 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-queue-size must be > 0\n");
status = SWITCH_STATUS_TERM;
goto done;
}
if (cache->prefetch_thread_count <= 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-thread-count must be > 0\n");
status = SWITCH_STATUS_TERM;
goto done;
}
cache->max_url = max_urls;
cache->default_max_age = (default_max_age_sec * 1000 * 1000); /* convert from seconds to nanoseconds */
@ -992,6 +1101,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
SWITCH_ADD_API(api, "http_tryget", "HTTP GET from cache only", http_cache_tryget, HTTP_GET_SYNTAX);
SWITCH_ADD_API(api, "http_put", "HTTP PUT", http_cache_put, HTTP_PUT_SYNTAX);
SWITCH_ADD_API(api, "http_clear_cache", "Clear the cache", http_cache_clear, HTTP_CACHE_CLEAR_SYNTAX);
SWITCH_ADD_API(api, "http_prefetch", "Prefetch document in a background thread. Use http_get to get the prefetched document", http_cache_prefetch, HTTP_PREFETCH_SYNTAX);
memset(&gcache, 0, sizeof(url_cache_t));
gcache.pool = pool;
@ -1002,6 +1112,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
switch_core_hash_init(&gcache.map, gcache.pool);
switch_mutex_init(&gcache.mutex, SWITCH_MUTEX_UNNESTED, gcache.pool);
switch_thread_rwlock_create(&gcache.shutdown_lock, gcache.pool);
/* create the queue */
gcache.queue.max_size = gcache.max_url;
@ -1011,6 +1122,21 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
setup_dir(&gcache);
/* Start the prefetch threads */
switch_queue_create(&gcache.prefetch_queue, gcache.prefetch_queue_size, gcache.pool);
for (int i = 0; i < gcache.prefetch_thread_count; i++) {
int started = 0;
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_threadattr_create(&thd_attr, gcache.pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, prefetch_thread, &started, gcache.pool);
while (!started) {
switch_sleep(1000);
}
}
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
@ -1020,6 +1146,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
*/
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_http_cache_shutdown)
{
gcache.shutdown = 1;
switch_queue_interrupt_all(gcache.prefetch_queue);
switch_thread_rwlock_wrlock(gcache.shutdown_lock);
url_cache_clear(&gcache, NULL);
switch_core_hash_destroy(&gcache.map);
switch_mutex_destroy(gcache.mutex);