您的位置:首页 > 其它

一个简单的线程池设计模型

2010-06-04 15:14 471 查看
#pragma once
class CXThreadJob
{
public:
CXThreadJob(void);
~CXThreadJob(void);
public:
/* 用户对象纯虚函数 */
virtual void DoJob(void* pParam) = 0;
};

#pragma once
#include <queue>
#include "XThreadJob.h"
class CXThreadPool
{
public:
/*
CXThreadPool(void);
~CXThreadPool(void);
*/
/* 内部数据类型 */
private:

/* 任务结构 */
struct stJobItem
{
/* 任务执行体(函数)*/
void (*m_pFunc)(void* pParam);
/* 函数参数 */
void* m_pParam;

stJobItem(void (*pFunc)(void* pParam0) = NULL, void* pParam = NULL) : m_pFunc(pFunc), m_pParam(pParam)
{
}
};
/* 线程结构 */
struct stThreadItem
{
/* 线程句柄 */
HANDLE m_hThread;

/* 指向所在的线程池 */
CXThreadPool* m_pThreadPool;
/* 最后一次开始运行时间 */
DWORD m_dwLastBeginTime;
/* 运行次数(做过多少任务)*/
DWORD m_dwRunCount;
/* 线程是否在运行中(处理任务中)*/
BOOL m_bRunning;
stThreadItem(CXThreadPool* pThreadPool) : m_hThread(NULL), m_pThreadPool(pThreadPool), m_dwLastBeginTime(0), m_dwRunCount(0), m_bRunning(FALSE)
{
}
~stThreadItem()
{
if (m_hThread != NULL)
{
CloseHandle(m_hThread);
m_hThread = NULL;
}
}
};
/* 用户对象辅助设施 */
static void CallProc(void *pPara)
{
CallProcPara *cp = static_cast<CallProcPara *>(pPara);
if(cp != NULL)
{
cp->m_pStObj->DoJob(cp->m_pPara);
delete cp;
cp = NULL;
}
}

struct CallProcPara
{
CXThreadJob* m_pStObj;
void * m_pPara;
CallProcPara(CXThreadJob* p, void *pPara) : m_pStObj(p), m_pPara(pPara) { };
};
/* 内部数据 */
private:
/* 线程数 */
long m_lThreadNum;
/* 正在运行中的线程数 */
long m_lRunning;
/* 任务结构队列 */
std::queue<stJobItem*> m_qJobQueue;
/* 线程结构集合 */
std::vector<stThreadItem*> m_vecThreadItem;
/* 任务队列保护区 */
CRITICAL_SECTION m_csWorkQueue;
/* 线程结构集合保护区 */
CRITICAL_SECTION m_csThreadVec;
/* 通知所有线程结束 */
HANDLE m_hEventEnd;

/* 所有线程结束应答 */
HANDLE m_hEventComplete;
/* 有新任务添加进来通知 */
HANDLE m_hSemaphoreCall;
/* 删除线程池线程通知 */
HANDLE m_hSemaphoreDel;
/* 默认线程函数(重点理解)*/
protected:
static DWORD WINAPI DefaultJobProc(LPVOID lpParam = NULL)
{
stThreadItem* pThreadItem = (stThreadItem*)lpParam;
CXThreadPool* pThreadPool = pThreadItem->m_pThreadPool;
HANDLE handle[3] = { 0 };
stJobItem* pJob = NULL;
BOOL bHasJob = FALSE;

/* 原子操作 线程数加1*/
//::InterlockedIncrement(&pThreadPool->m_lThreadNum);转到创建中
handle[0] = pThreadPool->m_hSemaphoreCall;
handle[1] = pThreadPool->m_hSemaphoreDel;
handle[2] = pThreadPool->m_hEventEnd;
for (;;)
{
/* 无限等待三个内核对象(有一个就返回)*/
DWORD wr = ::WaitForMultipleObjects(3, handle, FALSE, INFINITE);
/* 如果是(缩减线程数)删除线程 */
if (wr == WAIT_OBJECT_0 + 1)
{
break;
}
/* 其它的先看看任务队列中是否有任务再作决定 */
::EnterCriticalSection(&pThreadPool->m_csWorkQueue);
bHasJob = !pThreadPool->m_qJobQueue.empty();
if (bHasJob)
{
pJob = pThreadPool->m_qJobQueue.front();
pThreadPool->m_qJobQueue.pop();
}
::LeaveCriticalSection(&pThreadPool->m_csWorkQueue);
/* 如果是结束通知,任务得执行完! */
if (wr == WAIT_OBJECT_0 + 2 && !bHasJob)
{
break;
}
if (pJob != NULL && bHasJob)
{
::InterlockedIncrement(&pThreadPool->m_lRunning);
pThreadItem->m_dwLastBeginTime = ::GetTickCount();
pThreadItem->m_dwRunCount++;
pThreadItem->m_bRunning = FALSE;
(*pJob->m_pFunc)(pJob->m_pParam);
::Sleep(100);
delete pJob;
pJob = NULL;
pThreadItem->m_bRunning = FALSE;
::InterlockedDecrement(&pThreadPool->m_lRunning);
}
}//for
/* 缩减线程数或者收到结束通知(任务全部完成)*/
/* 删除自身 */
::EnterCriticalSection(&pThreadPool->m_csThreadVec);
/* 找到自己 */
std::vector<stThreadItem*>::iterator ite = std::find(pThreadPool->m_vecThreadItem.begin(),
pThreadPool->m_vecThreadItem.end(),
pThreadItem);
pThreadPool->m_vecThreadItem.erase(ite);
::LeaveCriticalSection(&pThreadPool->m_csThreadVec);
delete pThreadItem;
pThreadItem = NULL;
::InterlockedDecrement(&pThreadPool->m_lThreadNum);
/* 如果自己是最后一个退出线程就通知主线程 */
if (pThreadPool->m_lThreadNum == 0)
{
::SetEvent(pThreadPool->m_hEventComplete);
}
return 0;
}
public:
/* 礼貌结束线程池并等待 */
BOOL EndAndWait(DWORD dwWaitTime = INFINITE)
{
::SetEvent(m_hEventEnd);
return ::WaitForSingleObject(m_hEventComplete, dwWaitTime) == WAIT_OBJECT_0;
}
/* 粗暴结束线程池并等待 */
BOOL ForceEndAndWait(DWORD dwWaitTime = INFINITE)
{
AdjustSize(-1 * m_lThreadNum);
return ::WaitForSingleObject(m_hEventComplete, dwWaitTime) == WAIT_OBJECT_0;
}

/* 接口函数 */
public:
/* 线程池构造函数 */
CXThreadPool(DWORD dwNum = 4) : m_lThreadNum(0), m_lRunning(0)
{
::InitializeCriticalSection(&m_csThreadVec);
::InitializeCriticalSection(&m_csWorkQueue);
m_hEventComplete = ::CreateEvent(NULL, TRUE, FALSE, NULL);
m_hEventEnd = ::CreateEvent(NULL, TRUE, FALSE, NULL);
m_hSemaphoreCall = ::CreateSemaphore(NULL, 0, 0x7fffffff, NULL);
m_hSemaphoreDel = ::CreateSemaphore(NULL, 0, 0x7fffffff, NULL);
AdjustSize(dwNum <= 0 ? 4 : dwNum);

}

~CXThreadPool()
{

ForceEndAndWait();
CloseHandle(m_hEventComplete);
CloseHandle(m_hEventEnd);
CloseHandle(m_hSemaphoreCall);
CloseHandle(m_hSemaphoreDel);
::DeleteCriticalSection(&m_csThreadVec);
::DeleteCriticalSection(&m_csWorkQueue);
}
/* 调整线程池规模(负数就是代表减少)*/
int AdjustSize(int nNum)
{
if (nNum > 0)
{
stThreadItem* pNew = NULL;
::EnterCriticalSection(&m_csThreadVec);
for(int i = 0; i < nNum; i++)
{
pNew = new stThreadItem(this);
m_vecThreadItem.push_back(pNew);
pNew->m_hThread = ::CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
::InterlockedIncrement(&m_lThreadNum);
}
::LeaveCriticalSection(&m_csThreadVec);
}
else if(nNum < 0)
{
nNum *= -1;
::ReleaseSemaphore(m_hSemaphoreDel, nNum > m_lThreadNum ? m_lThreadNum : nNum, NULL);
}
return (int)m_lThreadNum;
}
/* 调用线程池(函数) */
void Call(void (*pFunc)(void* pParam0), void* pParam)
{
::EnterCriticalSection(&m_csWorkQueue);
m_qJobQueue.push(new stJobItem(pFunc, pParam));
::LeaveCriticalSection(&m_csWorkQueue);
/* 通知线程,任务来了 */
::ReleaseSemaphore(m_hSemaphoreCall, 1, NULL);
}
/* 调用线程池(对象)*/
void Call(CXThreadJob* pJob, void* pParam)
{
Call(CallProc, new CallProcPara(pJob, pParam));
}

/* 相关Getter*/
public:
/* ... */
DWORD GetThreadNum(void) const
{
return m_lThreadNum;
}
};
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: