您的位置:首页 > 编程语言 > C语言/C++

c++封装线程池

2015-08-05 21:15 465 查看
线程池

ThreadPool声明

class Thread;

class ThreadPool final
{
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

explicit ThreadPool(size_t threadNums);

~ThreadPool();

void start();
void stop();

typedef std::function<void()> TaskFunc;
void submitTask(const TaskFunc& task);

private:
void _runThread();

TaskFunc _takeTask();

size_t _threadNums;
bool _running;

std::vector<std::shared_ptr<Thread>> _threadPool;
std::queue<TaskFunc> _tasksQueue;

Mutex _mutex;
Condition _cond;
};
说明几点:

(1)Thread使用前向声明,减少头文件的依赖;

(2)当任务队列中任务为空时,线程池中的线程要等待任务产生,此时线程处于睡眠状态,等待条件,应该使用条件变量;当计算任务被提交到任务队列中,要使用条件变量发送信号,唤醒等待的线程;这里使用此前博客讲述的Condition和Mutex来实现;

(3)线程向任务队列中提交任务和获取任务,需要使用互斥量保护任务队列本身,这里任务队列使用stl中queue实现;对于线程池的实现,使用vector<std::shared_ptr<Thread>>来实现;
ThreadPool实现
ThreadPool::ThreadPool(size_t threadNums):
_threadNums(threadNums),
_running(false),
_mutex(),
_cond(_mutex)
{
assert(!_running);
assert(_threadNums > 0);
_threadPool.reserve(_threadNums);
}

ThreadPool::~ThreadPool()
{
if (_running)
stop();
}

void ThreadPool::start()
{
assert(!_running);
assert(_threadNums > 0);

_running = true;

for (size_t i = 0; i < _threadNums; ++i)
{
shared_ptr<Thread> thread = make_shared<Thread>(std::bind(&ThreadPool::_runThread, this));
_threadPool.push_back(thread);
thread->start();
}
}

void ThreadPool::stop()
{
assert(_running);
//important, sure threads exit, othrewise thead->join will wait long time, becasuse the thread will sleep long Time;
_running = false;

_cond.wakeAll(); //impotant

for (auto& thread : _threadPool) //等待线程结束
{
thread->join();
}
}

void ThreadPool::submitTask(const TaskFunc& task)
{
{
MutexLockGuard lock(_mutex);
_tasksQueue.push(task);
}

_cond.wake();
}

ThreadPool::TaskFunc ThreadPool::_takeTask()
{
{
MutexLockGuard lock(_mutex);
while ( _running && _tasksQueue.empty())
{
LOG_INFO << "thread tid [" << CurrentThread::tid() << "] wait";
_cond.wait();
}
}

MutexLockGuard lock(_mutex);
TaskFunc task;
if (!_tasksQueue.empty())
{
task = _tasksQueue.front();
_tasksQueue.pop();
}

return task;
}

void ThreadPool::_runThread()
{
assert(_running);
while (_running)
{
try
{
TaskFunc task = _takeTask();

if (task)
task();
}
catch (...)
{
LOG_SYSERR << "Exception happen in ThreadPool";
}
}
}

说明几点:

(1)存在ThreadPool对象已经析构,但是线程池中线程未终止,因此在Thread析构函数中,首先要对当前状态_running进行判断,若仍为True,是要执行ThreadPool的stop函数的;

(2)在stop函数中,首先将_running置为false,然后通知所有线程唤醒,此时所有的线程执行完当前的任务后,都会退出_runThread()函数;在stop最后一部分,要join等待线程池中的各个线程,若不等待ThreadPool已经析构后,std::vector<std::shared_ptr<Thread>> _threadPool也将开始析构,造成Thread析构,这样此后线程执行任何与Thread相关的操作都将会未定义;因此需要线程退出后,Thread才开始析构,这样Thread的生命周期要长于线程的生命周期;

(3)在_takeTask(),线程等待的条件是while ( _running && _tasksQueue.empty()),当_running为false时,说明此时线程池将要停止,因此线程要退出_runThread()函数,join才会返回,取到的task有可能为空的,在_runThread()中在执行task之前,还要判断if (task)该任务是否为空,不为空才会执行任务;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  c++ linux 编程 线程池