您的位置:首页 > 运维架构 > Linux

Linux下的一个简单的线程池

2011-08-03 12:42 561 查看
在做linux下一个项目,应用到的功能需要经常的切换线程,如果进行动态的创建关闭的话会造成在当前进程中达到系统允许的最大线程的数量限制,所以就编了一个用于线程管理的线程池来处理该问题,这个里面设计的线程是完全抽象的跟执行方法毫无关系.首先是任务类:
/*
* CJob.h
*
*  Created on: 2011-7-19
*      Author: zhx
*/

#ifndef CJOB_H_
#define CJOB_H_

#include <signal.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#include <ctype.h>

typedef int (*RZ_PTHREAD_RUN)(void *lpParameter, int nflag);  //lpParameter,设备上下文;nflag,回传的标志

class CJob {
friend class CThread;
public:
CJob();
virtual ~CJob();

void SetRunFuction(RZ_PTHREAD_RUN pRunFuction, void *lpParameter, int nflag=1);	//外部类中调用

protected:
RZ_PTHREAD_RUN m_lpPThreadRun;
void 	*m_lpContext;
int 	m_nRunFlage;
int Job_Run() const;		//执行设置的回掉方法,线程类中调用
};

#endif /* CJOB_H_ */

具体实现:

/*
* CJob.cpp
*
*  Created on: 2011-7-19
*      Author: zhx
*/

#include "CJob.h"

CJob::CJob() {
m_lpPThreadRun = NULL;
m_lpContext = NULL;
m_nRunFlage = 0;
}

CJob::~CJob() {
}

void CJob::SetRunFuction(RZ_PTHREAD_RUN pRunFuction, void *lpParameter, int nflag)
{
m_lpPThreadRun = pRunFuction;
m_lpContext = lpParameter;
m_nRunFlage = nflag;
}

int CJob::Job_Run() const
{
if(m_lpPThreadRun && m_lpContext){
return m_lpPThreadRun(m_lpContext, m_nRunFlage);
}
return -1;
}


线程类:

/*
* CThread.h
*
*  Created on: 2011-7-19
*      Author: zhx
*/

#ifndef CTHREAD_H_
#define CTHREAD_H_
#include "CJob.h"
enum RZ_THREAD_EVENT{
RZ_THREAD_BUSY = 0,  	//线程忙
RZ_THREAD_FREE = 1		//线程空闲
};

typedef int (*RZ_PTHREAD_EVENT)(void *lpParameter, unsigned long int threadId, int nflag);  //线程事件的回掉

class CThread {
friend class CThreadManager;
public:
CThread();
virtual ~CThread();
int SetJob(const CJob *pJob);
void SetEventFuction(RZ_PTHREAD_EVENT pEventFuction, void *lpParameter);	//设置事件回掉
pthread_t GetThreadId()const{ return m_hThread; };

protected:
pthread_t  						m_hThread;
const CJob						*m_pJob;
bool							m_bExit;

pthread_mutex_t					m_lock_of_Thread;		//锁定当前对象的线程
pthread_cond_t					m_cond_Thread;			//无任务时的等待条件

RZ_PTHREAD_EVENT m_lpPThreadEvent;
void 	*m_lpContext;
protected:
static void* Thread_Run(void* lpParameter);
int Proc_Run();
int ThreadExit();		//线程退出
};

#endif /* CTHREAD_H_ */


线程类实现:

/*
* CThread.cpp
*
*  Created on: 2011-7-19
*      Author: zhx
*/

#include "CThread.h"

CThread::CThread() {
m_hThread = 0;
m_pJob = 0;
m_bExit = false;
m_lpContext = 0;
m_lpPThreadEvent = 0;

pthread_mutex_init(&m_lock_of_Thread, NULL);

pthread_cond_init(&m_cond_Thread, NULL);
int ret = 0;
ret = pthread_create(&m_hThread, NULL, Thread_Run, (void*) this);
pthread_mutex_lock(&m_lock_of_Thread);
int nError = 0;
nError = pthread_cond_wait(&m_cond_Thread, &m_lock_of_Thread);
pthread_mutex_unlock(&m_lock_of_Thread);
}

CThread::~CThread() {
ThreadExit();
pthread_mutex_destroy(&m_lock_of_Thread);
pthread_cond_destroy(&m_cond_Thread);
}

int CThread::Proc_Run()
{
while(!m_bExit){
pthread_cond_signal(&m_cond_Thread);
pthread_mutex_lock(&m_lock_of_Thread);
int nError = 0;
nError = pthread_cond_wait(&m_cond_Thread, &m_lock_of_Thread);
if(m_pJob){
m_pJob->Job_Run();
}
m_pJob = NULL;
if(m_lpPThreadEvent){
if(0 != m_lpPThreadEvent(m_lpContext, m_hThread, RZ_THREAD_FREE)){	//标明当前线程需要退出
pthread_mutex_unlock(&m_lock_of_Thread);
break;
}
}
pthread_mutex_unlock(&m_lock_of_Thread);
}
return 0;
}

void* CThread::Thread_Run(void* lpParameter)
{
CThread * lpthis = (CThread *) lpParameter;

lpthis->Proc_Run();

pthread_exit(0);
}

int CThread::SetJob(const CJob *pJob)
{
struct timeval tv1;
gettimeofday(&tv1, NULL);
timespec timeOut; //超时时间
timeOut.tv_sec = tv1.tv_sec + 1;//+5;
timeOut.tv_nsec = (tv1.tv_usec) * 1000;
int nError = pthread_mutex_timedlock(&m_lock_of_Thread, &timeOut);
if (ETIMEDOUT == nError) {
return nError;
}
m_pJob = pJob;
pthread_mutex_unlock(&m_lock_of_Thread);
pthread_cond_signal(&m_cond_Thread); //唤醒处理线程
return nError;
}

int CThread::ThreadExit()
{
int ret  = 0;
m_bExit = true;
pthread_cond_signal(&m_cond_Thread); //唤醒处理线程
struct timeval tv1;
gettimeofday(&tv1, NULL);
timespec timeOut; //超时时间
timeOut.tv_sec = tv1.tv_sec + 1;//+5;
timeOut.tv_nsec = (tv1.tv_usec) * 1000;
int nError = pthread_mutex_timedlock(&m_lock_of_Thread, &timeOut);
m_pJob = 0;
if (ETIMEDOUT == nError) {
return nError;
}
pthread_mutex_unlock(&m_lock_of_Thread);
return ret;
}

void CThread::SetEventFuction(RZ_PTHREAD_EVENT pEventFuction, void *lpParameter)
{
pthread_mutex_lock(&m_lock_of_Thread);
m_lpPThreadEvent = pEventFuction;
m_lpContext = lpParameter;
pthread_mutex_unlock(&m_lock_of_Thread);
}


线程管理类(外部调用):

/*
* CThreadManager.h
*
*  Created on: 2011-7-21
*      Author: zhx
*      外部调用不必关心内部的线程调度,在线程未达到所允许的最大线程数目前不会出现
*      分配不到工作线程的问题,一旦没有空闲的线程就会创建新的线程
*/

#ifndef CTHREADMANAGER_H_
#define CTHREADMANAGER_H_
#include "CThread.h"

#include <vector>
#include <map>
using namespace std;

#define GETTHREADMANAGER() (CThreadManager::GetInstance())
#define THREAD_MAX 20

class CThreadManager {
private:
static CThreadManager * m_sThreadManager;

CThreadManager();
virtual ~CThreadManager();

int AllThreadExit();

private:
static	int								m_nThreadSum;			//线程总数
int										m_nAllowMax;			//所允许的最大的线程总数
map<pthread_t, CThread *>				m_mapThread_Free;		//空闲线程列表
map<pthread_t, CThread *>				m_mapThread_Busy;		//忙碌线程列表

pthread_mutex_t							m_lock_of_Map;			//锁定

volatile int							m_exitTest;				//回调中用于查看是否要退出的标示

static int _Thread_Event(void *lpParameter, unsigned long int threadId, int nflag);
int Thread_Event(unsigned long int threadId, int nflag);
public:
static CThreadManager* GetInstance();
static void UnInstance();

int SetJob(const CJob *pJob);
};

#endif /* CTHREADMANAGER_H_ */


线程管理类实现:

/*
* CThreadManager.cpp
*
*  Created on: 2011-7-21
*      Author: zhx
*/

#include "CThreadManager.h"

int CThreadManager::m_nThreadSum = 0;
CThreadManager * CThreadManager::m_sThreadManager = NULL;

CThreadManager::CThreadManager() {
m_nAllowMax = THREAD_MAX;
m_exitTest = false;
pthread_mutex_init(&m_lock_of_Map, NULL);
}

CThreadManager::~CThreadManager() {
AllThreadExit();
pthread_mutex_destroy(&m_lock_of_Map);
}

CThreadManager* CThreadManager::GetInstance()
{
if(NULL == m_sThreadManager){
m_sThreadManager = new CThreadManager();
}
return m_sThreadManager;
}

void CThreadManager::UnInstance()
{
if(NULL != m_sThreadManager){
delete m_sThreadManager;
m_sThreadManager = NULL;
}
}

int CThreadManager::AllThreadExit()
{
pthread_mutex_lock(&m_lock_of_Map);
m_exitTest = true;
map<pthread_t, CThread *>::iterator ite;
CThread * pThread = NULL;
for(ite = m_mapThread_Free.begin(); ite != m_mapThread_Free.end(); ite++){
pThread = ite->second;
pThread->SetEventFuction(NULL, NULL);
if(pThread)
delete pThread;
m_nThreadSum--;
pThread = NULL;
}
m_mapThread_Free.clear();
for(ite = m_mapThread_Busy.begin(); ite != m_mapThread_Busy.end(); ite++){
pThread = ite->second;
pThread->SetEventFuction(NULL, NULL);
if(pThread)
delete pThread;
m_nThreadSum--;
pThread = NULL;
}
m_mapThread_Busy.clear();
pthread_mutex_unlock(&m_lock_of_Map);
return 0;
}

int CThreadManager::_Thread_Event(void *lpParameter, unsigned long int threadId, int nflag)
{
CThreadManager* lpThis = (CThreadManager*)lpParameter;
return lpThis->Thread_Event(threadId, nflag);
}

int CThreadManager::Thread_Event(unsigned long int threadId, int nflag)
{
int ret = 0;
if(m_exitTest){		//首先测试当前是否正在全部的清理线程,否则会造成在杀线程时造成回调的死锁
return -1;
}
map<pthread_t, CThread *>::iterator ite;
pthread_t threadId_Temp = (pthread_t)threadId;
CThread * pThread = NULL;
pthread_mutex_lock(&m_lock_of_Map);
if(RZ_THREAD_FREE == nflag){
ite = m_mapThread_Busy.find(threadId_Temp);
if(ite != m_mapThread_Busy.end()){
pThread = ite->second;
m_mapThread_Busy.erase(ite);
m_mapThread_Free.insert(map<pthread_t, CThread *>::value_type(threadId_Temp, pThread));
}
else{
ret = -1;
}
}
pthread_mutex_unlock(&m_lock_of_Map);
return ret;
}

int CThreadManager::SetJob(const CJob *pJob)
{
static char str[256];
int ret = 0;
map<pthread_t, CThread *>::iterator ite;
pthread_mutex_lock(&m_lock_of_Map);
if(m_mapThread_Free.size() > 0){
ite = m_mapThread_Free.begin();
m_mapThread_Free.erase(ite->first);
m_mapThread_Busy.insert(map<pthread_t, CThread *>::value_type(ite->first, ite->second));
ret = ite->second->SetJob(pJob);
}
else{
if(m_nThreadSum == m_nAllowMax){
ret = -10;
}
else{
m_nThreadSum++;
CThread* pThread = new CThread();
sprintf(str,"%s:: %s thread Sum = %d",   __FUNCTION__, "created new thread", m_nThreadSum);

m_mapThread_Busy.insert(map<pthread_t, CThread *>::value_type(pThread->m_hThread, pThread));
ret = pThread->SetJob(pJob);
pThread->SetEventFuction(_Thread_Event, (void*)this);
}
}
pthread_mutex_unlock(&m_lock_of_Map);
return ret;
}

具体使用:

/*
* work_class.h
*
*  Created on: 2011-7-19
*      Author: root
*/

#ifndef WORK_CLASS_H_
#define WORK_CLASS_H_
#include "CJob.h"

class work_class {
protected:
CJob m_Job;

int runFuction(int nflag);
static int Static_Run_Fuction(void* lpParameter, int nflag);

public:
work_class(int nflag);
virtual ~work_class();
const CJob* GetJob()const {return &m_Job;};
};

#endif /* WORK_CLASS_H_ */


/*
* work_class.cpp
*
*  Created on: 2011-7-19
*      Author: root
*/

#include "work_class.h"

#include <iostream>
using namespace std;

work_class::work_class(int nflag) {
m_Job.SetRunFuction(Static_Run_Fuction, (void*)this, nflag);
}

work_class::~work_class() {
}

int work_class::runFuction(int nflag)
{
int i = 0;
if(nflag == 1){
while(i++<1000)
cout << i <<" " <<endl; // prints !!!Hello World!!!
}
else{
i = 3000;
while(i++<4000)
cout << i << " "<<endl; // prints !!!Hello World!!!
}

return 0;
}

int work_class::Static_Run_Fuction(void* lpParameter, int nflag)
{
work_class * lpthis = (work_class *) lpParameter;
return lpthis->runFuction(nflag);
}


main文件:

#include <iostream>
using namespace std;
#include "CThreadManager.h"
#include "work_class.h"

int main() {
//cout << "!!!Hello World!!!" << endl; // prints !!!Hello World!!!
work_class  *pWorkClass = new work_class(0);
work_class  *pWorkClass1 = new work_class(1);
CThreadManager * pThreadManager = GETTHREADMANAGER();
pThreadManager->SetJob(pWorkClass1->GetJob());

pThreadManager->SetJob(pWorkClass->GetJob());

pThreadManager->SetJob(pWorkClass1->GetJob());
pThreadManager->SetJob(pWorkClass->GetJob());

CThreadManager::UnInstance();
delete pWorkClass;
delete pWorkClass1;
return 0;
}


源码下载:点击打开链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: