Administrator
发布于 2025-08-29 / 22 阅读
0
0

基于pthread的线程池实现

将任务分装成独立的可执行单元(Actor),然后进行多线程并发处理;预先创建一组线程,等待任务到来时分配执行,避免了频繁创建和销毁线程的开销。

线程池核心组件:

1. 任务队列:存储待执行的任务

2. 工作线程:执行任务的线程集合

3. 线程池管理器:负责线程池的创建、销毁和任务调度核心实现:

具体实现

//threadpool.h

#ifndef THREADPOOLH_

#define THREADPOOLH_

#ifdef __cplusplus

extern "C" {

#endif

 

 /**

 * Increase this constants at your own risk

 * Large values might slow down your system

 */

#define MIN_THREADS 1

#define MAX_THREADS 64

#define MAX_QUEUE 655360

typedef struct threadpool_t threadpool_t;

typedef void (*threadpool_func_f)(void *param);    

typedef enum {

	threadpool_succeed		  = 0,

    threadpool_invalid        = -1,

    threadpool_lock_failure   = -2,

    threadpool_queue_full     = -3,

    threadpool_shutdown       = -4,

	threadpool_thread_failure = -5,

	threadpool_comm_error	  = -6,

	threadpool_thread_limit	  = -7,

	threadpool_thread_equal	  = -8,

} threadpool_error_t;

typedef enum {

	threadpool_thread_invalid	= 0,

	threadpool_thread_run		= 1,

	threadpool_thread_kill		= 2

} threadpool_thread_status;

typedef enum {

    threadpool_graceful       = 1,

	threadpool_priority		  = 2

} threadpool_flags_t;

class CThreadPool

{

public:

	CThreadPool();

	~CThreadPool();

	int threadPoolStart(int thread_count, int queueSize);

	int threadPoolSetSize(int thread_count);

	int threadPoolStop();

	int threadPoolAddTask(void (*routine)(void ),void arg, int flags=0);

	int threadPoolTaskCount();

private:

	/**

	 * @function threadpool_create

	 * @brief Creates a threadpool_t object.

	 * @param thread_count Number of worker threads.

	 * @param queue_size   Size of the queue.

	 * @param flags        Unused parameter.

	 * @return a newly created thread pool or NULL

	 */

	threadpool_t *threadpool_create(int thread_count, int queueSize, int flags = 0);

	/**

	 * @function threadpool_add

	 * @brief add a new task in the queue of a thread pool

	 * @param pool     Thread pool to which add the task.

	 * @param function Pointer to the function that will perform the task.

	 * @param argument Argument to be passed to the function.

	 * @param flags    Unused parameter.

	 * @return 0 if all goes well, negative values in case of error (@see

	 * threadpool_error_t for codes).

	 */

	int threadpool_add(threadpool_t pool, void (routine)(void *),

					   void *arg, int flags);

	/**

	 * @function threadpool_destroy

	 * @brief Stops and destroys a thread pool.

	 * @param pool  Thread pool to destroy.

	 * @param flags Flags for shutdown

	 *

	 * Known values for flags are 0 (default) and threadpool_graceful in

	 * which case the thread pool doesn't accept any new tasks but

	 * processes all pending tasks before shutdown.

	 */

	int threadpool_destroy(threadpool_t *pool, int flags);

	/**

	 * @function threadPool_SetSize

	 * @brief Increase or decrease the number of threads.

	 * @param pool  Thread pool to operate.

	 * @param thread_count number of threads to operate.

	 *

	 * Known values for number of threads is any value greater than 1

	 */

	int threadpool_setsize(threadpool_t *pool, int thread_count);

private:

	pthread_mutex_t		m_threadLock;

	threadpool_t		*m_threadPool;

};

#ifdef __cplusplus

}

#endif

#endif /* THREADPOOLH_ */
//threadpool.cpp

#include <stdlib.h>

#include <pthread.h>

#include <unistd.h>

#include <map>

#include "threadpool.h"

#include <stdio.h>

typedef enum {

    immediate_shutdown = 1,

    graceful_shutdown  = 2

} threadpool_shutdown_t;

typedef struct {

	pthread_t * threads;

	int status;

}threadpool_thread_t;

/**

 *  @struct threadpool_task

 *  @brief the work struct

 *

 *  @var function Pointer to the function that will perform the task.

 *  @var argument Argument to be passed to the function.

 */

typedef struct {

    void (*function)(void *);

    void *argument;

} threadpool_task_t;

/**

 *  @struct threadpool

 *  @brief The threadpool struct

 *

 *  @var notify       Condition variable to notify worker threads.

 *  @var threads      Array containing worker threads ID.

 *  @var thread_count Number of threads

 *  @var queue        Array containing the task queue.

 *  @var queue_size   Size of the task queue.

 *  @var head         Index of the first element.

 *  @var tail         Index of the next element.

 *  @var count        Number of pending tasks

 *  @var shutdown     Flag indicating if the pool is shutting down

 *  @var started      Number of started threads

 */

struct threadpool_t {

  pthread_mutex_t lock;

  pthread_cond_t notify;

  std::map<pthread_t, threadpool_thread_t> threads;

  threadpool_task_t *queue;

  threadpool_task_t *queue_priority;

  int thread_count;

  int queue_size;

  int queue_head;

  int queue_tail;

  int queue_count;

  int queue_size_priority;

  int queue_head_priority;

  int queue_tail_priority;

  int queue_count_priority;

  int shutdown;

  int started;

};

/**

  @function void threadpool_thread(void *threadpool)

 * @brief the worker thread

 * @param threadpool the pool which own the thread

 */

static void threadpool_thread(void threadpool);

int threadpool_free(threadpool_t *pool);

CThreadPool::CThreadPool()

:m_threadPool(NULL),

m_threadLock(PTHREAD_MUTEX_INITIALIZER)

{

}

CThreadPool::~CThreadPool()

{

	m_threadPool = NULL;

}

int CThreadPool::threadPoolStart(int thread_count, int queueSize)

{

	pthread_mutex_lock(&m_threadLock);

	if (!(m_threadPool = threadpool_create(thread_count, queueSize, 0)))

	{

		pthread_mutex_unlock(&m_threadLock);

		return threadpool_comm_error;

	}

	pthread_mutex_unlock(&m_threadLock);

	return threadpool_succeed;

}

int CThreadPool::threadPoolStop()

{

	pthread_mutex_lock(&m_threadLock);

	if ((threadpool_destroy(m_threadPool, 0) != 0))

	{

		pthread_mutex_unlock(&m_threadLock);

		return threadpool_comm_error;

	}

	pthread_mutex_unlock(&m_threadLock);

	return threadpool_succeed;

}

int CThreadPool::threadPoolAddTask(void (*routine)(void ),void arg, int flags)

{

	pthread_mutex_lock(&m_threadLock);

	int iRet = threadpool_add(m_threadPool, routine, arg, flags);

	pthread_mutex_unlock(&m_threadLock);

	return iRet;

}

int CThreadPool::threadPoolTaskCount()

{

	int iTaskCount = 0;

	pthread_mutex_lock(&m_threadLock);

	iTaskCount = m_threadPool->queue_count;

	pthread_mutex_unlock(&m_threadLock);

	return iTaskCount;

}

threadpool_t* CThreadPool::threadpool_create(int thread_count, int queueSize, int flags)

{

    threadpool_t *pool;

    int i;

    (void) flags;

    if(thread_count <= 0 || thread_count > MAX_THREADS || queueSize <= 0 || queueSize > MAX_QUEUE) {

        return NULL;

    }

    /*if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {

        goto err;

    }*/

	if((pool = (threadpool_t *)new threadpool_t()) == NULL) {

		goto err;

	}

    /* Initialize */

    pool->thread_count = 0;

	pool->threads.clear();

    pool->queue_size = queueSize;

	pool->queue_size_priority = queueSize/2;

    pool->queue_head = pool->queue_tail = pool->queue_count = 0;

    pool->queue_head_priority = pool->queue_tail_priority = pool->queue_count_priority = 0;

    pool->shutdown = pool->started = 0;

    /* Allocate task queue */

    pool->queue = (threadpool_task_t *)malloc

        (sizeof(threadpool_task_t) * queueSize);

	/* Allocate task queue priority*/

    pool->queue_priority = (threadpool_task_t *)malloc

        (sizeof(threadpool_task_t) * (queueSize/2));

    /* Initialize mutex and conditional variable first */

    if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||

       (pthread_cond_init(&(pool->notify), NULL) != 0) ||

       (pool->queue == NULL) ||

       (pool->queue_priority == NULL)) {

        goto err;

    }

    /* Start worker threads */

    for(i = 0; i < thread_count; i++) {

		pthread_t pHandle = (pthread_t)malloc(sizeof(pthread_t));

		if (NULL != pHandle)

		{

			if(pthread_create(pHandle, NULL,threadpool_thread, (void*)pool) != 0) {

					threadpool_destroy(pool, 0);

					free(pHandle);

					return NULL;

			}

			threadpool_thread_t stThread;

			stThread.status = 0;

			stThread.threads = pHandle;

			pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));

			pool->thread_count++;

			pool->started++;

		}

    }

    return pool;

 err:

    if(pool) {

        threadpool_free(pool);

    }

    return NULL;

}

int CThreadPool::threadpool_setsize(threadpool_t *pool, int thread_count)

{

	int err = 0;

	if (thread_count<MIN_THREADS || thread_count>MAX_THREADS)

	{

		return threadpool_thread_limit;

	}

	if(pthread_mutex_lock(&(pool->lock)) != 0) {

		return threadpool_lock_failure;

	}

	if (pool->thread_count < thread_count)

	{

		/* Start additional worker threads */

		for(int i = pool->thread_count; i < thread_count; i++) {

			pthread_t pHandle = (pthread_t)malloc(sizeof(pthread_t));

			if (NULL != pHandle)

			{

				if(pthread_create(pHandle, NULL,threadpool_thread, (void*)pool) != 0) {

						if(pthread_mutex_unlock(&pool->lock) != 0) {

							err = threadpool_lock_failure;

						}

						free(pHandle);

						return threadpool_comm_error;

				}

				threadpool_thread_t stThread;

				stThread.status = 0;

				stThread.threads = pHandle;

				pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));

				pool->thread_count++;

				pool->started++;

			}

		}

	}

	else if (pool->thread_count > thread_count)

	{

		int i = pool->thread_count;

		/* notify to close [pool.thread_count - thread_count] thread  */

		std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();

		for(; i > thread_count; i--) {

			if (it != pool->threads.end())

			{

				

				it->second.status = 1;

				it++;

			}

		}

		/* Wake up all worker threads */

		if(pthread_cond_broadcast(&(pool->notify)) != 0) {

			err = threadpool_lock_failure;

		}

	}

	else

	{

		err = threadpool_thread_equal;

	}

	if(pthread_mutex_unlock(&pool->lock) != 0) {

		err = threadpool_lock_failure;

	}

	return err;

}

int CThreadPool::threadPoolSetSize(int thread_count)

{

	pthread_mutex_lock(&m_threadLock);

	int iRet = threadpool_setsize(m_threadPool, thread_count);

	pthread_mutex_unlock(&m_threadLock);

	return iRet;

}

int CThreadPool::threadpool_add(threadpool_t pool, void (function)(void *),

                   void *argument, int flags)

{

    int err = 0;

    if(pool  NULL || function  NULL) {

        return threadpool_invalid;

    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {

        return threadpool_lock_failure;

    }

	if(flags&threadpool_priority)

	{

		

		int next;

		next = (pool->queue_tail_priority + 1) % pool->queue_size_priority;

		do {

			/* Are we full ? */

			if(pool->queue_count_priority == pool->queue_size_priority) {

				err = threadpool_queue_full;

				break;

			}

			/* Are we shutting down ? */

			if(pool->shutdown) {

				err = threadpool_shutdown;

				break;

			}

			/* Add task to queue */

			pool->queue_priority[pool->queue_tail_priority].function = function;

			pool->queue_priority[pool->queue_tail_priority].argument = argument;

			pool->queue_tail_priority = next;

			pool->queue_count_priority += 1;

			/* pthread_cond_broadcast */

			if(pthread_cond_signal(&(pool->notify)) != 0) {

				err = threadpool_lock_failure;

				break;

			}

		} while(0);

	}

	else

	{

		int next;

	    next = (pool->queue_tail + 1) % pool->queue_size;

	    do {

	        /* Are we full ? */

	        if(pool->queue_count == pool->queue_size) {

	            err = threadpool_queue_full;

	            break;

	        }

	        /* Are we shutting down ? */

	        if(pool->shutdown) {

	            err = threadpool_shutdown;

	            break;

	        }

	        /* Add task to queue */

	        pool->queue[pool->queue_tail].function = function;

	        pool->queue[pool->queue_tail].argument = argument;

	        pool->queue_tail = next;

	        pool->queue_count += 1;

	        /* pthread_cond_broadcast */

	        if(pthread_cond_signal(&(pool->notify)) != 0) {

	            err = threadpool_lock_failure;

	            break;

	        }

	    } while(0);

	}

	

    if(pthread_mutex_unlock(&pool->lock) != 0) {

        err = threadpool_lock_failure;

    }

    return err;

}

int CThreadPool::threadpool_destroy(threadpool_t *pool, int flags)

{

    int i, err = 0;

    if(pool == NULL) {

        return threadpool_invalid;

    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {

        return threadpool_lock_failure;

    }

    do {

        /* Already shutting down */

        if(pool->shutdown) {

            err = threadpool_shutdown;

            break;

        }

        pool->shutdown = (flags & threadpool_graceful) ?

            graceful_shutdown : immediate_shutdown;

		int i = pool->thread_count;

		/* notify to close [pool.thread_count - thread_count] thread  */

		std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();

		for(; i > 0; i--) {

			if (it != pool->threads.end())

			{

				it->second.status = 1;

				it++;

			}

		}

        /* Wake up all worker threads */

        if(pthread_cond_broadcast(&(pool->notify)) != 0) {

            err = threadpool_lock_failure;

            break;

        }

    } while(0);

	if(pthread_mutex_unlock(&(pool->lock)) != 0) {

		return threadpool_lock_failure;

	}

    /* Only if everything went well do we deallocate the pool */

    if(!err) {

        threadpool_free(pool);

    }

    return err;

}

int threadpool_free(threadpool_t *pool)

{

    if(pool == NULL || pool->started > 0) {

        return threadpool_comm_error;

    }

    /* Did we manage to allocate ? */

	if(pool->queue_priority) {

        free(pool->queue_priority);

		}

    if(pool->queue) {

        free(pool->queue);

 

        /* Because we allocate pool->threads after initializing the

           mutex and condition variable, we're sure they're

           initialized. Let's lock the mutex just in case. */

        //pthread_mutex_lock(&(pool->lock));

        pthread_mutex_destroy(&(pool->lock));

        pthread_cond_destroy(&(pool->notify));

    }

    delete(pool);    

    return threadpool_succeed;

}

static int checkthreadexit(threadpool_t *pool)

{

	int err = 0;

	pthread_t tid = pthread_self();

	std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.find(tid);

	if (it != pool->threads.end()){

		if (it->second.status == 1)

		{

			free(it->second.threads);

			pool->threads.erase(it);

			pool->thread_count--;

			err =  threadpool_thread_kill;

			return err;

		}

		else

		{

			err =  threadpool_thread_run;

			return err;

		}

	}

	else

	{

		err = threadpool_thread_invalid;

		return err;

	}

	return err;

}

static int checkthreadwait(threadpool_t *pool)

{

	int err = 0;

	pthread_t tid = pthread_self();

	std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.find(tid);

	if (it != pool->threads.end() && it->second.status == 1){

		if (it->second.status == 1)

		{

			err =  threadpool_thread_kill;

			return err;

		}

		else

		{

			err =  threadpool_thread_run;

			return err;

		}

	}

	else

	{

		err = threadpool_thread_invalid;

		return err;

	}

	return err;

}

static void threadpool_thread(void threadpool)

{

    threadpool_t pool = (threadpool_t )threadpool;

    threadpool_task_t task;

    for(;;) {

        /* Lock must be taken to wait on conditional variable */

        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.

           When returning from pthread_cond_wait(), we own the lock. */

        while((pool->queue_count  0) && (pool->queue_count_priority  0) && (!pool->shutdown)) {

			

			/* check status */

			if (checkthreadwait(pool) == threadpool_thread_kill)

			{

				break;

			}

            pthread_cond_wait(&(pool->notify), &(pool->lock));

        }

		/* check status */

		if (checkthreadexit(pool) == threadpool_thread_kill)

		{

			break;

		}

        if((pool->shutdown == immediate_shutdown) ||

           ((pool->shutdown == graceful_shutdown) &&

            (pool->queue_count == 0) && 

            (pool->queue_count_priority == 0))) {

            break;

        }

        /* Grab our task */

		if(pool->queue_count_priority != 0)

		{

			 task.function = pool->queue_priority[pool->queue_head_priority].function;

	        task.argument = pool->queue_priority[pool->queue_head_priority].argument;

	        pool->queue_head_priority = (pool->queue_head_priority + 1) % pool->queue_size_priority;

	        pool->queue_count_priority -= 1;

		}

        else

        {

        	task.function = pool->queue[pool->queue_head].function;

	        task.argument = pool->queue[pool->queue_head].argument;

	        pool->queue_head = (pool->queue_head + 1) % pool->queue_size;

	        pool->queue_count -= 1;

        }

        /* Unlock */

        pthread_mutex_unlock(&(pool->lock));

        /* Get to work */

        (*(task.function))(task.argument);

	}

    pool->started--;

    pthread_mutex_unlock(&(pool->lock));

    pthread_exit(NULL);

    return(NULL);

}


评论