您的位置:首页 > 其它

线程池

2015-05-29 22:30 330 查看
一些定义

threadpoolmacro.h

#ifndef THREAD_POOL_MACRO_H_
#define THREAD_POOL_MACRO_H_

#define MAX_THREAD_NUM	256
#define MIN_THREAD_NUM 1

#include <process.h>
#include <windows.h>
typedef void (*threadpool_task_fun_)(void*arg);
#endif//end of THTREAD_POOL_MACRO_H_


线程池头文件

ThreadPool.h
/************************************************************************/
/*
File :threadPool.h
Author: Mr.Luo(517832403@qq.com)
Time:2015-05-29
*/
/************************************************************************/
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include "threadpoolmacro.h"

class CThreadPool
{
public:
//构造函数
CThreadPool();

//析构函数
~CThreadPool();
private:
int m_threadNum_;//总线程数,不包括管理线程
HANDLE m_Thread[MAX_THREAD_NUM+1];//空闲线程集合
void **m_TaskQueue;//任务队列传入的参数
threadpool_task_fun_ *m_TaskFun_;//任务队列中传入函数地址
int m_Start ;//队列首
int m_End;//队列尾
int m_MaxTaskNum ; //任务的最大值
HANDLE m_hMutex ;//线程同步互斥锁
HANDLE m_hEvent;//线程同步时间内核对象
public:
//函数初始化线程池对象,成功返回TRUE
//@threadNum:初始化时默认创建的线程数目
//@maxTaskNum:队列中最大缓存的任务数,如果任务超过这个值,那么久丢弃
BOOL ThreadPoolCreate(int threadNum,int maxTaskNum);
//增加一个任务到当前线程池
//@task_fun:用户传入的回调函数,线程执行时将调用该函数
//@arg:为函数传入的参数
//return:成功返回TRUE,失败返回 FALSE
BOOL ThreadPoolAddTask(threadpool_task_fun_ task_fun,void *arg);
//停止线程池中所有的线程
//@return:成功返回TRUE,失败返回FALSE
BOOL ThreadPoolStopAllTask();
//销毁线程池
//@return: 成功返回TRUE,失败返回FALSE
BOOL ThreadPoolDestroy();
//执行线程工作
static DWORD WINAPI ThreadPoolExcuteTaskOne(void* arg);
};
#endif
线程池实现代码

ThreadPool.cpp

#include "ThreadPool.h"
#include <assert.h>
#include <iostream>
/************************************************************************/
/*
@functionName: CThreadPool
@description:构造函数
*/
/************************************************************************/
CThreadPool::CThreadPool()
{
m_hMutex = INVALID_HANDLE_VALUE ;
m_hEvent = INVALID_HANDLE_VALUE;
m_Start  = 1 ;//对列头部
m_End = 1 ;//对列尾部

}
/************************************************************************/
/*
@functionName:~CThreadPool
@description:析构函数
*/
/************************************************************************/
CThreadPool::~CThreadPool()
{
if(m_hMutex != INVALID_HANDLE_VALUE)
{
CloseHandle(m_hMutex);
m_hMutex = INVALID_HANDLE_VALUE ;
}
if(m_hEvent != INVALID_HANDLE_VALUE)
{
CloseHandle(m_hEvent);
m_hEvent = INVALID_HANDLE_VALUE ;
}
}
/************************************************************************/
/*
@functionName:ThreadPoolCreate
@description:创建线程池
@parameters:
int threadNum :创建的线程数量
int maxTaskNum:最大任务数
*/
/************************************************************************/
BOOL CThreadPool::ThreadPoolCreate(int threadNum,int maxTaskNum)
{
if(threadNum > MAX_THREAD_NUM || threadNum < 0)
return FALSE ;
if(threadNum < MIN_THREAD_NUM)
threadNum = MIN_THREAD_NUM;
m_hMutex  = CreateMutex(NULL,FALSE,NULL);//互斥量对象
assert(m_hMutex != INVALID_HANDLE_VALUE);

m_hEvent = CreateEvent(NULL,TRUE,FALSE,NULL); //手动重置事件时等待的线程都被唤醒,设置为自动重置事件时,每次只有一个等待的线程被唤醒

assert(m_hEvent!=INVALID_HANDLE_VALUE);

m_threadNum_ = threadNum ;//线程总数
m_Thread[0] =(void *) threadNum ;//空闲线程数量

m_TaskQueue =(void **)malloc((maxTaskNum+1)*sizeof(void*));//分配
m_TaskQueue[0] = (void*)0 ; //此处很巧妙
m_TaskFun_ = (threadpool_task_fun_*)malloc((maxTaskNum+1)*sizeof(void*));
if(maxTaskNum <=0)
{
maxTaskNum = 10; //设置默认值
}
m_MaxTaskNum = maxTaskNum ;
int i = 0 ;
HANDLE hThread = INVALID_HANDLE_VALUE;
while(threadNum>0)
{
hThread = CreateThread(NULL,0,ThreadPoolExcuteTaskOne,(void*)this,0,NULL) ;
if(hThread == INVALID_HANDLE_VALUE)
{
//停止所有的线程,释放分配的存储空间
ThreadPoolDestroy() ;
return FALSE ;
}
else
{
m_Thread[++i] = hThread ;
--threadNum;
hThread = INVALID_HANDLE_VALUE ;
}
}
std::cout<<"create is success "<<std::endl;
return TRUE ;
}
/************************************************************************/
/*
@functionNanem: ThreadPoolAddTask
@decription:添加处理任务
@parameter
@task_fun: 任务处理函数,回调函数
@arg:回调函数的参数,此参数可以是任何类型的指针
*/
/************************************************************************/
BOOL CThreadPool::ThreadPoolAddTask(threadpool_task_fun_ task_fun,void *arg)
{
std::cout<<"AddTask "<<std::endl;
WaitForSingleObject(m_hMutex,INFINITE);
std::cout<<" m_hMutex"<<std::endl;
int taskNum =(int) m_TaskQueue[0] ;
if(taskNum > m_MaxTaskNum-2) //1-(m_MaxTaskNum-1)之间
{
ReleaseMutex(m_hMutex);
std::cout<<"Throw the task"<<std::endl;
return FALSE ;
}
//添加回调函数的地址和参数
++taskNum ;
m_TaskQueue[0] = (void*)taskNum ;
m_TaskQueue[m_End] = arg ;
m_TaskFun_[m_End]  = task_fun ;

if(++m_End > m_MaxTaskNum)
{
m_End =1 ;
}
std::cout<<"Add Task Success "<<std::endl;
ReleaseMutex(m_hMutex);//释放互斥量内核对象
SetEvent(m_hEvent);

return TRUE;
}
/************************************************************************/
/*
@functionName:ThreadPoolStopAllTask
@description:停止所有的线程
*/
/************************************************************************/
BOOL CThreadPool::ThreadPoolStopAllTask()
{
for(int i =1;i<=(int)m_Thread[0];++i)
{
TerminateThread(m_Thread[i],0); //终止线程
WaitForSingleObject(m_Thread[i],INFINITE); //等待线程终止
}
m_Thread[0] = 0 ;
return TRUE ;
}
/*
线程池销毁
*/
BOOL CThreadPool::ThreadPoolDestroy()
{
free(m_TaskFun_);
free(m_TaskQueue);
ThreadPoolStopAllTask() ;
ReleaseMutex(m_hMutex);
SetEvent(m_hEvent);
return TRUE ;
}
/*
@functionName:ThreadPoolExcuteTaskOne
@description:工作线程
@arg :传递线程池对象,
*/
DWORD  CThreadPool::ThreadPoolExcuteTaskOne(void* arg)
{
CThreadPool *tpool = (CThreadPool*)arg ;
BOOL bExcute = FALSE;
void *tempArg = NULL ;
threadpool_task_fun_ taskFun ;
int TaskNum = 0 ;
while(TRUE)
{
WaitForSingleObject(tpool->m_hMutex,INFINITE);
//如果队列中存在任务
if((TaskNum = (int)tpool->m_TaskQueue[0])>0) //如果有任务有待处理
{
tempArg = tpool->m_TaskQueue[tpool->m_Start] ; //取当前处理任务的参数
taskFun = tpool->m_TaskFun_[tpool->m_Start] ;//当前处理任务的函数
if(++tpool->m_Start > tpool->m_MaxTaskNum)//队列中任务数量调整
tpool->m_Start = 1 ;//队列
--TaskNum ;//任务数量减1
tpool->m_TaskQueue[0] =(void*) TaskNum ;//刷新任务数量
bExcute = TRUE;//执行调度函数
std::cout<<"ThreadId: "<<GetCurrentThreadId()<<"    finished "<<" task! "<<std::endl;
}
else//如果队列中不存在任务
{
ReleaseMutex(tpool->m_hMutex); //一定要先释放互斥量内核对象
WaitForSingleObject(tpool->m_hEvent,INFINITE); //等待任务的到来
ResetEvent(tpool->m_hEvent);
continue;
}
ReleaseMutex(tpool->m_hMutex);
//处理任务
if(bExcute)
{
taskFun(tempArg);
bExcute = FALSE;
}
}
return TRUE ;
}
测试代码

main.cpp

#include "ThreadPool.h"
#include <iostream>

void TaskFun(void *arg)
{
long id = (long)arg;
for(int i = 0 ;i<5;++i)
{
std::cout<<"thread "<<id<<"("<<i<<")"<<std::endl;
Sleep(1);
}
}
int main()
{
int i=0;
CThreadPool pool ;
pool.ThreadPoolCreate(3,10);
//添加任务
for(i = 0 ;i<20;++i)
{
if(pool.ThreadPoolAddTask(TaskFun,(void *)i)==FALSE)
{
std::cout<<"threadPoolAddTask is error"<<std::endl;
}
}
Sleep(1000);
pool.ThreadPoolStopAllTask() ;
pool.ThreadPoolDestroy() ;
std::cout<<"run out"<<std::endl;
system("pause");
return 0 ;
}


此线程池是根据一个大牛(牛人博客)的代码进行改编,改编成windows版。

此版本仍然没有实现动态的对线程进行扩容,下一个版本进行实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: