From b55a6b158f8f62757f236df4e71eed2a6fe1e321 Mon Sep 17 00:00:00 2001 From: Christopher Rienzo Date: Wed, 25 Jan 2012 15:07:59 +0000 Subject: [PATCH] add prefetch API to mod_http_cache --- .../mod_http_cache/mod_http_cache.c | 132 +++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/src/mod/applications/mod_http_cache/mod_http_cache.c b/src/mod/applications/mod_http_cache/mod_http_cache.c index b2004b5456..29cce6a4a2 100644 --- a/src/mod/applications/mod_http_cache/mod_http_cache.c +++ b/src/mod/applications/mod_http_cache/mod_http_cache.c @@ -40,6 +40,7 @@ SWITCH_STANDARD_API(http_cache_get); SWITCH_STANDARD_API(http_cache_put); SWITCH_STANDARD_API(http_cache_tryget); SWITCH_STANDARD_API(http_cache_clear); +SWITCH_STANDARD_API(http_cache_prefetch); #define DOWNLOAD_NEEDED "download" @@ -148,6 +149,16 @@ struct url_cache { int misses; /** Number of cache errors */ int errors; + /** The prefetch queue */ + switch_queue_t *prefetch_queue; + /** Max size of prefetch queue */ + int prefetch_queue_size; + /** Size of prefetch thread pool */ + int prefetch_thread_count; + /** Shutdown flag */ + int shutdown; + /** Synchronizes shutdown of cache */ + switch_thread_rwlock_t *shutdown_lock; }; static url_cache_t gcache; @@ -778,6 +789,48 @@ static void setup_dir(url_cache_t *cache) } } +static int isUrl(const char *filename) +{ + return !zstr(filename) && !strncmp("http://", filename, strlen("http://")); +} + +#define HTTP_PREFETCH_SYNTAX "" +SWITCH_STANDARD_API(http_cache_prefetch) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + switch_memory_pool_t *lpool = NULL; + switch_memory_pool_t *pool = NULL; + char *url; + + if (!isUrl(cmd)) { + stream->write_function(stream, "USAGE: %s\n", HTTP_PREFETCH_SYNTAX); + return SWITCH_STATUS_SUCCESS; + } + + if (session) { + pool = switch_core_session_get_pool(session); + } else { + switch_core_new_memory_pool(&lpool); + pool = lpool; + } + + /* send to thread pool */ + url = strdup(cmd); + if (switch_queue_trypush(gcache.prefetch_queue, url) != SWITCH_STATUS_SUCCESS) { + switch_safe_free(url); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue prefetch request\n"); + stream->write_function(stream, "-ERR\n"); + } else { + stream->write_function(stream, "+OK\n"); + } + + if (lpool) { + switch_core_destroy_memory_pool(&lpool); + } + + return status; +} + #define HTTP_GET_SYNTAX "" /** * Get a file from the cache, download if it isn't cached @@ -789,7 +842,7 @@ SWITCH_STANDARD_API(http_cache_get) switch_memory_pool_t *pool = NULL; char *filename; - if (zstr(cmd) || strncmp("http://", cmd, strlen("http://"))) { + if (!isUrl(cmd)) { stream->write_function(stream, "USAGE: %s\n", HTTP_GET_SYNTAX); return SWITCH_STATUS_SUCCESS; } @@ -911,6 +964,44 @@ SWITCH_STANDARD_API(http_cache_clear) return SWITCH_STATUS_SUCCESS; } + +/** + * Thread to prefetch URLs + * @param thread the thread + * @param obj started flag + * @return NULL + */ +static void *SWITCH_THREAD_FUNC prefetch_thread(switch_thread_t *thread, void *obj) +{ + int *started = obj; + void *url = NULL; + + switch_thread_rwlock_rdlock(gcache.shutdown_lock); + *started = 1; + + // process prefetch requests + while (!gcache.shutdown) { + if (switch_queue_pop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) { + switch_stream_handle_t stream = { 0 }; + SWITCH_STANDARD_STREAM(stream); + switch_api_execute("http_get", url, NULL, &stream); + switch_safe_free(stream.data); + switch_safe_free(url); + } + url = NULL; + } + + // shutting down- clear the queue + while (switch_queue_trypop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) { + switch_safe_free(url); + url = NULL; + } + + switch_thread_rwlock_unlock(gcache.shutdown_lock); + + return NULL; +} + /** * Configure the module * @param cache to configure @@ -933,6 +1024,8 @@ static switch_status_t do_config(url_cache_t *cache) max_urls = 4000; default_max_age_sec = 86400; cache->location = SWITCH_PREFIX_DIR "/http_cache"; + cache->prefetch_queue_size = 100; + cache->prefetch_thread_count = 8; /* get params */ settings = switch_xml_child(cfg, "settings"); @@ -949,6 +1042,12 @@ static switch_status_t do_config(url_cache_t *cache) } else if (!strcasecmp(var, "default-max-age")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting default-max-age to %s\n", val); default_max_age_sec = atoi(val); + } else if (!strcasecmp(var, "prefetch-queue-size")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-queue-size to %s\n", val); + cache->prefetch_queue_size = atoi(val); + } else if (!strcasecmp(var, "prefetch-thread-count")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-thread-count to %s\n", val); + cache->prefetch_thread_count = atoi(val); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var); } @@ -971,6 +1070,16 @@ static switch_status_t do_config(url_cache_t *cache) status = SWITCH_STATUS_TERM; goto done; } + if (cache->prefetch_queue_size <= 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-queue-size must be > 0\n"); + status = SWITCH_STATUS_TERM; + goto done; + } + if (cache->prefetch_thread_count <= 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-thread-count must be > 0\n"); + status = SWITCH_STATUS_TERM; + goto done; + } cache->max_url = max_urls; cache->default_max_age = (default_max_age_sec * 1000 * 1000); /* convert from seconds to nanoseconds */ @@ -992,6 +1101,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load) SWITCH_ADD_API(api, "http_tryget", "HTTP GET from cache only", http_cache_tryget, HTTP_GET_SYNTAX); SWITCH_ADD_API(api, "http_put", "HTTP PUT", http_cache_put, HTTP_PUT_SYNTAX); SWITCH_ADD_API(api, "http_clear_cache", "Clear the cache", http_cache_clear, HTTP_CACHE_CLEAR_SYNTAX); + SWITCH_ADD_API(api, "http_prefetch", "Prefetch document in a background thread. Use http_get to get the prefetched document", http_cache_prefetch, HTTP_PREFETCH_SYNTAX); memset(&gcache, 0, sizeof(url_cache_t)); gcache.pool = pool; @@ -1002,6 +1112,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load) switch_core_hash_init(&gcache.map, gcache.pool); switch_mutex_init(&gcache.mutex, SWITCH_MUTEX_UNNESTED, gcache.pool); + switch_thread_rwlock_create(&gcache.shutdown_lock, gcache.pool); /* create the queue */ gcache.queue.max_size = gcache.max_url; @@ -1011,6 +1122,21 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load) setup_dir(&gcache); + /* Start the prefetch threads */ + switch_queue_create(&gcache.prefetch_queue, gcache.prefetch_queue_size, gcache.pool); + for (int i = 0; i < gcache.prefetch_thread_count; i++) { + int started = 0; + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_threadattr_create(&thd_attr, gcache.pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, prefetch_thread, &started, gcache.pool); + while (!started) { + switch_sleep(1000); + } + } + /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; } @@ -1020,6 +1146,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load) */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_http_cache_shutdown) { + gcache.shutdown = 1; + switch_queue_interrupt_all(gcache.prefetch_queue); + switch_thread_rwlock_wrlock(gcache.shutdown_lock); + url_cache_clear(&gcache, NULL); switch_core_hash_destroy(&gcache.map); switch_mutex_destroy(gcache.mutex);