您的位置:首页 > 其它

线程池相关源码(网上搜得)

2012-02-01 10:01 183 查看
//ThreadPool.h 线程池头文件
#ifndef OTA_THREADPOOL_H_CAESAR__DEF
#define OTA_THREADPOOL_H_CAESAR__DEF

#include <windows.h>

class CWorkDesc
{
public:
CWorkDesc(){}

virtual ~CWorkDesc(){}
};

class CWork
{
public:
CWork(){}
virtual	~CWork(){}
virtual void ProcessJob(CWorkDesc* pJob)=0;
};

class CThreadPool
{
friend	static	unsigned __stdcall ManagerProc(void* pThread);
friend static	unsigned __stdcall WorkerProc(void* pThread);

typedef struct  THREADINFO
{
THREADINFO()
{
hThreadHandle	=	NULL;
dwThreadID		=	0;
bIsBusy			=	false;
bIsQuit			=	false;
}
HANDLE		hThreadHandle;
unsigned	dwThreadID;
volatile bool		bIsBusy;
volatile bool		bIsQuit;
}*LPTHREADINFO;

public:
CThreadPool();
virtual ~CThreadPool();

enum	EReturnValue
{
MANAGERPROC_RETURN_VALUE	=	10001,
WORKERPROC_RETURN_VALUE		=	10002
};

enum	EThreadStatus
{
BUSY,
NORMAL,
IDLE
};

bool Start(WORD	wStatic,WORD wMax);
void Stop(void);
void ProcessJob(CWorkDesc* pJob, CWork* pWorker) const;

protected:

static	unsigned __stdcall ManagerProc(void* pThread);
static	unsigned __stdcall WorkerProc(void* pThread);

LPTHREADINFO		m_pThreadInfo;

volatile WORD		m_wStaticThreadNum;
volatile WORD		m_wMaxThreadNum;
volatile bool		m_bQuitManager;
DWORD				m_dwMSeconds;

HANDLE				m_hManagerIO;
HANDLE				m_hWorkerIO;

HANDLE				m_hManagerThread;
CRITICAL_SECTION	m_csLock;
private:
CThreadPool::EThreadStatus	GetWorkThreadStatus();
void	AddThread(void);
void	DelThread(void);
int		GetThreadbyID(DWORD dwID);
};
#endif


//ThreadPool.cpp

#include "StdAfx.h"
#include "ThreadPool.h"
#include "process.h"

CThreadPool::CThreadPool()
{
m_hManagerIO	=	NULL;
m_hWorkerIO		=	NULL;

m_hManagerThread=	NULL;
m_pThreadInfo	=	NULL;

m_dwMSeconds	=	200;
m_bQuitManager	=	false;
//初使化互斥体
::InitializeCriticalSection(&m_csLock);
}

CThreadPool::~CThreadPool()
{
//关闭IO
if(m_hManagerIO)
{
::CloseHandle(m_hManagerIO);
}
if(m_hWorkerIO)
{
::CloseHandle(m_hWorkerIO);
}
if(m_pThreadInfo)
{
delete	[]	m_pThreadInfo;
}
//关闭互斥体
::DeleteCriticalSection(&m_csLock);
}

