线程池
2015-05-29 22:30
330 查看
一些定义
threadpoolmacro.h
线程池头文件
ThreadPool.h
ThreadPool.cpp
main.cpp
此线程池是根据一个大牛(牛人博客)的代码进行改编,改编成windows版。
此版本仍然没有实现动态的对线程进行扩容,下一个版本进行实现。
threadpoolmacro.h
#ifndef THREAD_POOL_MACRO_H_ #define THREAD_POOL_MACRO_H_ #define MAX_THREAD_NUM 256 #define MIN_THREAD_NUM 1 #include <process.h> #include <windows.h> typedef void (*threadpool_task_fun_)(void*arg); #endif//end of THTREAD_POOL_MACRO_H_
线程池头文件
ThreadPool.h
/************************************************************************/ /* File :threadPool.h Author: Mr.Luo(517832403@qq.com) Time:2015-05-29 */ /************************************************************************/ #ifndef THREAD_POOL_H_ #define THREAD_POOL_H_ #include "threadpoolmacro.h" class CThreadPool { public: //构造函数 CThreadPool(); //析构函数 ~CThreadPool(); private: int m_threadNum_;//总线程数,不包括管理线程 HANDLE m_Thread[MAX_THREAD_NUM+1];//空闲线程集合 void **m_TaskQueue;//任务队列传入的参数 threadpool_task_fun_ *m_TaskFun_;//任务队列中传入函数地址 int m_Start ;//队列首 int m_End;//队列尾 int m_MaxTaskNum ; //任务的最大值 HANDLE m_hMutex ;//线程同步互斥锁 HANDLE m_hEvent;//线程同步时间内核对象 public: //函数初始化线程池对象,成功返回TRUE //@threadNum:初始化时默认创建的线程数目 //@maxTaskNum:队列中最大缓存的任务数,如果任务超过这个值,那么久丢弃 BOOL ThreadPoolCreate(int threadNum,int maxTaskNum); //增加一个任务到当前线程池 //@task_fun:用户传入的回调函数,线程执行时将调用该函数 //@arg:为函数传入的参数 //return:成功返回TRUE,失败返回 FALSE BOOL ThreadPoolAddTask(threadpool_task_fun_ task_fun,void *arg); //停止线程池中所有的线程 //@return:成功返回TRUE,失败返回FALSE BOOL ThreadPoolStopAllTask(); //销毁线程池 //@return: 成功返回TRUE,失败返回FALSE BOOL ThreadPoolDestroy(); //执行线程工作 static DWORD WINAPI ThreadPoolExcuteTaskOne(void* arg); }; #endif线程池实现代码
ThreadPool.cpp
#include "ThreadPool.h" #include <assert.h> #include <iostream> /************************************************************************/ /* @functionName: CThreadPool @description:构造函数 */ /************************************************************************/ CThreadPool::CThreadPool() { m_hMutex = INVALID_HANDLE_VALUE ; m_hEvent = INVALID_HANDLE_VALUE; m_Start = 1 ;//对列头部 m_End = 1 ;//对列尾部 } /************************************************************************/ /* @functionName:~CThreadPool @description:析构函数 */ /************************************************************************/ CThreadPool::~CThreadPool() { if(m_hMutex != INVALID_HANDLE_VALUE) { CloseHandle(m_hMutex); m_hMutex = INVALID_HANDLE_VALUE ; } if(m_hEvent != INVALID_HANDLE_VALUE) { CloseHandle(m_hEvent); m_hEvent = INVALID_HANDLE_VALUE ; } } /************************************************************************/ /* @functionName:ThreadPoolCreate @description:创建线程池 @parameters: int threadNum :创建的线程数量 int maxTaskNum:最大任务数 */ /************************************************************************/ BOOL CThreadPool::ThreadPoolCreate(int threadNum,int maxTaskNum) { if(threadNum > MAX_THREAD_NUM || threadNum < 0) return FALSE ; if(threadNum < MIN_THREAD_NUM) threadNum = MIN_THREAD_NUM; m_hMutex = CreateMutex(NULL,FALSE,NULL);//互斥量对象 assert(m_hMutex != INVALID_HANDLE_VALUE); m_hEvent = CreateEvent(NULL,TRUE,FALSE,NULL); //手动重置事件时等待的线程都被唤醒,设置为自动重置事件时,每次只有一个等待的线程被唤醒 assert(m_hEvent!=INVALID_HANDLE_VALUE); m_threadNum_ = threadNum ;//线程总数 m_Thread[0] =(void *) threadNum ;//空闲线程数量 m_TaskQueue =(void **)malloc((maxTaskNum+1)*sizeof(void*));//分配 m_TaskQueue[0] = (void*)0 ; //此处很巧妙 m_TaskFun_ = (threadpool_task_fun_*)malloc((maxTaskNum+1)*sizeof(void*)); if(maxTaskNum <=0) { maxTaskNum = 10; //设置默认值 } m_MaxTaskNum = maxTaskNum ; int i = 0 ; HANDLE hThread = INVALID_HANDLE_VALUE; while(threadNum>0) { hThread = CreateThread(NULL,0,ThreadPoolExcuteTaskOne,(void*)this,0,NULL) ; if(hThread == INVALID_HANDLE_VALUE) { //停止所有的线程,释放分配的存储空间 ThreadPoolDestroy() ; return FALSE ; } else { m_Thread[++i] = hThread ; --threadNum; hThread = INVALID_HANDLE_VALUE ; } } std::cout<<"create is success "<<std::endl; return TRUE ; } /************************************************************************/ /* @functionNanem: ThreadPoolAddTask @decription:添加处理任务 @parameter @task_fun: 任务处理函数,回调函数 @arg:回调函数的参数,此参数可以是任何类型的指针 */ /************************************************************************/ BOOL CThreadPool::ThreadPoolAddTask(threadpool_task_fun_ task_fun,void *arg) { std::cout<<"AddTask "<<std::endl; WaitForSingleObject(m_hMutex,INFINITE); std::cout<<" m_hMutex"<<std::endl; int taskNum =(int) m_TaskQueue[0] ; if(taskNum > m_MaxTaskNum-2) //1-(m_MaxTaskNum-1)之间 { ReleaseMutex(m_hMutex); std::cout<<"Throw the task"<<std::endl; return FALSE ; } //添加回调函数的地址和参数 ++taskNum ; m_TaskQueue[0] = (void*)taskNum ; m_TaskQueue[m_End] = arg ; m_TaskFun_[m_End] = task_fun ; if(++m_End > m_MaxTaskNum) { m_End =1 ; } std::cout<<"Add Task Success "<<std::endl; ReleaseMutex(m_hMutex);//释放互斥量内核对象 SetEvent(m_hEvent); return TRUE; } /************************************************************************/ /* @functionName:ThreadPoolStopAllTask @description:停止所有的线程 */ /************************************************************************/ BOOL CThreadPool::ThreadPoolStopAllTask() { for(int i =1;i<=(int)m_Thread[0];++i) { TerminateThread(m_Thread[i],0); //终止线程 WaitForSingleObject(m_Thread[i],INFINITE); //等待线程终止 } m_Thread[0] = 0 ; return TRUE ; } /* 线程池销毁 */ BOOL CThreadPool::ThreadPoolDestroy() { free(m_TaskFun_); free(m_TaskQueue); ThreadPoolStopAllTask() ; ReleaseMutex(m_hMutex); SetEvent(m_hEvent); return TRUE ; } /* @functionName:ThreadPoolExcuteTaskOne @description:工作线程 @arg :传递线程池对象, */ DWORD CThreadPool::ThreadPoolExcuteTaskOne(void* arg) { CThreadPool *tpool = (CThreadPool*)arg ; BOOL bExcute = FALSE; void *tempArg = NULL ; threadpool_task_fun_ taskFun ; int TaskNum = 0 ; while(TRUE) { WaitForSingleObject(tpool->m_hMutex,INFINITE); //如果队列中存在任务 if((TaskNum = (int)tpool->m_TaskQueue[0])>0) //如果有任务有待处理 { tempArg = tpool->m_TaskQueue[tpool->m_Start] ; //取当前处理任务的参数 taskFun = tpool->m_TaskFun_[tpool->m_Start] ;//当前处理任务的函数 if(++tpool->m_Start > tpool->m_MaxTaskNum)//队列中任务数量调整 tpool->m_Start = 1 ;//队列 --TaskNum ;//任务数量减1 tpool->m_TaskQueue[0] =(void*) TaskNum ;//刷新任务数量 bExcute = TRUE;//执行调度函数 std::cout<<"ThreadId: "<<GetCurrentThreadId()<<" finished "<<" task! "<<std::endl; } else//如果队列中不存在任务 { ReleaseMutex(tpool->m_hMutex); //一定要先释放互斥量内核对象 WaitForSingleObject(tpool->m_hEvent,INFINITE); //等待任务的到来 ResetEvent(tpool->m_hEvent); continue; } ReleaseMutex(tpool->m_hMutex); //处理任务 if(bExcute) { taskFun(tempArg); bExcute = FALSE; } } return TRUE ; }测试代码
main.cpp
#include "ThreadPool.h" #include <iostream> void TaskFun(void *arg) { long id = (long)arg; for(int i = 0 ;i<5;++i) { std::cout<<"thread "<<id<<"("<<i<<")"<<std::endl; Sleep(1); } } int main() { int i=0; CThreadPool pool ; pool.ThreadPoolCreate(3,10); //添加任务 for(i = 0 ;i<20;++i) { if(pool.ThreadPoolAddTask(TaskFun,(void *)i)==FALSE) { std::cout<<"threadPoolAddTask is error"<<std::endl; } } Sleep(1000); pool.ThreadPoolStopAllTask() ; pool.ThreadPoolDestroy() ; std::cout<<"run out"<<std::endl; system("pause"); return 0 ; }
此线程池是根据一个大牛(牛人博客)的代码进行改编,改编成windows版。
此版本仍然没有实现动态的对线程进行扩容,下一个版本进行实现。
相关文章推荐
- TestNG超详细教程
- ngx_http_concat_module
- 五步学会Android的ListView控件
- hdu 2093 考试排名(结构体排序)
- 傻瓜的SPI的通信
- openstack ecosystem
- 五月学习感悟
- 大盘暴跌是赚钱的什么好机会?
- 疯狂Java学习笔记(57)------------NIO:浅析I/O模型
- 疯狂Java学习笔记(58)-----------NIO概述
- 虚拟化基础知识
- awk命令详解
- 将github上的工程导入到Myeclipse中
- oracle 优化器之执行计划
- 第五次作业
- selenium简单操作浏览器
- 天敏T2四核增强版ROOT方法
- 友情链接的添加保存
- [EF] 如何在 Entity Framework 中以手动方式设定 Code First 的 Migration 作业
- Photoshop 与网站设计