您的位置:首页 > 编程语言 > C语言/C++

c++/c实现线程池

2015-12-04 09:29 639 查看
编程实例:

CThreadPool.h

#include <iostream>
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>

// Signature of a task callback: it receives the opaque argument handed to
// addTask(). The pool calls it from a worker thread and discards its result.
typedef void*(*FunPtr)(void* arg);

class CThreadPool;
// Bookkeeping record for one worker thread of CThreadPool.
typedef struct worker
{
// Thread id filled in by pthread_create() in addTask().
pthread_t pid;
// Task callback the worker runs next.
FunPtr fun_cb;
// Opaque argument passed to fun_cb.
void* arg;
// Pool that owns this worker; the worker reaches shutdown/total through it.
CThreadPool* parent;
// Held by the worker while it parks and by whoever signals cond.
pthread_mutex_t mutex;
// The parked worker waits on this; addTask() and the destructor signal it.
pthread_cond_t cond;
}CThread_worker;

// Thread pool that runs each task on a detached pthread and parks finished
// workers in an idle list so that later tasks can reuse them.
class CThreadPool
{
public:
// Creates a pool limited to maxThreads workers; values <= 0 fall back to 2.
CThreadPool(int maxThreads);
// Tells parked workers to exit, then releases the idle list, the mutex and
// the condition variables.
virtual ~CThreadPool();

// Returns the shared instance, creating it with threadNum on first use.
// No lock is taken here, and later calls ignore threadNum.
static CThreadPool* getInstance(int threadNum);
// Runs fun_cb(arg) on an idle worker, or on a newly created thread when no
// worker is idle. May block on IdleCond when the thread limit is reached.
int addTask(FunPtr fun_cb, void *arg);
// NOTE(review): not referenced by any member function shown in this file.
FunPtr fun_cb;

private:
// Locked by addTask, saveThread, the destructor and exiting workers around
// index, total and threadList.
pthread_mutex_t mainMutex;
// Signalled by saveThread when a worker parks; addTask waits on it.
pthread_cond_t IdleCond;
// Signalled by saveThread when index >= total; the destructor waits on it.
pthread_cond_t FullCond;
// Signalled by an exiting worker when total drops to 0; the destructor waits on it.
pthread_cond_t EmptyCond;

// Array of maxThreads slots used as a stack of parked (idle) workers.
CThread_worker **threadList;

// Set to 1 by the destructor; workers stop looping once they observe it.
int shutdown;

// Upper bound on the number of worker threads.
int maxThreads;
// Number of parked workers currently stored in threadList.
int index;
// Number of worker threads created and not yet exited.
int total;

// Thread entry point; arg is the CThread_worker record of the new thread.
static void* wrapperFunc(void* arg);
// Pushes a worker onto threadList; returns 0 on success, -1 otherwise.
int saveThread(CThread_worker* thread);

// Instance handed out by getInstance().
static CThreadPool *threadPool;
};

// Storage for the singleton pointer; stays NULL until getInstance() creates the pool.
// NOTE(review): this definition sits in the header, so including the header from
// more than one translation unit would define it more than once.
CThreadPool* CThreadPool::threadPool = NULL;

// Builds an empty pool.
//
// threadNum is the maximum number of worker threads; values <= 0 fall back
// to 2. No thread is started here: workers are created lazily by addTask().
CThreadPool::CThreadPool(int threadNum)
{
    maxThreads = (threadNum <= 0) ? 2 : threadNum;

    pthread_mutex_init(&mainMutex, NULL);
    pthread_cond_init(&IdleCond, NULL);
    pthread_cond_init(&FullCond, NULL);
    pthread_cond_init(&EmptyCond, NULL);

    shutdown = index = total = 0;

    // Size the slots from the element type instead of assuming sizeof(void*).
    threadList = (CThread_worker**)calloc(maxThreads, sizeof *threadList);
    if (threadList == NULL)
    {
        // A constructor cannot return an error and saveThread() would
        // dereference the NULL list later, so fail loudly right here.
        perror("CThreadPool: calloc");
        abort();
    }
}