bool CThreadPool::Start(WORD	wStatic,WORD wMax)
{
if(!(wStatic && wMax))
return false;

m_wStaticThreadNum	=	wStatic;
m_wMaxThreadNum		=	wMax;

//LOCK
::EnterCriticalSection(&m_csLock);

//创建工作线程数据
if(m_pThreadInfo)
{
delete	[]	m_pThreadInfo;
m_pThreadInfo	=	NULL;
}
m_pThreadInfo	=	new	THREADINFO[wMax]();
if(m_pThreadInfo	==	NULL)
return false;

//创建异步的不关联文件的完全IO端口
if(m_hManagerIO)
{
::CloseHandle(m_hManagerIO);
m_hManagerIO	=	NULL;
}
m_hManagerIO	=	::CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
if(m_hManagerIO	==	NULL)
{
return false;
}

if(m_hWorkerIO)
{
::CloseHandle(m_hWorkerIO);
m_hManagerIO	=	NULL;
}
m_hWorkerIO		=	::CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
if(m_hWorkerIO	==	NULL)
{
return false;
}

//创建管理线程
m_bQuitManager	=	false;

if(m_hManagerThread)
{
::TerminateThread(m_hManagerThread,0);
m_hManagerThread	=	NULL;
}
unsigned ManagerThreadID;

m_hManagerThread	=	(HANDLE)_beginthreadex(NULL,0,ManagerProc,this,0,&ManagerThreadID);

if(m_hManagerThread == NULL)
{
return false;
}

//创建工作线程
for(WORD i=0;i<wStatic;++i)
{
m_pThreadInfo[i].hThreadHandle	=	(HANDLE)_beginthreadex(NULL,0,WorkerProc,this,0,&m_pThreadInfo[i].dwThreadID);
}

//UNLOCK
::LeaveCriticalSection(&m_csLock);

return true;
}

void CThreadPool::Stop(void)
{
//LOCK
::EnterCriticalSection(&m_csLock);

//向管理线程发送关闭线程消息
m_bQuitManager	=	true;

//判断并关闭管理线程
DWORD	dwRes;

for(int i=0;i<10;i++)
{
::GetExitCodeThread(m_hManagerThread,&dwRes);
if(dwRes	==	CThreadPool::MANAGERPROC_RETURN_VALUE)
{
break;
}
if(i	==	9)
{
//关闭线程
::TerminateThread(m_hManagerThread,0);
}
else
{
::Sleep(1000);
}
}

//关闭IO
::CloseHandle(m_hManagerIO);
m_hManagerIO	=	NULL;

//关闭所有工作线程

for(i=0;i<m_wMaxThreadNum;i++)
{
if(m_pThreadInfo[i].dwThreadID	==	0)
{
continue;
}

m_pThreadInfo[i].bIsQuit	=	true;

for(int j=0;j<10;j++)
{
::GetExitCodeThread(m_pThreadInfo[i].hThreadHandle,&dwRes);

if(dwRes	==	CThreadPool::WORKERPROC_RETURN_VALUE)
{
break;
}
if(j == 9)
{
::TerminateThread(m_pThreadInfo[i].hThreadHandle,0);
}
else
{
::Sleep(500);
}
}
}

//关闭IO
::CloseHandle(m_hWorkerIO);
m_hWorkerIO	=	NULL;

//删除线程结构
if(m_pThreadInfo)
{
delete	[]	m_pThreadInfo;
m_pThreadInfo	=	NULL;
}

//删除所有代处理数据
unsigned long						pN1,pN2;
OVERLAPPED*							pOverLapped;

while(::GetQueuedCompletionStatus(m_hWorkerIO,&pN1,&pN2,&pOverLapped,0))
{
CWork*		pWork		=	reinterpret_cast<CWork*>(pN1);
CWorkDesc*	pWorkDesc	=	reinterpret_cast<CWorkDesc*>(pN2);

delete	pWorkDesc;
}

//UNLOCK
::LeaveCriticalSection(&m_csLock);
}

void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const
{
::PostQueuedCompletionStatus(m_hWorkerIO,
reinterpret_cast<DWORD>(pWorker),
reinterpret_cast<DWORD>(pJob),
NULL);
}

unsigned __stdcall CThreadPool::ManagerProc(void* pThread)
{
unsigned long						pN1, pN2;
OVERLAPPED*							pOverLapped;

CThreadPool* pServer	=	reinterpret_cast<CThreadPool*>(pThread);

while(!pServer->m_bQuitManager)
{
if(::GetQueuedCompletionStatus(pServer->m_hManagerIO,&pN1,&pN2,&pOverLapped,pServer->m_dwMSeconds)	==	TRUE)
{
if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
{
//收到退出码
break;
}
}
else
{
//超时,判断工作线程状态
EThreadStatus	stat	=	 pServer->GetWorkThreadStatus();
if(stat	==	CThreadPool::BUSY)
{
puts("Add thread");
pServer->AddThread();
}
else
if(stat == CThreadPool::IDLE)
{
puts("Del thread");
pServer->DelThread();
}
}
};

_endthreadex(CThreadPool::MANAGERPROC_RETURN_VALUE);

return CThreadPool::MANAGERPROC_RETURN_VALUE;
}

unsigned __stdcall CThreadPool::WorkerProc(void* pThread)
{
unsigned long						pN1, pN2;
OVERLAPPED*							pOverLapped;
CThreadPool* pServer	=	reinterpret_cast<CThreadPool*>(pThread);

DWORD	threadID	=	::GetCurrentThreadId();
int		nSeq		=	pServer->GetThreadbyID(threadID);

if(nSeq<0)
{
return 0;
}

while(!pServer->m_pThreadInfo[nSeq].bIsQuit)
{
if(::GetQueuedCompletionStatus(pServer->m_hWorkerIO,&pN1,&pN2,&pOverLapped,pServer->m_dwMSeconds))
{
CWork*		pWork		=	reinterpret_cast<CWork*>(pN1);
CWorkDesc*	pWorkDesc	=	reinterpret_cast<CWorkDesc*>(pN2);

printf("do work\n");
//在工作之前将状态设置为Busy
pServer->m_pThreadInfo[nSeq].bIsBusy	=	true;

//工作处理过程
pWork->ProcessJob(pWorkDesc);
delete	pWorkDesc;

//在工作后将状态设置为非Busy
pServer->m_pThreadInfo[nSeq].bIsBusy	=	false;
printf("do work over\n");
}
}

printf("worker thread down\n");

//退出之前将线程ID设置为0
pServer->m_pThreadInfo[nSeq].dwThreadID	=	0;

_endthreadex(CThreadPool::WORKERPROC_RETURN_VALUE);

return CThreadPool::WORKERPROC_RETURN_VALUE;
}

CThreadPool::EThreadStatus	CThreadPool::GetWorkThreadStatus()
{
float fAll=0.0,fRun=0.0;
for(WORD wi=0;wi<m_wMaxThreadNum;++wi)
{
if(m_pThreadInfo[wi].dwThreadID)
{
fAll++;
if(m_pThreadInfo[wi].bIsBusy)
{
fRun++;
}
}
}

if(fAll	==	0)
return CThreadPool::IDLE;

if(fRun/(1.0*fAll)>0.8)
{
return CThreadPool::BUSY;
}
if(fRun/(1.0*fAll)<0.2)
{
return CThreadPool::IDLE;
}

return CThreadPool::NORMAL;
}

void	CThreadPool::AddThread(void)
{
for(WORD wi=m_wStaticThreadNum;wi<m_wMaxThreadNum;++wi)
{
if(!m_pThreadInfo[wi].dwThreadID)
{
m_pThreadInfo[wi].bIsBusy		=	false;
m_pThreadInfo[wi].bIsQuit		=	false;
m_pThreadInfo[wi].hThreadHandle	=	(HANDLE)_beginthreadex(NULL,0,WorkerProc,this,0,&m_pThreadInfo[wi].dwThreadID);

break;
}
}
}

void	CThreadPool::DelThread(void)
{
for(WORD wi=m_wMaxThreadNum;wi>m_wStaticThreadNum;--wi)
{
if(m_pThreadInfo[wi-1].dwThreadID)
{
m_pThreadInfo[wi-1].bIsQuit	=	true;
::Sleep(m_dwMSeconds);
break;
}
}
}

int	CThreadPool::GetThreadbyID(DWORD dwID)
{
for(WORD wi=0;wi<m_wMaxThreadNum;wi++)
{
if(m_pThreadInfo[wi].dwThreadID	==	dwID)
{
return wi;
}
}

return -1;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: