线程池相关源码(网上搜得)
2012-02-01 10:01
183 查看
//ThreadPool.h 线程池头文件 #ifndef OTA_THREADPOOL_H_CAESAR__DEF #define OTA_THREADPOOL_H_CAESAR__DEF #include <windows.h> class CWorkDesc { public: CWorkDesc(){} virtual ~CWorkDesc(){} }; class CWork { public: CWork(){} virtual ~CWork(){} virtual void ProcessJob(CWorkDesc* pJob)=0; }; class CThreadPool { friend static unsigned __stdcall ManagerProc(void* pThread); friend static unsigned __stdcall WorkerProc(void* pThread); typedef struct THREADINFO { THREADINFO() { hThreadHandle = NULL; dwThreadID = 0; bIsBusy = false; bIsQuit = false; } HANDLE hThreadHandle; unsigned dwThreadID; volatile bool bIsBusy; volatile bool bIsQuit; }*LPTHREADINFO; public: CThreadPool(); virtual ~CThreadPool(); enum EReturnValue { MANAGERPROC_RETURN_VALUE = 10001, WORKERPROC_RETURN_VALUE = 10002 }; enum EThreadStatus { BUSY, NORMAL, IDLE }; bool Start(WORD wStatic,WORD wMax); void Stop(void); void ProcessJob(CWorkDesc* pJob, CWork* pWorker) const; protected: static unsigned __stdcall ManagerProc(void* pThread); static unsigned __stdcall WorkerProc(void* pThread); LPTHREADINFO m_pThreadInfo; volatile WORD m_wStaticThreadNum; volatile WORD m_wMaxThreadNum; volatile bool m_bQuitManager; DWORD m_dwMSeconds; HANDLE m_hManagerIO; HANDLE m_hWorkerIO; HANDLE m_hManagerThread; CRITICAL_SECTION m_csLock; private: CThreadPool::EThreadStatus GetWorkThreadStatus(); void AddThread(void); void DelThread(void); int GetThreadbyID(DWORD dwID); }; #endif
//ThreadPool.cpp
#include "StdAfx.h"
#include "ThreadPool.h"
#include "process.h"
#include <stdio.h>
#include <new>

namespace
{
    // Packet value that tells the manager thread to leave its loop.
    OVERLAPPED* const g_pManagerQuitPacket =
        reinterpret_cast<OVERLAPPED*>(static_cast<ULONG_PTR>(0xFFFFFFFF));

    // How long Stop() waits for a thread to exit on its own before it falls
    // back to TerminateThread.
    const DWORD MANAGER_STOP_TIMEOUT_MS = 10000;
    const DWORD WORKER_STOP_TIMEOUT_MS = 5000;

    // Starts a thread and stores its handle and id in the caller's variables
    // BEFORE the thread executes its first instruction. The thread is created
    // suspended and resumed only after both values are stored, so a thread
    // procedure that looks itself up by id can never miss its own entry.
    // Returns false (outputs untouched) when the thread could not be created.
    bool LaunchPublishedThread(unsigned (__stdcall *pfnProc)(void*), void* pArg,
                               HANDLE& hThread, unsigned& dwThreadID)
    {
        unsigned dwNewID = 0;
        HANDLE hNew = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL, 0, pfnProc, pArg, CREATE_SUSPENDED, &dwNewID));
        if (hNew == NULL)
        {
            return false;
        }
        hThread = hNew;
        dwThreadID = dwNewID;
        ::ResumeThread(hNew);
        return true;
    }
}

// Puts the pool into the "not started" state. Every member is initialized so
// that Stop() and the destructor are safe even if Start() is never called.
CThreadPool::CThreadPool()
    : m_pThreadInfo(NULL)
    , m_wStaticThreadNum(0)
    , m_wMaxThreadNum(0)
    , m_bQuitManager(false)
    , m_dwMSeconds(200)
    , m_hManagerIO(NULL)
    , m_hWorkerIO(NULL)
    , m_hManagerThread(NULL)
{
    // Initialize the lock that serializes Start() and Stop().
    ::InitializeCriticalSection(&m_csLock);
}

// Shuts the pool down if it is still running, then destroys the lock.
CThreadPool::~CThreadPool()
{
    // Stop() is a no-op on an already stopped pool. Calling it here makes
    // sure no thread is still using the slot array or the ports when they
    // are released.
    Stop();

    ::DeleteCriticalSection(&m_csLock);
}

// Starts the pool.
//   wStatic - number of workers created immediately and never retired.
//   wMax    - total number of worker slots (upper bound for the pool size).
// Returns false when an argument is zero, when wStatic exceeds wMax, or when
// a resource (memory, completion port, thread) could not be created; in the
// failure case everything created so far is released again.
// Calling Start() on a running pool stops the previous run first.
bool CThreadPool::Start(WORD wStatic, WORD wMax)
{
    // wStatic > wMax would index past the end of the slot array below.
    if (wStatic == 0 || wMax == 0 || wStatic > wMax)
    {
        return false;
    }

    ::EnterCriticalSection(&m_csLock);

    // Release whatever a previous run left behind (critical sections are
    // recursive, so calling Stop() with the lock held is fine).
    Stop();

    bool bStarted = false;

    // Worker slot table. nothrow: an exception must not escape while the
    // lock is held.
    m_pThreadInfo = new (std::nothrow) THREADINFO[wMax]();
    if (m_pThreadInfo != NULL)
    {
        m_wStaticThreadNum = wStatic;
        m_wMaxThreadNum = wMax;

        // Completion ports that are not associated with any file handle.
        m_hManagerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        m_hWorkerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        bStarted = (m_hManagerIO != NULL && m_hWorkerIO != NULL);
    }

    // Permanent workers.
    for (WORD i = 0; bStarted && i < wStatic; ++i)
    {
        bStarted = LaunchPublishedThread(WorkerProc, this,
                                         m_pThreadInfo[i].hThreadHandle,
                                         m_pThreadInfo[i].dwThreadID);
    }

    // Manager thread; started last so it only ever sees a populated table.
    if (bStarted)
    {
        m_bQuitManager = false;
        unsigned uManagerThreadID = 0;
        m_hManagerThread = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL, 0, ManagerProc, this, 0, &uManagerThreadID));
        bStarted = (m_hManagerThread != NULL);
    }

    if (!bStarted)
    {
        // Roll back a partial start.
        Stop();
    }

    ::LeaveCriticalSection(&m_csLock);
    return bStarted;
}

// Stops the manager thread and every worker, deletes all jobs that are still
// queued, and releases ports, handles and the slot table.
// Safe to call repeatedly and on a pool that was never started.
void CThreadPool::Stop(void)
{
    ::EnterCriticalSection(&m_csLock);

    // 1. Manager thread. It must be gone before the worker slots are touched
    //    because it is the only other thread that modifies them.
    m_bQuitManager = true;
    if (m_hManagerThread != NULL)
    {
        if (m_hManagerIO != NULL)
        {
            // Wake the manager right away instead of waiting for its timeout.
            ::PostQueuedCompletionStatus(m_hManagerIO, 0, 0, g_pManagerQuitPacket);
        }
        if (::WaitForSingleObject(m_hManagerThread, MANAGER_STOP_TIMEOUT_MS) != WAIT_OBJECT_0)
        {
            // Last resort, as in the original design.
            ::TerminateThread(m_hManagerThread, 0);
        }
        ::CloseHandle(m_hManagerThread);
        m_hManagerThread = NULL;
    }

    // 2. Worker threads.
    if (m_pThreadInfo != NULL)
    {
        // Flag all of them first so they wind down in parallel.
        for (WORD i = 0; i < m_wMaxThreadNum; ++i)
        {
            m_pThreadInfo[i].bIsQuit = true;
        }
        for (WORD i = 0; i < m_wMaxThreadNum; ++i)
        {
            HANDLE hWorker = m_pThreadInfo[i].hThreadHandle;
            if (hWorker == NULL)
            {
                continue;   // slot was never used
            }
            if (::WaitForSingleObject(hWorker, WORKER_STOP_TIMEOUT_MS) != WAIT_OBJECT_0)
            {
                ::TerminateThread(hWorker, 0);
            }
            // Handles returned by _beginthreadex must be closed by the caller.
            ::CloseHandle(hWorker);
            m_pThreadInfo[i].hThreadHandle = NULL;
            m_pThreadInfo[i].dwThreadID = 0;
        }
    }

    // 3. Delete jobs nobody will process any more. This has to happen while
    //    the port is still open.
    if (m_hWorkerIO != NULL)
    {
        DWORD dwBytes = 0;
        ULONG_PTR ulKey = 0;
        OVERLAPPED* pPacket = NULL;
        while (::GetQueuedCompletionStatus(m_hWorkerIO, &dwBytes, &ulKey, &pPacket, 0))
        {
            delete reinterpret_cast<CWorkDesc*>(pPacket);
            pPacket = NULL;
        }
        ::CloseHandle(m_hWorkerIO);
        m_hWorkerIO = NULL;
    }

    // 4. Manager port and slot table.
    if (m_hManagerIO != NULL)
    {
        ::CloseHandle(m_hManagerIO);
        m_hManagerIO = NULL;
    }
    delete [] m_pThreadInfo;
    m_pThreadInfo = NULL;
    m_wStaticThreadNum = 0;
    m_wMaxThreadNum = 0;

    ::LeaveCriticalSection(&m_csLock);
}

// Queues pJob for execution by pWorker->ProcessJob on a worker thread.
// The pool owns pJob from this call on: it is deleted after processing, or
// immediately when the job cannot be queued (pool not started, pWorker NULL,
// or the post fails).
//
// Packet layout: completion key = CWork*, overlapped pointer = CWorkDesc*.
// Both fields are pointer-sized, unlike the DWORD byte count, so the
// pointers survive on 64-bit builds.
void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const
{
    const bool bQueued =
        pWorker != NULL &&
        m_hWorkerIO != NULL &&
        ::PostQueuedCompletionStatus(m_hWorkerIO,
                                     0,
                                     reinterpret_cast<ULONG_PTR>(pWorker),
                                     reinterpret_cast<OVERLAPPED*>(pJob)) != FALSE;
    if (!bQueued)
    {
        delete pJob;
    }
}

// Manager thread: wakes up every m_dwMSeconds, classifies the load and adds
// or retires one dynamic worker accordingly. Exits when m_bQuitManager is
// set, when the quit packet arrives, or when its port becomes unusable.
unsigned __stdcall CThreadPool::ManagerProc(void* pThread)
{
    CThreadPool* pServer = static_cast<CThreadPool*>(pThread);

    while (!pServer->m_bQuitManager)
    {
        DWORD dwBytes = 0;
        ULONG_PTR ulKey = 0;
        OVERLAPPED* pOverLapped = NULL;

        if (::GetQueuedCompletionStatus(pServer->m_hManagerIO, &dwBytes, &ulKey,
                                        &pOverLapped, pServer->m_dwMSeconds))
        {
            if (pOverLapped == g_pManagerQuitPacket)
            {
                // Quit packet received.
                break;
            }
        }
        else if (::GetLastError() == WAIT_TIMEOUT)
        {
            // Timed out: time to look at the workers.
            EThreadStatus stat = pServer->GetWorkThreadStatus();
            if (stat == CThreadPool::BUSY)
            {
                puts("Add thread");
                pServer->AddThread();
            }
            else if (stat == CThreadPool::IDLE)
            {
                puts("Del thread");
                pServer->DelThread();
            }
        }
        else
        {
            // The port is closed or invalid; retrying would only spin.
            break;
        }
    }

    // Returning from the thread procedure ends the thread with this exit
    // code; an explicit _endthreadex call is not needed.
    return CThreadPool::MANAGERPROC_RETURN_VALUE;
}

// Worker thread: pulls jobs from the worker port and runs them until its
// slot's bIsQuit flag is set. The flag is re-checked after every job and
// after every wait timeout (m_dwMSeconds).
unsigned __stdcall CThreadPool::WorkerProc(void* pThread)
{
    CThreadPool* pServer = static_cast<CThreadPool*>(pThread);

    // Find this thread's slot. The creator stores the id before resuming the
    // thread, so the lookup only fails if the pool is misused.
    const int nSeq = pServer->GetThreadbyID(::GetCurrentThreadId());
    if (nSeq < 0)
    {
        return 0;
    }
    THREADINFO& slot = pServer->m_pThreadInfo[nSeq];

    while (!slot.bIsQuit)
    {
        DWORD dwBytes = 0;
        ULONG_PTR ulKey = 0;
        OVERLAPPED* pPacket = NULL;

        if (::GetQueuedCompletionStatus(pServer->m_hWorkerIO, &dwBytes, &ulKey,
                                        &pPacket, pServer->m_dwMSeconds))
        {
            CWork* pWork = reinterpret_cast<CWork*>(ulKey);
            CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pPacket);
            printf("do work\n");

            // Mark the slot busy for the duration of the job so the manager
            // can measure the load.
            slot.bIsBusy = true;
            if (pWork != NULL)
            {
                pWork->ProcessJob(pWorkDesc);
            }
            delete pWorkDesc;
            slot.bIsBusy = false;
            printf("do work over\n");
        }
    }

    printf("worker thread down\n");

    // Mark the slot as free. Nothing may touch the slot after this line: the
    // manager is allowed to hand it to a new worker immediately.
    slot.dwThreadID = 0;
    return CThreadPool::WORKERPROC_RETURN_VALUE;
}

