线程池将任务封装成独立的可执行单元(Actor),交由多个线程并发处理;它预先创建一组线程,任务到来时再分配给空闲线程执行,从而避免频繁创建和销毁线程的开销。
线程池核心组件:
1. 任务队列:存储待执行的任务
2. 工作线程:执行任务的线程集合
3. 线程池管理器:负责线程池的创建、销毁和任务调度
核心实现:
具体实现
//threadpool.h
#ifndef THREADPOOLH_
#define THREADPOOLH_
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Pool sizing limits.
 * Raise these constants at your own risk: large values might slow down
 * your system.
 */
#define MIN_THREADS 1       /* smallest worker-thread count accepted when resizing */
#define MAX_THREADS 64      /* largest worker-thread count accepted */
#define MAX_QUEUE   655360  /* largest task-queue capacity accepted at creation */
/* Opaque pool handle; only pointers to it are used by this header. */
typedef struct threadpool_t threadpool_t;
/* Signature of a task entry point: it receives one caller-supplied pointer. */
typedef void (*threadpool_func_f)(void *param);
/* Result codes returned by the pool functions: 0 on success, negative on error. */
typedef enum {
    threadpool_succeed        =  0,  /* operation completed */
    threadpool_invalid        = -1,  /* a required pointer argument was NULL */
    threadpool_lock_failure   = -2,  /* a mutex or condition-variable call failed */
    threadpool_queue_full     = -3,  /* the selected task queue has no free slot */
    threadpool_shutdown       = -4,  /* the pool is already shutting down */
    threadpool_thread_failure = -5,  /* not returned by the code in this file */
    threadpool_comm_error     = -6,  /* generic failure (create/destroy/free, thread start) */
    threadpool_thread_limit   = -7,  /* requested size outside [MIN_THREADS, MAX_THREADS] */
    threadpool_thread_equal   = -8,  /* requested size equals the current size */
} threadpool_error_t;
/* Answers given by the per-thread checks a worker performs on itself. */
typedef enum {
    threadpool_thread_invalid = 0,  /* calling thread is not registered in the pool */
    threadpool_thread_run     = 1,  /* calling thread should keep running */
    threadpool_thread_kill    = 2   /* calling thread has been asked to exit */
} threadpool_thread_status;
/* Bit flags accepted by the pool functions. */
typedef enum {
    threadpool_graceful = 1,  /* destroy: finish pending tasks before shutting down */
    threadpool_priority = 2   /* add: put the task on the priority queue */
} threadpool_flags_t;
/**
 * C++ wrapper that owns one threadpool_t.
 *
 * Every public method takes m_threadLock around its use of m_threadPool.
 * All int-returning methods report 0 (threadpool_succeed) on success and a
 * negative threadpool_error_t value on failure unless stated otherwise.
 */
class CThreadPool
{
public:
    CThreadPool();
    ~CThreadPool();

    /**
     * @brief Creates the underlying pool and starts its worker threads.
     * @param thread_count Number of worker threads (1..MAX_THREADS).
     * @param queueSize    Capacity of the task queue (1..MAX_QUEUE).
     * @return threadpool_succeed, or threadpool_comm_error if creation failed.
     */
    int threadPoolStart(int thread_count, int queueSize);

    /**
     * @brief Grows or shrinks the number of worker threads.
     * @param thread_count Desired number of threads (MIN_THREADS..MAX_THREADS).
     */
    int threadPoolSetSize(int thread_count);

    /**
     * @brief Shuts the pool down (immediate mode, flags = 0).
     * @return threadpool_succeed, or threadpool_comm_error on failure.
     */
    int threadPoolStop();

    /**
     * @brief Queues a task for execution by a worker thread.
     * @param routine Function that performs the task.
     * @param arg     Argument handed to routine.
     * @param flags   0 for the normal queue, threadpool_priority for the
     *                priority queue.
     */
    int threadPoolAddTask(void (*routine)(void *), void *arg, int flags = 0);

    /**
     * @brief Reports how many tasks are currently queued in the pool.
     */
    int threadPoolTaskCount();

private:
    /**
     * @function threadpool_create
     * @brief Creates a threadpool_t object.
     * @param thread_count Number of worker threads.
     * @param queueSize Size of the queue.
     * @param flags Unused parameter.
     * @return a newly created thread pool or NULL
     */
    threadpool_t *threadpool_create(int thread_count, int queueSize, int flags = 0);

    /**
     * @function threadpool_add
     * @brief add a new task in the queue of a thread pool
     * @param pool Thread pool to which add the task.
     * @param routine Pointer to the function that will perform the task.
     * @param arg Argument to be passed to the function.
     * @param flags threadpool_priority selects the priority queue; any other
     *              value selects the normal queue.
     * @return 0 if all goes well, negative values in case of error (@see
     * threadpool_error_t for codes).
     */
    int threadpool_add(threadpool_t *pool, void (*routine)(void *),
                       void *arg, int flags);

    /**
     * @function threadpool_destroy
     * @brief Stops and destroys a thread pool.
     * @param pool Thread pool to destroy.
     * @param flags Flags for shutdown
     *
     * Known values for flags are 0 (default) and threadpool_graceful in
     * which case the thread pool doesn't accept any new tasks but
     * processes all pending tasks before shutdown.
     */
    int threadpool_destroy(threadpool_t *pool, int flags);

    /**
     * @function threadpool_setsize
     * @brief Increase or decrease the number of threads.
     * @param pool Thread pool to operate.
     * @param thread_count number of threads to operate.
     *
     * Accepted values are MIN_THREADS..MAX_THREADS.
     */
    int threadpool_setsize(threadpool_t *pool, int thread_count);

    pthread_mutex_t m_threadLock;  /* serialises access to m_threadPool */
    threadpool_t *m_threadPool;    /* underlying pool, NULL until started */
};
#ifdef __cplusplus
}
#endif
#endif /* THREADPOOLH_ */
//threadpool.cpp
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <map>
#include "threadpool.h"
#include <stdio.h>
/* Values stored in threadpool_t::shutdown once the pool is being destroyed. */
typedef enum {
    immediate_shutdown = 1,  /* workers stop without taking further tasks */
    graceful_shutdown  = 2   /* workers stop once both queues are empty */
} threadpool_shutdown_t;
/* Book-keeping record for one worker thread, stored in threadpool_t::threads. */
typedef struct {
    pthread_t *threads;  /* heap-allocated thread handle */
    int status;          /* 0 = keep running, 1 = asked to exit */
} threadpool_thread_t;
/**
 * @struct threadpool_task
 * @brief One unit of work stored in a task queue.
 *
 * @var function Entry point that performs the task.
 * @var argument Pointer handed to function when the task runs.
 */
typedef struct {
    void (*function)(void *);
    void *argument;
} threadpool_task_t;
/**
 * @struct threadpool_t
 * @brief State of one thread pool.
 *
 * @var lock                 Mutex protecting the fields below.
 * @var notify               Condition variable used to wake worker threads.
 * @var threads              Worker records keyed by thread id.
 * @var queue                Ring buffer holding normal tasks.
 * @var queue_priority       Ring buffer holding priority tasks.
 * @var thread_count         Number of registered worker threads.
 * @var queue_size           Capacity of queue.
 * @var queue_head           Index of the next normal task to run.
 * @var queue_tail           Index where the next normal task is stored.
 * @var queue_count          Number of pending normal tasks.
 * @var queue_size_priority  Capacity of queue_priority.
 * @var queue_head_priority  Index of the next priority task to run.
 * @var queue_tail_priority  Index where the next priority task is stored.
 * @var queue_count_priority Number of pending priority tasks.
 * @var shutdown             Non-zero (threadpool_shutdown_t) while shutting down.
 * @var started              Number of worker threads that have not exited yet.
 */
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    std::map<pthread_t, threadpool_thread_t> threads;
    threadpool_task_t *queue;
    threadpool_task_t *queue_priority;
    int thread_count;
    int queue_size;
    int queue_head;
    int queue_tail;
    int queue_count;
    int queue_size_priority;
    int queue_head_priority;
    int queue_tail_priority;
    int queue_count_priority;
    int shutdown;
    int started;
};
/**
 * @function void *threadpool_thread(void *threadpool)
 * @brief the worker thread
 * @param threadpool the pool which own the thread
 */
static void *threadpool_thread(void *threadpool);
/* Releases a pool's queues, synchronisation objects and the pool object itself. */
int threadpool_free(threadpool_t *pool);
/**
 * Builds an idle wrapper: no pool is attached until threadPoolStart().
 *
 * The wrapper mutex is set up with pthread_mutex_init() rather than the
 * PTHREAD_MUTEX_INITIALIZER macro, which is only guaranteed to work as a
 * plain static initializer, not inside a constructor's initializer list.
 */
CThreadPool::CThreadPool()
    : m_threadPool(NULL)
{
    pthread_mutex_init(&m_threadLock, NULL);
}
/**
 * Releases the wrapper's own mutex and forgets the pool pointer.
 *
 * The pool itself is not shut down here; callers are expected to invoke
 * threadPoolStop() before the object goes away.
 */
CThreadPool::~CThreadPool()
{
    m_threadPool = NULL;
    pthread_mutex_destroy(&m_threadLock);
}
/**
 * Creates the underlying pool while holding the wrapper lock.
 *
 * @param thread_count Number of worker threads to start.
 * @param queueSize    Capacity of the task queue.
 * @return threadpool_succeed when the pool was created,
 *         threadpool_comm_error when threadpool_create() returned NULL.
 */
int CThreadPool::threadPoolStart(int thread_count, int queueSize)
{
    int result = threadpool_succeed;

    pthread_mutex_lock(&m_threadLock);
    m_threadPool = threadpool_create(thread_count, queueSize, 0);
    if (m_threadPool == NULL) {
        result = threadpool_comm_error;
    }
    pthread_mutex_unlock(&m_threadLock);

    return result;
}
int CThreadPool::threadPoolStop()
{
pthread_mutex_lock(&m_threadLock);
if ((threadpool_destroy(m_threadPool, 0) != 0))
{
pthread_mutex_unlock(&m_threadLock);
return threadpool_comm_error;
}
pthread_mutex_unlock(&m_threadLock);
return threadpool_succeed;
}
/**
 * Queues a task on the pool while holding the wrapper lock.
 *
 * @param routine Function that performs the task.
 * @param arg     Argument handed to routine when the task runs.
 * @param flags   0 for the normal queue, threadpool_priority for the
 *                priority queue.
 * @return the result of threadpool_add(): 0 on success, a negative
 *         threadpool_error_t value otherwise.
 */
int CThreadPool::threadPoolAddTask(void (*routine)(void *), void *arg, int flags)
{
    pthread_mutex_lock(&m_threadLock);
    int iRet = threadpool_add(m_threadPool, routine, arg, flags);
    pthread_mutex_unlock(&m_threadLock);
    return iRet;
}
/**
 * Reports the number of tasks waiting in the pool.
 *
 * The counters are read under the pool's own lock because worker threads
 * update them under that lock. Both the normal and the priority queue are
 * counted.
 *
 * @return pending task count, or 0 when no pool is attached or the pool
 *         lock could not be taken.
 */
int CThreadPool::threadPoolTaskCount()
{
    int iTaskCount = 0;

    pthread_mutex_lock(&m_threadLock);
    if (m_threadPool != NULL && pthread_mutex_lock(&(m_threadPool->lock)) == 0) {
        iTaskCount = m_threadPool->queue_count + m_threadPool->queue_count_priority;
        pthread_mutex_unlock(&(m_threadPool->lock));
    }
    pthread_mutex_unlock(&m_threadLock);

    return iTaskCount;
}
/**
 * Allocates a pool, its two task queues and its worker threads.
 *
 * @param thread_count Number of worker threads (1..MAX_THREADS).
 * @param queueSize    Capacity of the normal queue (1..MAX_QUEUE); the
 *                     priority queue gets half of it, but at least one slot.
 * @param flags        Unused.
 * @return the new pool, or NULL if an argument is out of range, an
 *         allocation fails, or a worker thread cannot be started. On failure
 *         every thread already started is stopped and joined, and all memory
 *         is released.
 */
threadpool_t* CThreadPool::threadpool_create(int thread_count, int queueSize, int flags)
{
    (void) flags;

    if (thread_count <= 0 || thread_count > MAX_THREADS ||
        queueSize <= 0 || queueSize > MAX_QUEUE) {
        return NULL;
    }

    /* operator new reports failure by throwing; translate that into NULL. */
    threadpool_t *pool = NULL;
    try {
        pool = new threadpool_t();
    } catch (...) {
        return NULL;
    }

    /* Initialize */
    pool->thread_count = 0;
    pool->threads.clear();
    pool->queue_size = queueSize;
    /* Keep at least one priority slot: threadpool_add() takes an index
       modulo this size, so zero would be a division by zero. */
    pool->queue_size_priority = (queueSize / 2 > 0) ? (queueSize / 2) : 1;
    pool->queue_head = pool->queue_tail = pool->queue_count = 0;
    pool->queue_head_priority = pool->queue_tail_priority = pool->queue_count_priority = 0;
    pool->shutdown = pool->started = 0;

    /* Allocate both task queues */
    pool->queue = (threadpool_task_t *)malloc
        (sizeof(threadpool_task_t) * pool->queue_size);
    pool->queue_priority = (threadpool_task_t *)malloc
        (sizeof(threadpool_task_t) * pool->queue_size_priority);

    /* Track which synchronisation objects exist so that the error path
       destroys only what was actually initialised. */
    bool lockReady = false;
    bool condReady = false;
    if (pool->queue != NULL && pool->queue_priority != NULL) {
        lockReady = (pthread_mutex_init(&(pool->lock), NULL) == 0);
        condReady = lockReady && (pthread_cond_init(&(pool->notify), NULL) == 0);
    }
    if (!condReady) {
        if (lockReady) {
            pthread_mutex_destroy(&(pool->lock));
        }
        free(pool->queue_priority);
        free(pool->queue);
        delete pool;
        return NULL;
    }

    /* Start worker threads. The pool lock is held for the whole loop so a
       freshly started worker cannot look itself up in pool->threads while
       the map is being modified here. */
    pthread_t spawned[MAX_THREADS];
    int spawnedCount = 0;
    bool failed = false;

    pthread_mutex_lock(&(pool->lock));
    for (int i = 0; i < thread_count; i++) {
        pthread_t *pHandle = (pthread_t *)malloc(sizeof(pthread_t));
        if (pHandle == NULL) {
            failed = true;
            break;
        }
        if (pthread_create(pHandle, NULL, threadpool_thread, (void *)pool) != 0) {
            free(pHandle);
            failed = true;
            break;
        }
        threadpool_thread_t stThread;
        stThread.status = 0;
        stThread.threads = pHandle;
        pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));
        spawned[spawnedCount++] = *pHandle;
        pool->thread_count++;
        pool->started++;
    }
    if (failed) {
        /* Tell the workers that did start to leave their loop. */
        pool->shutdown = immediate_shutdown;
        pthread_cond_broadcast(&(pool->notify));
    }
    pthread_mutex_unlock(&(pool->lock));

    if (!failed) {
        return pool;
    }

    /* Wait for the partial set of workers to exit, then release everything. */
    for (int i = 0; i < spawnedCount; i++) {
        pthread_join(spawned[i], NULL);
    }
    for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
         it != pool->threads.end(); ++it) {
        free(it->second.threads);
    }
    pool->threads.clear();
    threadpool_free(pool);
    return NULL;
}
/**
 * Grows or shrinks the set of worker threads.
 *
 * Growing starts new threads until thread_count is reached. Shrinking marks
 * the surplus threads (status = 1) and wakes all workers; each marked
 * thread removes itself when it next checks its status.
 *
 * @param pool         Pool to resize.
 * @param thread_count Desired number of threads (MIN_THREADS..MAX_THREADS).
 * @return 0 on success; threadpool_invalid for a NULL pool;
 *         threadpool_thread_limit if thread_count is out of range;
 *         threadpool_thread_equal if the pool already has that size;
 *         threadpool_comm_error if a thread could not be started;
 *         threadpool_lock_failure if a mutex/condition call failed.
 */
int CThreadPool::threadpool_setsize(threadpool_t *pool, int thread_count)
{
    if (pool == NULL) {
        return threadpool_invalid;
    }
    if (thread_count < MIN_THREADS || thread_count > MAX_THREADS) {
        return threadpool_thread_limit;
    }
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    int err = 0;
    if (pool->thread_count < thread_count) {
        /* Start additional worker threads */
        while (pool->thread_count < thread_count) {
            pthread_t *pHandle = (pthread_t *)malloc(sizeof(pthread_t));
            if (pHandle == NULL) {
                err = threadpool_comm_error;
                break;
            }
            if (pthread_create(pHandle, NULL, threadpool_thread, (void *)pool) != 0) {
                free(pHandle);
                err = threadpool_comm_error;
                break;
            }
            threadpool_thread_t stThread;
            stThread.status = 0;
            stThread.threads = pHandle;
            pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));
            pool->thread_count++;
            pool->started++;
        }
    } else if (pool->thread_count > thread_count) {
        /* Ask [pool->thread_count - thread_count] threads to close */
        int surplus = pool->thread_count - thread_count;
        for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
             surplus > 0 && it != pool->threads.end(); ++it, --surplus) {
            it->second.status = 1;
        }
        /* Wake up all worker threads so the marked ones notice */
        if (pthread_cond_broadcast(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
        }
    } else {
        err = threadpool_thread_equal;
    }

    /* A thread-start failure stays the reported error even if the unlock
       fails as well. */
    if (pthread_mutex_unlock(&(pool->lock)) != 0 && err != threadpool_comm_error) {
        err = threadpool_lock_failure;
    }
    return err;
}
int CThreadPool::threadPoolSetSize(int thread_count)
{
pthread_mutex_lock(&m_threadLock);
int iRet = threadpool_setsize(m_threadPool, thread_count);
pthread_mutex_unlock(&m_threadLock);
return iRet;
}
int CThreadPool::threadpool_add(threadpool_t pool, void (function)(void *),
void *argument, int flags)
{
int err = 0;
if(pool NULL || function NULL) {
return threadpool_invalid;
}
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
if(flags&threadpool_priority)
{
int next;
next = (pool->queue_tail_priority + 1) % pool->queue_size_priority;
do {
/* Are we full ? */
if(pool->queue_count_priority == pool->queue_size_priority) {
err = threadpool_queue_full;
break;
}
/* Are we shutting down ? */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
/* Add task to queue */
pool->queue_priority[pool->queue_tail_priority].function = function;
pool->queue_priority[pool->queue_tail_priority].argument = argument;
pool->queue_tail_priority = next;
pool->queue_count_priority += 1;
/* pthread_cond_broadcast */
if(pthread_cond_signal(&(pool->notify)) != 0) {
err = threadpool_lock_failure;
break;
}
} while(0);
}
else
{
int next;
next = (pool->queue_tail + 1) % pool->queue_size;
do {
/* Are we full ? */
if(pool->queue_count == pool->queue_size) {
err = threadpool_queue_full;
break;
}
/* Are we shutting down ? */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
/* Add task to queue */
pool->queue[pool->queue_tail].function = function;
pool->queue[pool->queue_tail].argument = argument;
pool->queue_tail = next;
pool->queue_count += 1;
/* pthread_cond_broadcast */
if(pthread_cond_signal(&(pool->notify)) != 0) {
err = threadpool_lock_failure;
break;
}
} while(0);
}
if(pthread_mutex_unlock(&pool->lock) != 0) {
err = threadpool_lock_failure;
}
return err;
}
/**
 * Stops the worker threads, waits for them to exit and releases the pool.
 *
 * The shutdown flag is set and all workers are woken; each worker is then
 * joined before the pool is freed, so no thread can still touch the pool
 * when its memory is released. Workers are not marked for individual exit
 * here: the shutdown flag alone ends their loop, which lets a graceful
 * shutdown drain the queues first.
 *
 * This call blocks until every worker has returned from its current task.
 * It must therefore not be invoked while holding a lock that a running
 * task may need. When called from one of the pool's own workers, that
 * worker is not joined and the pool object is left allocated.
 *
 * @param pool  Pool to destroy.
 * @param flags 0 for immediate shutdown, threadpool_graceful to finish all
 *              pending tasks first.
 * @return 0 on success; threadpool_invalid for a NULL pool;
 *         threadpool_shutdown if the pool is already shutting down;
 *         threadpool_lock_failure if a mutex/condition call failed.
 */
int CThreadPool::threadpool_destroy(threadpool_t *pool, int flags)
{
    if (pool == NULL) {
        return threadpool_invalid;
    }
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    int err = 0;
    pthread_t workers[MAX_THREADS];
    int workerCount = 0;

    if (pool->shutdown) {
        /* Already shutting down */
        err = threadpool_shutdown;
    } else {
        pool->shutdown = (flags & threadpool_graceful) ?
            graceful_shutdown : immediate_shutdown;

        /* Copy the thread ids while the lock is held: workers may remove
           themselves from the map as soon as it is released. */
        const pthread_t self = pthread_self();
        for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
             it != pool->threads.end(); ++it) {
            if (pthread_equal(it->first, self)) {
                continue;  /* a thread cannot join itself */
            }
            if (workerCount < MAX_THREADS) {
                workers[workerCount++] = it->first;
            }
        }

        /* Wake up all worker threads */
        if (pthread_cond_broadcast(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
        }
    }

    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }
    if (err) {
        return err;
    }

    /* Wait for the workers to leave their loop. */
    for (int i = 0; i < workerCount; i++) {
        pthread_join(workers[i], NULL);
    }

    /* Every joined worker is gone; release the handles of those that did
       not remove themselves from the map. */
    for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
         it != pool->threads.end(); ++it) {
        free(it->second.threads);
    }
    pool->threads.clear();

    /* Frees the pool unless a worker is still accounted for in
       pool->started (for example the calling worker itself). */
    threadpool_free(pool);
    return err;
}
/**
 * Releases everything owned by a pool whose workers have all exited.
 *
 * @param pool Pool to release.
 * @return threadpool_succeed, or threadpool_comm_error if pool is NULL or
 *         pool->started shows that worker threads are still alive.
 */
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL || pool->started > 0) {
        return threadpool_comm_error;
    }

    /* Release thread handles that are still registered; a worker that left
       without removing its own map entry would otherwise leak its handle. */
    for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
         it != pool->threads.end(); ++it) {
        free(it->second.threads);
    }
    pool->threads.clear();

    /* NOTE(review): the mutex and condition variable are destroyed only when
       the main queue exists, i.e. queue allocation is used as a proxy for
       "initialisation reached the mutex/cond setup" - confirm against
       threadpool_create(). */
    const bool queueAllocated = (pool->queue != NULL);

    /* free(NULL) is a no-op, so no guards are needed here. */
    free(pool->queue_priority);
    free(pool->queue);

    if (queueAllocated) {
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }

    delete pool;
    return threadpool_succeed;
}
/**
 * Lets the calling worker find out whether it has to exit and, if so,
 * removes it from the pool's book-keeping.
 *
 * Nothing here takes pool->lock, so the caller must already hold it.
 *
 * NOTE(review): on the kill path the thread is only unregistered; it is
 * neither detached nor joined in this function.
 *
 * @param pool Pool the calling thread belongs to.
 * @return threadpool_thread_kill    the thread was marked (status == 1); its
 *                                   handle is freed, its map entry erased and
 *                                   thread_count decremented;
 *         threadpool_thread_run     the thread is registered and not marked;
 *         threadpool_thread_invalid the thread is not in pool->threads.
 */
static int checkthreadexit(threadpool_t *pool)
{
    std::map<pthread_t, threadpool_thread_t>::iterator self =
        pool->threads.find(pthread_self());

    if (self == pool->threads.end()) {
        return threadpool_thread_invalid;
    }
    if (self->second.status != 1) {
        return threadpool_thread_run;
    }

    free(self->second.threads);
    pool->threads.erase(self);
    pool->thread_count--;
    return threadpool_thread_kill;
}
/**
 * Lets the calling worker find out, without changing any state, whether it
 * has been asked to exit.
 *
 * Nothing here takes pool->lock, so the caller must already hold it.
 *
 * @param pool Pool the calling thread belongs to.
 * @return threadpool_thread_kill    the thread is marked (status == 1);
 *         threadpool_thread_run     the thread is registered and not marked;
 *         threadpool_thread_invalid the thread is not in pool->threads.
 */
static int checkthreadwait(threadpool_t *pool)
{
    std::map<pthread_t, threadpool_thread_t>::iterator self =
        pool->threads.find(pthread_self());

    if (self == pool->threads.end()) {
        return threadpool_thread_invalid;
    }
    /* A registered, unmarked thread is reported as "run", matching
       checkthreadexit(). */
    return (self->second.status == 1) ? threadpool_thread_kill
                                      : threadpool_thread_run;
}
static void threadpool_thread(void threadpool)
{
threadpool_t pool = (threadpool_t )threadpool;
threadpool_task_t task;
for(;;) {
/* Lock must be taken to wait on conditional variable */
pthread_mutex_lock(&(pool->lock));
/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the lock. */
while((pool->queue_count 0) && (pool->queue_count_priority 0) && (!pool->shutdown)) {
/* check status */
if (checkthreadwait(pool) == threadpool_thread_kill)
{
break;
}
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
/* check status */
if (checkthreadexit(pool) == threadpool_thread_kill)
{
break;
}
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->queue_count == 0) &&
(pool->queue_count_priority == 0))) {
break;
}
/* Grab our task */
if(pool->queue_count_priority != 0)
{
task.function = pool->queue_priority[pool->queue_head_priority].function;
task.argument = pool->queue_priority[pool->queue_head_priority].argument;
pool->queue_head_priority = (pool->queue_head_priority + 1) % pool->queue_size_priority;
pool->queue_count_priority -= 1;
}
else
{
task.function = pool->queue[pool->queue_head].function;
task.argument = pool->queue[pool->queue_head].argument;
pool->queue_head = (pool->queue_head + 1) % pool->queue_size;
pool->queue_count -= 1;
}
/* Unlock */
pthread_mutex_unlock(&(pool->lock));
/* Get to work */
(*(task.function))(task.argument);
}
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return(NULL);
}