您的位置:首页 > 编程语言 > C语言/C++

C++线程池

2016-04-23 20:57 519 查看
一般用于服务器,防止建立过多的线程导致系统响应慢等。线程池主要由以下几个部分组成:

任务接口:用于具体任务重载,使线程池中的线程获取其任务;

工作线程:用于创造一个线程,并用线程从任务列表中获取任务并执行;

线程管理器:用于管理线程类创建的线程。即线程池管理器,此类内包括了工作线程类,任务列表,线程列表(即线程池)。利用线程管理器,实现开启工作线程,管理线程池(负责对线程池里面的线程进行添加或终止),添加或删除任务列表里面的任务。

代码如下:

任务接口和工作线程的父类,用于被继承

Thread.h

//任务借口
#ifndef _THREAD_H_
#define _THREAD_H_
#include<string>
#include<windows.h>
#include<process.h>
class Runnable
{
public:
virtual ~Runnable(){};
virtual void Run() = 0;
};

class CThread : public Runnable
{
private:
explicit CThread(const CThread& rhs);//阻止调用者复制CThread对象,提供独一无二的对象
public:
CThread();
CThread(Runnable *pRunnable);
CThread(const char *ThreadName, Runnable *pRunnable = NULL);
CThread(std::string ThreadName, Runnable * pRunnable = NULL);
~CThread(void);
/**
开始运行线程
bSuspend 开始运行时是否挂起
**/
bool Start(bool bSuspend = false);
/**
运行的线程函数,可以使用派生类重写此函数
**/
virtual void Run();

/**
当前执行此函数线程等待线程结束
timeout 等待超时时间,如果为负数,等待无限时长
**/
void Join(int timeout = -1);
/**
恢复挂起的线程
**/
void Resume();
/**
挂起线程
**/
void Suspend();
/**
终止线程的执行
**/
bool Terminate(unsigned long ExitCode);

unsigned int GetThreadID();
std::string GetThreadName();
void SetThreadName(std::string ThreadName);
void SetThreadName(const char * ThreadName);
private:
static unsigned int WINAPI StaticThreadFunc(void * arg);

private:
HANDLE m_handle;
Runnable * const m_pRunnable;
unsigned int m_ThreadID;
std::string m_ThreadName;
bool volatile  m_bRun;//volatile,遇到这个关键字声明的变量,编译器对访问该变量的代码就不再进行优化,从而可以提供对特殊地址的稳定访问,防止编译器进行优化
//声明的变量的值的时候,系统总是重新从它所在的内存读取数据,即使它前面的指令刚刚从该处读取过数据。而且读取的数据立刻被保存。
// volatile int i指出 i是随时可能发生变化的,每次使用它的时候必须从i的地址中读取,因而编译器生成的汇编代码会重新从i的地址读取数据放在b中。
//而优化做法是,由于编译器发现两次从i读数据的代码之间的代码没有对i进行过操作,它会自动把上次读的数据放在b中。而不是重新从i里面读。
//这样以来,如果i是一个寄存器变量或者表示一个端口数据就容易出错,所以说volatile可以保证对特殊地址的稳定访问。
};

#endif


Thread.cpp

#include"Thread.h"
CThread::CThread(void):
m_pRunnable(NULL),m_bRun(false)
{

}
CThread::~CThread(void)
{
}
CThread::CThread(Runnable *pRunnable):
m_ThreadName(""),m_pRunnable(pRunnable),
m_bRun(false)
{

}
CThread::CThread(const char * ThreadName, Runnable * pRunnable) :
m_ThreadName(ThreadName),m_pRunnable(pRunnable),
m_bRun(false)
{

}
CThread::CThread(std::string ThreadName, Runnable * pRunnable) :
m_ThreadName(ThreadName),
m_pRunnable(pRunnable),
m_bRun(false)
{

}
bool CThread::Start(bool bSuspend)
{
if(m_bRun)
{
return true;
}
if(bSuspend)
{
m_handle = (HANDLE)_beginthreadex(NULL,0,StaticThreadFunc,this,CREATE_SUSPENDED,&m_ThreadID);// StaticThreadFunc作为线程入口函数
}
else
{
m_handle = (HANDLE)_beginthreadex(NULL, 0, StaticThreadFunc, this, 0, &m_ThreadID);
}
m_bRun = (NULL != m_handle);
return m_bRun;
}
void CThread::Run()
{
if(!m_bRun)
{
return;
}
if(NULL != m_pRunnable)
{
m_pRunnable->Run();
}
m_bRun = false;
}

void CThread::Join(int timeout)
{
if(NULL == m_handle || !m_bRun)
{
return;
}
if(timeout <= 0)
{
timeout = INFINITE;//无限等待
}
::WaitForSingleObject(m_handle, timeout);
}
void CThread::Resume()
{
if(NULL == m_handle || !m_bRun)
{
return;
}
::ResumeThread(m_handle);
}

void CThread::Suspend()
{
if(NULL == m_handle || !m_bRun)
{
return;
}
::SuspendThread(m_handle);
}
bool CThread::Terminate(unsigned long ExitCode)
{
if(NULL == m_handle || !m_bRun)
{
return true;
}
if(::TerminateThread(m_handle,ExitCode))
{
::CloseHandle(m_handle);
return true;
}
return false;
}

unsigned int CThread::GetThreadID()
{
return m_ThreadID;
}

std::string CThread::GetThreadName()
{
return m_ThreadName;
}

void CThread::SetThreadName(std::string ThreadName)
{
m_ThreadName = ThreadName;
}
void CThread::SetThreadName(const char * ThreadName)
{
if(NULL == ThreadName)
{
m_ThreadName = "";
}
else
{
m_ThreadName = ThreadName;
}
}
unsigned int CThread::StaticThreadFunc(void * arg)
{
CThread * pThread = (CThread *)arg;
pThread->Run();
return 0;
}


线程池管理器,包含工作线程类,继承CThread,线程池,工作列表,线程池管理器ThreadPoolExecutor.h如下:

#ifndef _THREAD_POOL_EXECUTOR_
#define _THREAD_POOL_EXECUTOR_
#include"Thread.h"
#include<set>
#include<list>
#include<Windows.h>
class CThreadPoolExecutor
{
public:
CThreadPoolExecutor(void);
~CThreadPoolExecutor(void);
/**
初始化线程池,创建minThreads个线程,能容纳的最大任务个数
**/
bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks);
/**
执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true
若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true
若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false
**/
bool Execute(Runnable * pRunnable);
/**
终止线程池,先制止塞入任务,
然后等待直到任务列表为空,
然后设置最小线程数量为0,
等待直到线程数量为空,
清空垃圾堆中的任务
**/
void Terminate();

/**
返回线程池中当前的线程数量
**/
unsigned int GetThreadPoolSize();

private:
/**
获取任务列表中的任务,若任务列表为空,返回NULL
**/
Runnable * GetTask();
static unsigned int WINAPI StaticThreadFunc(void * arg);
private:
class CWorker:public CThread  //继承线程类,所以一个此对象就是一个线程
{
public:
CWorker(CThreadPoolExecutor * pThreadPool, Runnable *pFirstTask = NULL);
~CWorker();
void Run();
private:
CThreadPoolExecutor * m_pThreadPool;
Runnable * m_pFirstTask;
volatile bool m_bRun;
};
typedef std::set<CWorker*> ThreadPool;//线程池,管理线程
typedef std::list<Runnable *> Tasks;  //管理任务列表,使用list,使删除和插入具有O(1)的时间复杂度
typedef Tasks::iterator TasksItr;
typedef ThreadPool::iterator ThreadPoolItr;
ThreadPool m_ThreadPool;
ThreadPool m_TrashThread;
Tasks m_Tasks;
CRITICAL_SECTION m_csTasksLock;//任务列表锁,CRITICAL_SECTION(在window.h中)临界区结构对象,用于多线程共享临界区资源的互斥管理结构
CRITICAL_SECTION m_csThreadPoolLock;  //线程池锁
volatile bool m_bRun;
volatile bool m_bEnableInsertTask;
volatile unsigned int m_minThreads;
volatile unsigned int m_maxThreads;
volatile unsigned int m_maxPendingTasks;
};

#endif


CThreadPoolExecutor.cpp

#include"ThreadPoolExecutor.h"
CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable *pFirstTask):
m_pThreadPool(pThreadPool),m_pFirstTask(pFirstTask),m_bRun(true)
{

}

CThreadPoolExecutor::CWorker::~CWorker()
{
}
/**
执行任务的工作线程。
当前没有任务时,
如果当前线程数量大于最小线程数量,减少线程,
否则,执行清理程序,将线程类给释放掉
**/
void CThreadPoolExecutor::CWorker::Run()
{
Runnable *pTask = NULL;
while(m_bRun)
{
if(NULL == m_pFirstTask)
{
pTask =  m_pThreadPool->GetTask();//在这里取出任务,添加任务的函数由线程池管理类Execute执行
}
else
{
pTask = m_pFirstTask;
m_pFirstTask = NULL;
}
if(NULL == pTask)
{
EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));//进入临界区
if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)//当前线程数量大于最小线程数量,减少线程
{
ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
if(itr != m_pThreadPool->m_ThreadPool.end())
{
m_pThreadPool->m_ThreadPool.erase(this);
m_pThreadPool->m_TrashThread.insert(this);//挂起线程
}
m_bRun = false;//没有任务,那么标志就为false,此线程的标志位false,也就是此线程不工作,否则,一直死循环,一直在取任务
}
else//已经为最小线程数,也就是没有线程在工作了,执行清理程序,将线程类给释放掉
{
ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
while(itr != m_pThreadPool->m_TrashThread.end())
{
(*itr)->Join();
delete(*itr);
m_pThreadPool->m_TrashThread.erase(itr);
itr = m_pThreadPool->m_TrashThread.begin();
}
}
LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));//离开临界区资源
continue;
}
else//如果有任务,则执行任务
{
pTask->Run();
pTask = NULL;
}
}
}
////////////////以上是worker类的函数,也就是线程池函数,以下是线程池管理类函数//////////////////////////

CThreadPoolExecutor::CThreadPoolExecutor(void) :
m_bRun(false),
m_bEnableInsertTask(false)
{
InitializeCriticalSection(&m_csTasksLock); //临界资源都需要初始化
InitializeCriticalSection(&m_csThreadPoolLock);
}
CThreadPoolExecutor::~CThreadPoolExecutor(void)
{
Terminate();
DeleteCriticalSection(&m_csTasksLock);
DeleteCriticalSection(&m_csThreadPoolLock);
}

//初始化,创建最小线程数的线程个数,初始化后,那些线程就在开始向任务列表中取任务了,注意,那些线程是一直处于运行状态的
bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
{
if(minThreads == 0)//边界条件
{
return false;
}
if(maxThreads < minThreads)
{
return false;
}
m_minThreads = minThreads;
m_maxThreads = maxThreads;
m_maxPendingTasks = maxPendingTasks;
unsigned int i = m_ThreadPool.size();
for(;i < m_minThreads;++i)
{
//创建线程
CWorker *pWorker = new CWorker(this);//初始化时,传入worker类中的Runnable为空,就使得他从从任务列表中取任务
if(NULL == pWorker)
{
return false;
}
EnterCriticalSection(&m_csThreadPoolLock);//线程池锁
m_ThreadPool.insert(pWorker);
LeaveCriticalSection(&m_csThreadPoolLock);
pWorker->Start();//开始线程,此函数继承与CThread
}
m_bRun = true;
m_bEnableInsertTask = true;
return true;
}

bool CThreadPoolExecutor::Execute(Runnable *pRunnable)
{
if(!m_bEnableInsertTask)
{
return false;
}
if(NULL == pRunnable)
{
return false;
}
if(m_Tasks.size() > m_maxPendingTasks)//m_Tasks为任务列表,当前任务列表满了
{
if(m_ThreadPool.size() < m_maxThreads)//若当前线程数量小于最大线程数,将创建新线程执行此任务,返回true
{
CWorker *pWorker = new CWorker(this, pRunnable);
if(NULL == pWorker)
{
return false;
}
EnterCriticalSection(&m_csThreadPoolLock);
m_ThreadPool.insert(pWorker);
LeaveCriticalSection(&m_csThreadPoolLock);
pWorker->Start();
}
else//当前线程数量等于最大线程数,将丢弃此任务,返回false
{
return false;
}
}
else//当前任务列表没有满,将此任务插入到任务列表,返回true,来一个任务就加一个任务
{
EnterCriticalSection(&m_csTasksLock);//任务列表锁
m_Tasks.push_back(pRunnable);//添加任务,使线程类worker的Run函数获取并开始任务,Run函数只要有任务一直处于死循环运行状态,即多个线程
//同时在跑,当任务结束了,只要那些线程不停止,又开始去任务,所以执行类执行将任务添加到任务列表即可
LeaveCriticalSection(&m_csTasksLock);
}
return true;
}
Runnable* CThreadPoolExecutor::GetTask()
{
Runnable *Task = NULL;
EnterCriticalSection(&m_csTasksLock);
if(!m_Tasks.empty())
{
Task = m_Tasks.front();
m_Tasks.pop_front();//将这个任务从任务列表中除去
}
LeaveCriticalSection(&m_csTasksLock);
return Task;
}

unsigned int CThreadPoolExecutor::GetThreadPoolSize()
{
return m_ThreadPool.size();
}
void CThreadPoolExecutor::Terminate()
{
m_bEnableInsertTask = false; //所有线程都停止,只要线程Execute停止,就不会再增加新线程,其他线程不是添加到阻塞里面就是正常结束
while(m_ThreadPool.size() > 0)
{
Sleep(1);
}
EnterCriticalSection(&m_csThreadPoolLock);
ThreadPoolItr itr = m_TrashThread.begin();
while(itr != m_TrashThread.end())
{
(*itr)->Join();
delete (*itr);
m_TrashThread.erase(itr);
itr = m_TrashThread.begin();
}
LeaveCriticalSection(&m_csThreadPoolLock);
}


主函数main.cpp

#include"Thread.h"
#include"ThreadPoolExecutor.h"
class R : public Runnable
{
public:
~R()
{

}
void Run()
{
printf("H/n");
}
};
int main()
{
CThreadPoolExecutor *pExecutor = new CThreadPoolExecutor();
pExecutor->Init(1, 3, 4);
R r;
for(int i=0;i<100;i++)
{
while(!pExecutor->Execute(&r))//每执行一次,就添加一个
//任务到任务列表中,工作线程一直处于运行状态,所以只要线程没有
//满,就可以一直取任务,而不用新建线程
{
}
}
pExecutor->Terminate();
delete pExecutor;
getchar();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: