FS-9820 #resolve [Add thread_pool to libks]

This commit is contained in:
Anthony Minessale
2016-12-06 16:46:08 -06:00
parent b86f7a6d30
commit b6df83dd10
8 changed files with 339 additions and 4 deletions

View File

@@ -115,6 +115,7 @@ KS_DECLARE(void) ks_random_string(char *buf, uint16_t len, char *set);
#include "ks_printf.h"
#include "ks_json.h"
#include "ks_threadmutex.h"
#include "ks_thread_pool.h"
#include "ks_hash.h"
#include "ks_config.h"
#include "ks_q.h"

View File

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _KS_THREAD_POOL_H_
#define _KS_THREAD_POOL_H_
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size,
ks_thread_priority_t priority, uint32_t idle_sec);
KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp);
KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data);
KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp);
KS_END_EXTERN_C
#endif /* defined(_KS_THREAD_POOL_H_) */
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@@ -210,6 +210,8 @@ struct ks_q_s;
typedef struct ks_q_s ks_q_t;
typedef void (*ks_flush_fn_t)(ks_q_t *q, void *ptr, void *flush_data);
typedef struct ks_thread_pool_s ks_thread_pool_t;
KS_END_EXTERN_C
#endif /* defined(_KS_TYPES_H_) */

View File

@@ -393,10 +393,14 @@ KS_DECLARE(ks_status_t) ks_cond_timedwait(ks_cond_t *cond, ks_time_t ms)
#else
struct timespec ts;
ks_time_t n = ks_time_now() + (ms * 1000);
int r = 0;
ts.tv_sec = ks_time_sec(n);
ts.tv_nsec = ks_time_nsec(n);
if (pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts)) {
switch(errno) {
r = pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts);
if (r) {
switch(r) {
case ETIMEDOUT:
return KS_STATUS_TIMEOUT;
default:

View File

@@ -0,0 +1,255 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <ks.h>
#define TP_MAX_QLEN 1024
typedef enum {
TP_STATE_DOWN = 0,
TP_STATE_RUNNING = 1
} ks_thread_pool_state_t;
struct ks_thread_pool_s {
uint32_t min;
uint32_t max;
uint32_t idle_sec;
size_t stack_size;
ks_thread_priority_t priority;
ks_q_t *q;
uint32_t thread_count;
uint32_t busy_thread_count;
uint32_t running_thread_count;
uint32_t dying_thread_count;
ks_thread_pool_state_t state;
ks_mutex_t *mutex;
ks_pool_t *pool;
};
typedef struct ks_thread_job_s {
ks_thread_function_t func;
void *data;
} ks_thread_job_t;
static void *worker_thread(ks_thread_t *thread, void *data);
static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding)
{
ks_thread_t *thread;
int need = 0;
ks_mutex_lock(tp->mutex);
if (tp->state != TP_STATE_RUNNING) {
ks_mutex_unlock(tp->mutex);
return 1;
}
if (tp->thread_count < tp->min) {
need = tp->min - tp->thread_count;
}
if (adding) {
if (!need && tp->busy_thread_count >= tp->running_thread_count - tp->dying_thread_count &&
(tp->thread_count - tp->dying_thread_count + 1 <= tp->max)) {
need = 1;
}
}
tp->thread_count += need;
ks_mutex_unlock(tp->mutex);
while(need > 0) {
if (ks_thread_create_ex(&thread, worker_thread, tp, KS_THREAD_FLAG_DETATCHED, tp->stack_size, tp->priority, tp->pool) != KS_STATUS_SUCCESS) {
ks_mutex_lock(tp->mutex);
tp->thread_count--;
ks_mutex_unlock(tp->mutex);
}
need--;
}
ks_log(KS_LOG_DEBUG, "WORKER check: adding %d need %d running %d dying %d total %d max %d\n",
adding, need, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);
return need;
}
static uint32_t TID = 0;
static void *worker_thread(ks_thread_t *thread, void *data)
{
ks_thread_pool_t *tp = (ks_thread_pool_t *) data;
uint32_t idle_sec = 0;
uint32_t my_id = 0;
int die = 0;
ks_mutex_lock(tp->mutex);
tp->running_thread_count++;
my_id = ++TID;
ks_mutex_unlock(tp->mutex);
while(tp->state == TP_STATE_RUNNING) {
ks_thread_job_t *job;
void *pop = NULL;
ks_status_t status;
status = ks_q_pop_timeout(tp->q, &pop, 1000);
ks_log(KS_LOG_DEBUG, "WORKER %d idle_sec %d running %d dying %d total %d max %d\n",
my_id, idle_sec, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);
check_queue(tp, KS_FALSE);
if (status == KS_STATUS_TIMEOUT) {
idle_sec++;
if (idle_sec >= tp->idle_sec) {
ks_mutex_lock(tp->mutex);
if (tp->running_thread_count - tp->dying_thread_count - tp->busy_thread_count > tp->min) {
tp->dying_thread_count++;
die = 1;
}
ks_mutex_unlock(tp->mutex);
if (die) {
ks_log(KS_LOG_DEBUG, "WORKER %d IDLE TIMEOUT\n", my_id);
break;
}
}
continue;
}
if ((status != KS_STATUS_SUCCESS && status != KS_STATUS_BREAK) || !pop) {
ks_log(KS_LOG_DEBUG, "WORKER %d POP FAIL %d %p\n", my_id, status, (void *)pop);
break;
}
job = (ks_thread_job_t *) pop;
ks_mutex_lock(tp->mutex);
tp->busy_thread_count++;
ks_mutex_unlock(tp->mutex);
idle_sec = 0;
job->func(thread, job->data);
ks_pool_free(tp->pool, job);
ks_mutex_lock(tp->mutex);
tp->busy_thread_count--;
ks_mutex_unlock(tp->mutex);
}
ks_mutex_lock(tp->mutex);
tp->running_thread_count--;
tp->thread_count--;
if (die) {
tp->dying_thread_count--;
}
ks_mutex_unlock(tp->mutex);
return NULL;
}
KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size,
ks_thread_priority_t priority, uint32_t idle_sec)
{
ks_pool_t *pool;
ks_pool_open(&pool);
*tp = (ks_thread_pool_t *) ks_pool_alloc(pool, sizeof(ks_thread_t));
(*tp)->min = min;
(*tp)->max = max;
(*tp)->pool = pool;
(*tp)->stack_size = stack_size;
(*tp)->priority = priority;
(*tp)->state = TP_STATE_RUNNING;
(*tp)->idle_sec = idle_sec;
ks_mutex_create(&(*tp)->mutex, KS_MUTEX_FLAG_DEFAULT, (*tp)->pool);
ks_q_create(&(*tp)->q, (*tp)->pool, TP_MAX_QLEN);
check_queue(*tp, KS_FALSE);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp)
{
ks_pool_t *pool;
ks_assert(tp);
(*tp)->state = TP_STATE_DOWN;
while((*tp)->thread_count) {
ks_sleep(100000);
}
pool = (*tp)->pool;
ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data)
{
ks_thread_job_t *job = (ks_thread_job_t *) ks_pool_alloc(tp->pool, sizeof(*job));
job->func = func;
job->data = data;
ks_q_push(tp->q, job);
check_queue(tp, KS_TRUE);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp)
{
return ks_q_size(tp->q);
}