一个简单的线程池设计模型
2010-06-04 15:14
471 查看
#pragma once class CXThreadJob { public: CXThreadJob(void); ~CXThreadJob(void); public: /* 用户对象纯虚函数 */ virtual void DoJob(void* pParam) = 0; };
#pragma once #include <queue> #include "XThreadJob.h" class CXThreadPool { public: /* CXThreadPool(void); ~CXThreadPool(void); */ /* 内部数据类型 */ private: /* 任务结构 */ struct stJobItem { /* 任务执行体(函数)*/ void (*m_pFunc)(void* pParam); /* 函数参数 */ void* m_pParam; stJobItem(void (*pFunc)(void* pParam0) = NULL, void* pParam = NULL) : m_pFunc(pFunc), m_pParam(pParam) { } }; /* 线程结构 */ struct stThreadItem { /* 线程句柄 */ HANDLE m_hThread; /* 指向所在的线程池 */ CXThreadPool* m_pThreadPool; /* 最后一次开始运行时间 */ DWORD m_dwLastBeginTime; /* 运行次数(做过多少任务)*/ DWORD m_dwRunCount; /* 线程是否在运行中(处理任务中)*/ BOOL m_bRunning; stThreadItem(CXThreadPool* pThreadPool) : m_hThread(NULL), m_pThreadPool(pThreadPool), m_dwLastBeginTime(0), m_dwRunCount(0), m_bRunning(FALSE) { } ~stThreadItem() { if (m_hThread != NULL) { CloseHandle(m_hThread); m_hThread = NULL; } } }; /* 用户对象辅助设施 */ static void CallProc(void *pPara) { CallProcPara *cp = static_cast<CallProcPara *>(pPara); if(cp != NULL) { cp->m_pStObj->DoJob(cp->m_pPara); delete cp; cp = NULL; } } struct CallProcPara { CXThreadJob* m_pStObj; void * m_pPara; CallProcPara(CXThreadJob* p, void *pPara) : m_pStObj(p), m_pPara(pPara) { }; }; /* 内部数据 */ private: /* 线程数 */ long m_lThreadNum; /* 正在运行中的线程数 */ long m_lRunning; /* 任务结构队列 */ std::queue<stJobItem*> m_qJobQueue; /* 线程结构集合 */ std::vector<stThreadItem*> m_vecThreadItem; /* 任务队列保护区 */ CRITICAL_SECTION m_csWorkQueue; /* 线程结构集合保护区 */ CRITICAL_SECTION m_csThreadVec; /* 通知所有线程结束 */ HANDLE m_hEventEnd; /* 所有线程结束应答 */ HANDLE m_hEventComplete; /* 有新任务添加进来通知 */ HANDLE m_hSemaphoreCall; /* 删除线程池线程通知 */ HANDLE m_hSemaphoreDel; /* 默认线程函数(重点理解)*/ protected: static DWORD WINAPI DefaultJobProc(LPVOID lpParam = NULL) { stThreadItem* pThreadItem = (stThreadItem*)lpParam; CXThreadPool* pThreadPool = pThreadItem->m_pThreadPool; HANDLE handle[3] = { 0 }; stJobItem* pJob = NULL; BOOL bHasJob = FALSE; /* 原子操作 线程数加1*/ //::InterlockedIncrement(&pThreadPool->m_lThreadNum);转到创建中 handle[0] = pThreadPool->m_hSemaphoreCall; handle[1] = pThreadPool->m_hSemaphoreDel; handle[2] = pThreadPool->m_hEventEnd; for (;;) { /* 无限等待三个内核对象(有一个就返回)*/ DWORD wr = ::WaitForMultipleObjects(3, handle, FALSE, INFINITE); /* 如果是(缩减线程数)删除线程 */ if (wr == WAIT_OBJECT_0 + 1) { break; } /* 其它的先看看任务队列中是否有任务再作决定 */ ::EnterCriticalSection(&pThreadPool->m_csWorkQueue); bHasJob = !pThreadPool->m_qJobQueue.empty(); if (bHasJob) { pJob = pThreadPool->m_qJobQueue.front(); pThreadPool->m_qJobQueue.pop(); } ::LeaveCriticalSection(&pThreadPool->m_csWorkQueue); /* 如果是结束通知,任务得执行完! */ if (wr == WAIT_OBJECT_0 + 2 && !bHasJob) { break; } if (pJob != NULL && bHasJob) { ::InterlockedIncrement(&pThreadPool->m_lRunning); pThreadItem->m_dwLastBeginTime = ::GetTickCount(); pThreadItem->m_dwRunCount++; pThreadItem->m_bRunning = FALSE; (*pJob->m_pFunc)(pJob->m_pParam); ::Sleep(100); delete pJob; pJob = NULL; pThreadItem->m_bRunning = FALSE; ::InterlockedDecrement(&pThreadPool->m_lRunning); } }//for /* 缩减线程数或者收到结束通知(任务全部完成)*/ /* 删除自身 */ ::EnterCriticalSection(&pThreadPool->m_csThreadVec); /* 找到自己 */ std::vector<stThreadItem*>::iterator ite = std::find(pThreadPool->m_vecThreadItem.begin(), pThreadPool->m_vecThreadItem.end(), pThreadItem); pThreadPool->m_vecThreadItem.erase(ite); ::LeaveCriticalSection(&pThreadPool->m_csThreadVec); delete pThreadItem; pThreadItem = NULL; ::InterlockedDecrement(&pThreadPool->m_lThreadNum); /* 如果自己是最后一个退出线程就通知主线程 */ if (pThreadPool->m_lThreadNum == 0) { ::SetEvent(pThreadPool->m_hEventComplete); } return 0; } public: /* 礼貌结束线程池并等待 */ BOOL EndAndWait(DWORD dwWaitTime = INFINITE) { ::SetEvent(m_hEventEnd); return ::WaitForSingleObject(m_hEventComplete, dwWaitTime) == WAIT_OBJECT_0; } /* 粗暴结束线程池并等待 */ BOOL ForceEndAndWait(DWORD dwWaitTime = INFINITE) { AdjustSize(-1 * m_lThreadNum); return ::WaitForSingleObject(m_hEventComplete, dwWaitTime) == WAIT_OBJECT_0; } /* 接口函数 */ public: /* 线程池构造函数 */ CXThreadPool(DWORD dwNum = 4) : m_lThreadNum(0), m_lRunning(0) { ::InitializeCriticalSection(&m_csThreadVec); ::InitializeCriticalSection(&m_csWorkQueue); m_hEventComplete = ::CreateEvent(NULL, TRUE, FALSE, NULL); m_hEventEnd = ::CreateEvent(NULL, TRUE, FALSE, NULL); m_hSemaphoreCall = ::CreateSemaphore(NULL, 0, 0x7fffffff, NULL); m_hSemaphoreDel = ::CreateSemaphore(NULL, 0, 0x7fffffff, NULL); AdjustSize(dwNum <= 0 ? 4 : dwNum); } ~CXThreadPool() { ForceEndAndWait(); CloseHandle(m_hEventComplete); CloseHandle(m_hEventEnd); CloseHandle(m_hSemaphoreCall); CloseHandle(m_hSemaphoreDel); ::DeleteCriticalSection(&m_csThreadVec); ::DeleteCriticalSection(&m_csWorkQueue); } /* 调整线程池规模(负数就是代表减少)*/ int AdjustSize(int nNum) { if (nNum > 0) { stThreadItem* pNew = NULL; ::EnterCriticalSection(&m_csThreadVec); for(int i = 0; i < nNum; i++) { pNew = new stThreadItem(this); m_vecThreadItem.push_back(pNew); pNew->m_hThread = ::CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL); ::InterlockedIncrement(&m_lThreadNum); } ::LeaveCriticalSection(&m_csThreadVec); } else if(nNum < 0) { nNum *= -1; ::ReleaseSemaphore(m_hSemaphoreDel, nNum > m_lThreadNum ? m_lThreadNum : nNum, NULL); } return (int)m_lThreadNum; } /* 调用线程池(函数) */ void Call(void (*pFunc)(void* pParam0), void* pParam) { ::EnterCriticalSection(&m_csWorkQueue); m_qJobQueue.push(new stJobItem(pFunc, pParam)); ::LeaveCriticalSection(&m_csWorkQueue); /* 通知线程,任务来了 */ ::ReleaseSemaphore(m_hSemaphoreCall, 1, NULL); } /* 调用线程池(对象)*/ void Call(CXThreadJob* pJob, void* pParam) { Call(CallProc, new CallProcPara(pJob, pParam)); } /* 相关Getter*/ public: /* ... */ DWORD GetThreadNum(void) const { return m_lThreadNum; } };
相关文章推荐
- epoll一个简单模型设计
- Linux下设计一个简单的线程池
- 线程池的概念及Linux 怎么设计一个简单的线程池
- 第六章:通过mvc模型设计一个简单的留言系统
- 一个基于线程池的WSAEventSelect模型服务器设计
- Linux下设计一个简单的线程池
- Linux下设计一个简单的线程池
- Linux下设计一个简单的线程池
- Linux下设计一个简单的线程池
- 一个基于线程池的WSAEventSelect模型服务器设计
- 一个WinForm程序配置信息的简单模型和维护工具——设计说明
- 线程池的概念及Linux 怎么设计一个简单的线程池
- 2-1. Creating a Simple Model 使用图形界面设计器创建一个简单的模型
- Linux下设计一个简单的线程池
- Linux下设计一个简单的线程池
- Linux下设计一个简单的线程池
- 一个通用简单线程池实现的初步封装(C语言)
- 使用common-pool实现的一个简单的线程池
- 一个简单的powerdesigner模型(oom,pdm)分析器
- 一天学一个设计模式之(二):简单工厂模式