// Classifies the current load from the share of live workers that are busy:
// more than 80% -> BUSY, fewer than 20% (or no live worker) -> IDLE,
// otherwise NORMAL.
CThreadPool::EThreadStatus CThreadPool::GetWorkThreadStatus()
{
    DWORD dwAlive = 0;
    DWORD dwBusy = 0;
    for (WORD wi = 0; wi < m_wMaxThreadNum; ++wi)
    {
        if (m_pThreadInfo[wi].dwThreadID)
        {
            ++dwAlive;
            if (m_pThreadInfo[wi].bIsBusy)
            {
                ++dwBusy;
            }
        }
    }

    if (dwAlive == 0)
    {
        return CThreadPool::IDLE;
    }
    // Integer form of busy/alive > 0.8 and busy/alive < 0.2.
    if (dwBusy * 10 > dwAlive * 8)
    {
        return CThreadPool::BUSY;
    }
    if (dwBusy * 10 < dwAlive * 2)
    {
        return CThreadPool::IDLE;
    }
    return CThreadPool::NORMAL;
}

// Starts one additional worker in the first free dynamic slot (index
// m_wStaticThreadNum and above). Does nothing when all slots are in use.
void CThreadPool::AddThread(void)
{
    for (WORD wi = m_wStaticThreadNum; wi < m_wMaxThreadNum; ++wi)
    {
        THREADINFO& slot = m_pThreadInfo[wi];
        if (slot.dwThreadID)
        {
            continue;   // slot holds a live worker
        }

        // A worker that lived here before has zeroed its id on exit but its
        // handle is still open; close it before the slot is reused.
        if (slot.hThreadHandle != NULL)
        {
            ::CloseHandle(slot.hThreadHandle);
            slot.hThreadHandle = NULL;
        }

        slot.bIsBusy = false;
        slot.bIsQuit = false;
        LaunchPublishedThread(WorkerProc, this, slot.hThreadHandle, slot.dwThreadID);
        break;
    }
}

// Asks the live dynamic worker with the highest slot index to quit and then
// waits one polling interval to give it time to notice. Permanent workers
// (slots below m_wStaticThreadNum) are never retired.
void CThreadPool::DelThread(void)
{
    for (WORD wi = m_wMaxThreadNum; wi > m_wStaticThreadNum; --wi)
    {
        if (m_pThreadInfo[wi-1].dwThreadID)
        {
            m_pThreadInfo[wi-1].bIsQuit = true;
            ::Sleep(m_dwMSeconds);
            break;
        }
    }
}

// Returns the index of the slot whose thread id equals dwID, or -1 when
// there is no such slot (or the pool is not started).
int CThreadPool::GetThreadbyID(DWORD dwID)
{
    if (m_pThreadInfo == NULL)
    {
        return -1;
    }
    for (WORD wi = 0; wi < m_wMaxThreadNum; wi++)
    {
        if (m_pThreadInfo[wi].dwThreadID == dwID)
        {
            return wi;
        }
    }
    return -1;
}
相关文章推荐
- 多线程进阶与源码分析--线程池相关(一)
- Deep Learning(深度学习)-----我整理了网上与此相关的资料
- NetShopForge网上商店程序(VB)源码—讨论-发布
- Java 线程池源码
- [Hadoop源码解读](四)MapReduce篇之Counter相关类
- 从网上下载文件(源码)
- 基于TCP网络通信的自动升级程序源码分析--生成升级文件相关的配置文件
- SEO的HTML布局规范 (收集网上关于SEO的相关信息)
- NGINX----源码阅读---数据结构---配置相关
- Okhttp源码解析(一)——Http相关知识
- 一个聊天室程序的服务器端,有相关说明与源码!!!
- 动态规划相关问题源码(包括矩阵链乘、LCS、和max sum)
- Redis2.2.2源码学习——Server&Client链接的建立以及相关Event
- 编译curl 源码包出现"TESTFAIL: These test cases failed: 20 172 507 "等相关的错误
- Androidstudio-framwork开发MTK平台,导入关联源码及AS一些相关设置
- springmvc基本配置及相关源码解读
- 超详细的java线程池源码解读
- HashMap相关源码阅读
- JDK中多线程之JUC线程池的JDK源码解读配合大神的一起看,秒懂。
- 计算机视觉相关的部分测试数据集和源码站点