// Shuts the pool down.
//
// Sequence:
//   1. wait until every live worker has finished its task and parked itself,
//   2. raise the shutdown flag and wake each parked worker so it can exit,
//   3. wait until the last worker has reported its exit,
//   4. release the worker records, the idle list and the synchronisation
//      objects.
// The caller must make sure no other thread calls addTask() concurrently.
CThreadPool::~CThreadPool()
{
    pthread_mutex_lock(&mainMutex);

    // Only wait while some worker is still busy. The previous
    // "if (index <= total)" also waited when every worker was already idle
    // (or none existed), and nobody was left to signal FullCond. A loop
    // also copes with spurious wakeups.
    while (index < total)
    {
        pthread_cond_wait(&FullCond, &mainMutex);
    }

    shutdown = 1;
    for (int i = 0; i < index; ++i)
    {
        CThread_worker* thread = threadList[i];
        // Taking the worker's mutex guarantees it is already inside
        // pthread_cond_wait(), so the signal cannot be lost.
        pthread_mutex_lock(&thread->mutex);
        pthread_cond_signal(&thread->cond);
        pthread_mutex_unlock(&thread->mutex);
    }

    // Workers decrement total as they leave; re-check after every wakeup.
    while (total > 0)
    {
        pthread_cond_wait(&EmptyCond, &mainMutex);
    }

    for (int i = 0; i < index; ++i)
    {
        CThread_worker* thread = threadList[i];
        pthread_mutex_destroy(&thread->mutex);
        pthread_cond_destroy(&thread->cond);
        free(thread);
        threadList[i] = NULL;
    }
    index = 0;

    // Do not leave getInstance() handing out a dangling pointer.
    if (threadPool == this)
    {
        threadPool = NULL;
    }

    pthread_mutex_unlock(&mainMutex);

    pthread_mutex_destroy(&mainMutex);
    pthread_cond_destroy(&EmptyCond);
    pthread_cond_destroy(&IdleCond);
    pthread_cond_destroy(&FullCond);

    free(threadList);
    threadList = NULL;
}

// Hands out the process-wide pool, building it on the first call.
// num only matters for that first call; afterwards the existing pool is
// returned unchanged. No locking is done here.
CThreadPool* CThreadPool::getInstance(int num)
{
    if (threadPool != NULL)
    {
        return threadPool;
    }

    threadPool = new CThreadPool(num);
    return threadPool;
}

// Schedules fun_cb(arg) on a worker thread.
//
// If a parked worker exists it is reused. Otherwise a new detached thread is
// created, as long as fewer than maxThreads workers exist. When the limit is
// reached and nobody is idle, the call blocks until a worker parks itself.
//
// Returns 0 on success and -1 when the worker record or the thread could
// not be created.
int CThreadPool::addTask(FunPtr fun_cb, void *arg)
{
    int ret = 0;

    pthread_mutex_lock(&mainMutex);

    // Re-check after every wakeup: the wakeup may be spurious, or another
    // producer may already have taken the worker that just became idle.
    // With a plain "if" the code below could create more than maxThreads.
    while (index <= 0 && total >= maxThreads)
    {
        pthread_cond_wait(&IdleCond, &mainMutex);
    }

    if (index <= 0)
    {
        // No idle worker: start a fresh one that runs the task right away.
        CThread_worker *thread = (CThread_worker*)malloc(sizeof(CThread_worker));
        if (thread == NULL)
        {
            pthread_mutex_unlock(&mainMutex);
            return -1;
        }

        thread->pid = 0;
        pthread_mutex_init(&thread->mutex, NULL);
        pthread_cond_init(&thread->cond, NULL);
        thread->fun_cb = fun_cb;
        thread->arg = arg;
        thread->parent = this;

        pthread_attr_t attr;
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        if (pthread_create(&(thread->pid), &attr, wrapperFunc, (void*)thread) == 0)
        {
            ++total;
        }
        else
        {
            ret = -1;
            pthread_mutex_destroy(&thread->mutex);
            pthread_cond_destroy(&thread->cond);
            free(thread);
        }
        pthread_attr_destroy(&attr);
    }
    else
    {
        // Pop the most recently parked worker and hand it the new task.
        --index;
        CThread_worker *thread = threadList[index];
        threadList[index] = NULL;
        thread->fun_cb = fun_cb;
        thread->arg = arg;
        thread->parent = this;

        // The worker holds its mutex until it is inside pthread_cond_wait(),
        // so locking here ensures the signal is not sent too early.
        pthread_mutex_lock(&thread->mutex);
        pthread_cond_signal(&thread->cond);
        pthread_mutex_unlock(&thread->mutex);
    }

    pthread_mutex_unlock(&mainMutex);
    return ret;
}

void* CThreadPool::wrapperFunc(void* arg)
{
CThread_worker *thread = (CThread_worker*)arg;
while(1)
{
if(thread->parent->shutdown == 0)
{
thread->fun_cb(thread->arg);
pthread_mutex_lock(&thread->mutex);
if(thread->parent->saveThread(thread) == 0)
{
pthread_cond_wait(&thread->cond, &thread->mutex);
pthread_mutex_unlock(&thread->mutex);
}
else
{
pthread_mutex_unlock(&thread->mutex);
pthread_mutex_destroy(&thread->mutex);
free(thread);
thread = NULL;
break;
}
}
else
break;
}

if(thread != NULL)
{
pthread_mutex_lock(&thread->parent->mainMutex);
--(thread->parent->total);
if(thread->parent->total <=0)
{
pthread_cond_signal(&thread->parent->EmptyCond);
}
pthread_mutex_unlock(&thread->parent->mainMutex);
}
}

// Parks a worker in the idle list.
//
// Called by the worker itself once its task has finished. Wakes one producer
// blocked in addTask(), and wakes the destructor when every live worker is
// idle.
//
// Returns 0 when the worker was stored and -1 when the list is full.
int CThreadPool::saveThread(CThread_worker* thread)
{
    int ret = -1;

    pthread_mutex_lock(&mainMutex);
    // threadList has exactly maxThreads slots, so the last valid position is
    // maxThreads - 1. The former "<=" allowed a write one past the end.
    if (index < maxThreads)
    {
        threadList[index] = thread;
        ++index;
        ret = 0;
        pthread_cond_signal(&IdleCond);

        if (index >= total)
        {
            pthread_cond_signal(&FullCond);
        }
    }
    pthread_mutex_unlock(&mainMutex);

    return ret;
}


TestPool.cpp

#include "CThreadPool.h"
#include <time.h>
// Demo task: prints the calling thread and the task label, then sleeps 3 s.
// arg must point to a NUL-terminated string. Always returns NULL.
void *myprocess (void *arg)
{
    // pthread_t is not guaranteed to be unsigned long; cast to match %lu.
    printf ("threadid is %lu, working on task(%s)\n",
            (unsigned long)pthread_self(), (char*)arg);
    sleep (3);
    // A non-void function must return a value; falling off the end is UB in C++.
    return NULL;
}

// Demo task: prints the calling thread and the task label, then sleeps 2 s.
// arg must point to a NUL-terminated string. Always returns NULL.
void *myprocess2 (void *arg)
{
    // pthread_t is not guaranteed to be unsigned long; cast to match %lu.
    printf ("threadid is %lu, working on task(%s)\n",
            (unsigned long)pthread_self(), (char*)arg);
    sleep (2);
    // A non-void function must return a value; falling off the end is UB in C++.
    return NULL;
}

int main(int argc, char *argv[])
{

CThreadPool* pool = CThreadPool::getInstance(10);

for(int i=0;i<5; ++i)
{
pool->addTask(myprocess, (void*)"process1");
}

for(int i=0; i<5; i++)
{
pool->addTask(myprocess2, (void*)"process2");
}

sleep(5);
return 0;
}


编译:

g++ -g -o testPool TestPool.cpp -lpthread

C语言实现(ThreadPool2.h):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>

/*
 * Every task handled by the pool, whether running or waiting, is described
 * by a CThread_worker. Pending tasks are kept in a singly linked list, hence
 * the next pointer.
 */
typedef struct worker
{
/* Callback invoked when the task runs; it could be declared with another shape. */
void *(*process) (void *arg);
void *arg;/* Argument passed to the callback. */
struct worker *next;

} CThread_worker;

/* Thread pool state. */
typedef struct
{
/* Locked around accesses to the task queue in thread_routine and pool_add_worker. */
pthread_mutex_t queue_lock;
/* Workers wait on this while the queue is empty; signalled when a task is added. */
pthread_cond_t queue_ready;

/* Linked list holding every task that is waiting to run. */
CThread_worker *queue_head;

/* Non-zero once the pool is being destroyed. */
int shutdown;
/* Array of worker thread ids, allocated in pool_init. */
pthread_t *threadid;
/* Number of worker threads the pool runs. */
int max_thread_num;
/* Number of tasks currently waiting in the queue. */
int cur_queue_size;

} CThread_pool;

/* Forward declarations: both functions are defined further down in this file. */
int pool_add_worker (void *(*process) (void *arg), void *arg);
void *thread_routine (void *arg);

/* The single pool shared by pool_init, thread_routine, pool_add_worker and
   pool_destroy; NULL before pool_init and again after pool_destroy. */
static CThread_pool *pool = NULL;
/*
 * Create the global pool and start its worker threads.
 *
 * max_thread_num is the number of workers to start; negative values are
 * treated as 0. On allocation failure the global pool pointer is left NULL.
 * If some threads cannot be created, the pool keeps the ones that did start
 * and max_thread_num is lowered to that count, so pool_destroy only joins
 * valid thread ids.
 */
void
pool_init (int max_thread_num)
{
  int i;

  if (max_thread_num < 0)
    max_thread_num = 0;

  pool = (CThread_pool *) malloc (sizeof (CThread_pool));
  if (pool == NULL)
    return;

  /* Always request at least one slot so that a NULL result means failure
     rather than a legal answer to a zero-byte request. */
  pool->threadid =
    (pthread_t *) malloc ((max_thread_num > 0 ? max_thread_num : 1)
                          * sizeof (pthread_t));
  if (pool->threadid == NULL)
    {
      free (pool);
      pool = NULL;
      return;
    }

  pthread_mutex_init (&(pool->queue_lock), NULL);
  pthread_cond_init (&(pool->queue_ready), NULL);

  pool->queue_head = NULL;
  pool->cur_queue_size = 0;
  pool->shutdown = 0;
  pool->max_thread_num = max_thread_num;

  /* The workers read the global pool pointer, which is already set above. */
  for (i = 0; i < max_thread_num; i++)
    {
      if (pthread_create (&(pool->threadid[i]), NULL, thread_routine,
                          NULL) != 0)
        {
          /* Keep only the threads that really exist. */
          pool->max_thread_num = i;
          break;
        }
    }
}

/*
 * Body of every worker thread: take tasks off the shared queue and run them
 * until the pool is shut down. arg is not used.
 */
void *
thread_routine (void *arg)
{
  printf ("starting thread 0x%x\n", (uint)pthread_self());
  for (;;)
    {
      CThread_worker *task;

      pthread_mutex_lock (&(pool->queue_lock));

      /* Sleep while there is nothing to do and the pool is still alive.
         pthread_cond_wait releases the lock while blocked and re-acquires
         it before returning. */
      while (pool->cur_queue_size == 0 && pool->shutdown == 0)
        {
          printf ("thread 0x%x is waiting\n", (uint)pthread_self ());
          pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
        }

      /* The pool is being destroyed: release the lock before leaving. */
      if (pool->shutdown)
        {
          pthread_mutex_unlock (&(pool->queue_lock));
          printf ("thread 0x%x will exit\n", (uint)pthread_self ());
          pthread_exit (NULL);
        }

      printf ("thread 0x%x is starting to work\n", (uint)pthread_self ());

      /* Sanity checks: at this point the queue cannot be empty. */
      assert (pool->cur_queue_size != 0);
      assert (pool->queue_head != NULL);

      /* Detach the first task from the list and shrink the queue count. */
      task = pool->queue_head;
      pool->queue_head = task->next;
      pool->cur_queue_size--;
      pthread_mutex_unlock (&(pool->queue_lock));

      /* Run the task outside the lock, then release its record. */
      (*(task->process)) (task->arg);
      free (task);
      task = NULL;
    }
  /* Not reachable: the loop above only ends through pthread_exit. */
  pthread_exit (NULL);
}

/*
 * Append a task to the pool's waiting queue and wake one sleeping worker.
 *
 * process is the callback to run and arg is passed to it unchanged.
 * Returns 0 on success. Returns -1 when the pool has not been initialised,
 * when process is NULL, or when the task record cannot be allocated.
 */
int
pool_add_worker (void *(*process) (void *arg), void *arg)
{
  CThread_worker *newworker;
  CThread_worker *member;

  /* A NULL pool would be dereferenced below; a NULL callback would crash
     the worker that eventually picks the task up. */
  if (pool == NULL || process == NULL)
    return -1;

  /* Build the new task record. */
  newworker = (CThread_worker *) malloc (sizeof (CThread_worker));
  if (newworker == NULL)
    return -1;
  newworker->process = process;
  newworker->arg = arg;
  newworker->next = NULL;/* the new task becomes the tail */

  pthread_mutex_lock (&(pool->queue_lock));
  /* Walk to the end of the list and link the task there. */
  member = pool->queue_head;
  if (member != NULL)
    {
      while (member->next != NULL)
        member = member->next;
      member->next = newworker;
    }
  else
    {
      pool->queue_head = newworker;
    }

  assert (pool->queue_head != NULL);

  pool->cur_queue_size++;
  pthread_mutex_unlock (&(pool->queue_lock));
  /* The queue now holds a task: wake one waiting worker. If every worker is
     busy this has no effect; a worker finds the task on its next pass. */
  pthread_cond_signal (&(pool->queue_ready));
  return 0;
}

/*
 * Destroy the pool. Tasks still waiting in the queue are discarded without
 * being run; tasks that are already running finish before their worker
 * exits.
 *
 * Returns 0 on success. Returns -1 when the pool does not exist or is
 * already being destroyed.
 */
int
pool_destroy (void)
{
  int i;
  CThread_worker *head = NULL;

  if (pool == NULL)
    return -1;

  /* Set the flag and broadcast while holding the queue lock. Workers test
     the flag under this lock just before they wait; without the lock a
     worker could test it, then miss the broadcast and sleep forever, which
     would make pthread_join below hang. */
  pthread_mutex_lock (&(pool->queue_lock));
  if (pool->shutdown)
    {
      pthread_mutex_unlock (&(pool->queue_lock));
      return -1;/* guard against a second call */
    }
  pool->shutdown = 1;

  /* Wake every waiting worker so it can see the flag and exit. */
  pthread_cond_broadcast (&(pool->queue_ready));
  pthread_mutex_unlock (&(pool->queue_lock));

  /* Wait for the workers to exit so no thread is left behind. */
  for (i = 0; i < pool->max_thread_num; i++)
    pthread_join (pool->threadid[i], NULL);
  free (pool->threadid);

  /* Release the tasks that never got to run. */
  while (pool->queue_head != NULL)
    {
      head = pool->queue_head;
      pool->queue_head = pool->queue_head->next;
      free (head);
    }
  /* Release the mutex and the condition variable as well. */
  pthread_mutex_destroy(&(pool->queue_lock));
  pthread_cond_destroy(&(pool->queue_ready));

  free (pool);
  /* Clear the pointer so later calls can detect the missing pool. */
  pool=NULL;
  return 0;
}


测试:

#include "ThreadPool2.h"

/*
 * Demo task: print the calling thread and the task number, then sleep one
 * second to make the task last longer. arg must point to an int.
 * Always returns NULL.
 */
void *
myprocess (void *arg)
{
  /* %x expects an unsigned int; pthread_t has to be converted explicitly. */
  printf ("threadid is 0x%x, working on task %d\n",
          (unsigned int) pthread_self (), *(int *) arg);
  sleep (1);
  return NULL;
}

/*
 * Demo driver: start a pool of three workers, queue ten numbered tasks,
 * give them five seconds to run and then tear the pool down.
 * Returns 0 on success and 1 when the task-number array cannot be allocated.
 */
int
main (int argc, char **argv)
{
  int *workingnum;
  int i;

  pool_init(3);/* three worker threads */

  /* Each task receives a pointer to its own slot, so the array must stay
     alive until the pool has been destroyed. */
  workingnum = (int *) malloc (sizeof (int) * 10);
  if (workingnum == NULL)
    {
      pool_destroy ();
      return 1;
    }

  /* Queue ten tasks back to back. */
  for (i = 0; i < 10; i++)
    {
      workingnum[i] = i;
      pool_add_worker (myprocess, &workingnum[i]);
    }

  /* Give the tasks time to finish. */
  sleep (5);

  /* Tear the pool down. */
  pool_destroy ();

  free (workingnum);
  return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: