c++封装线程池
2015-08-05 21:15
465 查看
线程池
ThreadPool声明
class Thread;
class ThreadPool final
{
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
explicit ThreadPool(size_t threadNums);
~ThreadPool();
void start();
void stop();
typedef std::function<void()> TaskFunc;
void submitTask(const TaskFunc& task);
private:
void _runThread();
TaskFunc _takeTask();
size_t _threadNums;
bool _running;
std::vector<std::shared_ptr<Thread>> _threadPool;
std::queue<TaskFunc> _tasksQueue;
Mutex _mutex;
Condition _cond;
};
说明几点:
(1)Thread使用前向声明,减少头文件的依赖;
(2)当任务队列中任务为空时,线程池中的线程要等待任务产生,此时线程处于睡眠状态,等待条件,应该使用条件变量;当计算任务被提交到任务队列中,要使用条件变量发送信号,唤醒等待的线程;这里使用此前博客讲述的Condition和Mutex来实现;
(3)线程向任务队列中提交任务和获取任务,需要使用互斥量保护任务队列本身,这里任务队列使用stl中queue实现;对于线程池的实现,使用vector<std::shared_ptr<Thread>>来实现;
ThreadPool实现
ThreadPool::ThreadPool(size_t threadNums):
_threadNums(threadNums),
_running(false),
_mutex(),
_cond(_mutex)
{
assert(!_running);
assert(_threadNums > 0);
_threadPool.reserve(_threadNums);
}
ThreadPool::~ThreadPool()
{
if (_running)
stop();
}
void ThreadPool::start()
{
assert(!_running);
assert(_threadNums > 0);
_running = true;
for (size_t i = 0; i < _threadNums; ++i)
{
shared_ptr<Thread> thread = make_shared<Thread>(std::bind(&ThreadPool::_runThread, this));
_threadPool.push_back(thread);
thread->start();
}
}
void ThreadPool::stop()
{
assert(_running);
//important, sure threads exit, othrewise thead->join will wait long time, becasuse the thread will sleep long Time;
_running = false;
_cond.wakeAll(); //impotant
for (auto& thread : _threadPool) //等待线程结束
{
thread->join();
}
}
void ThreadPool::submitTask(const TaskFunc& task)
{
{
MutexLockGuard lock(_mutex);
_tasksQueue.push(task);
}
_cond.wake();
}
ThreadPool::TaskFunc ThreadPool::_takeTask()
{
{
MutexLockGuard lock(_mutex);
while ( _running && _tasksQueue.empty())
{
LOG_INFO << "thread tid [" << CurrentThread::tid() << "] wait";
_cond.wait();
}
}
MutexLockGuard lock(_mutex);
TaskFunc task;
if (!_tasksQueue.empty())
{
task = _tasksQueue.front();
_tasksQueue.pop();
}
return task;
}
void ThreadPool::_runThread()
{
assert(_running);
while (_running)
{
try
{
TaskFunc task = _takeTask();
if (task)
task();
}
catch (...)
{
LOG_SYSERR << "Exception happen in ThreadPool";
}
}
}
说明几点:
(1)存在ThreadPool对象已经析构,但是线程池中线程未终止,因此在Thread析构函数中,首先要对当前状态_running进行判断,若仍为True,是要执行ThreadPool的stop函数的;
(2)在stop函数中,首先将_running置为false,然后通知所有线程唤醒,此时所有的线程执行完当前的任务后,都会退出_runThread()函数;在stop最后一部分,要join等待线程池中的各个线程,若不等待ThreadPool已经析构后,std::vector<std::shared_ptr<Thread>> _threadPool也将开始析构,造成Thread析构,这样此后线程执行任何与Thread相关的操作都将会未定义;因此需要线程退出后,Thread才开始析构,这样Thread的生命周期要长于线程的生命周期;
(3)在_takeTask(),线程等待的条件是while ( _running && _tasksQueue.empty()),当_running为false时,说明此时线程池将要停止,因此线程要退出_runThread()函数,join才会返回,取到的task有可能为空的,在_runThread()中在执行task之前,还要判断if (task)该任务是否为空,不为空才会执行任务;
ThreadPool声明
class Thread;
class ThreadPool final
{
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
explicit ThreadPool(size_t threadNums);
~ThreadPool();
void start();
void stop();
typedef std::function<void()> TaskFunc;
void submitTask(const TaskFunc& task);
private:
void _runThread();
TaskFunc _takeTask();
size_t _threadNums;
bool _running;
std::vector<std::shared_ptr<Thread>> _threadPool;
std::queue<TaskFunc> _tasksQueue;
Mutex _mutex;
Condition _cond;
};
说明几点:
(1)Thread使用前向声明,减少头文件的依赖;
(2)当任务队列中任务为空时,线程池中的线程要等待任务产生,此时线程处于睡眠状态,等待条件,应该使用条件变量;当计算任务被提交到任务队列中,要使用条件变量发送信号,唤醒等待的线程;这里使用此前博客讲述的Condition和Mutex来实现;
(3)线程向任务队列中提交任务和获取任务,需要使用互斥量保护任务队列本身,这里任务队列使用stl中queue实现;对于线程池的实现,使用vector<std::shared_ptr<Thread>>来实现;
ThreadPool实现
ThreadPool::ThreadPool(size_t threadNums):
_threadNums(threadNums),
_running(false),
_mutex(),
_cond(_mutex)
{
assert(!_running);
assert(_threadNums > 0);
_threadPool.reserve(_threadNums);
}
ThreadPool::~ThreadPool()
{
if (_running)
stop();
}
void ThreadPool::start()
{
assert(!_running);
assert(_threadNums > 0);
_running = true;
for (size_t i = 0; i < _threadNums; ++i)
{
shared_ptr<Thread> thread = make_shared<Thread>(std::bind(&ThreadPool::_runThread, this));
_threadPool.push_back(thread);
thread->start();
}
}
void ThreadPool::stop()
{
assert(_running);
//important, sure threads exit, othrewise thead->join will wait long time, becasuse the thread will sleep long Time;
_running = false;
_cond.wakeAll(); //impotant
for (auto& thread : _threadPool) //等待线程结束
{
thread->join();
}
}
void ThreadPool::submitTask(const TaskFunc& task)
{
{
MutexLockGuard lock(_mutex);
_tasksQueue.push(task);
}
_cond.wake();
}
ThreadPool::TaskFunc ThreadPool::_takeTask()
{
{
MutexLockGuard lock(_mutex);
while ( _running && _tasksQueue.empty())
{
LOG_INFO << "thread tid [" << CurrentThread::tid() << "] wait";
_cond.wait();
}
}
MutexLockGuard lock(_mutex);
TaskFunc task;
if (!_tasksQueue.empty())
{
task = _tasksQueue.front();
_tasksQueue.pop();
}
return task;
}
void ThreadPool::_runThread()
{
assert(_running);
while (_running)
{
try
{
TaskFunc task = _takeTask();
if (task)
task();
}
catch (...)
{
LOG_SYSERR << "Exception happen in ThreadPool";
}
}
}
说明几点:
(1)存在ThreadPool对象已经析构,但是线程池中线程未终止,因此在Thread析构函数中,首先要对当前状态_running进行判断,若仍为True,是要执行ThreadPool的stop函数的;
(2)在stop函数中,首先将_running置为false,然后通知所有线程唤醒,此时所有的线程执行完当前的任务后,都会退出_runThread()函数;在stop最后一部分,要join等待线程池中的各个线程,若不等待ThreadPool已经析构后,std::vector<std::shared_ptr<Thread>> _threadPool也将开始析构,造成Thread析构,这样此后线程执行任何与Thread相关的操作都将会未定义;因此需要线程退出后,Thread才开始析构,这样Thread的生命周期要长于线程的生命周期;
(3)在_takeTask(),线程等待的条件是while ( _running && _tasksQueue.empty()),当_running为false时,说明此时线程池将要停止,因此线程要退出_runThread()函数,join才会返回,取到的task有可能为空的,在_runThread()中在执行task之前,还要判断if (task)该任务是否为空,不为空才会执行任务;
相关文章推荐
- Linux socket 初步
- 使用C++实现JNI接口需要注意的事项
- 10 篇对初学者和专家都有用的 Linux 命令教程
- Linux 与 Windows 对UNICODE 的处理方式
- Ubuntu12.04下QQ完美走起啊!走起啊!有木有啊!
- 解決Linux下Android开发真机调试设备不被识别问题
- 运维入门
- 运维提升
- Linux 自检和 SystemTap
- Ubuntu Linux使用体验
- c语言实现hashmap(转载)
- Linux 信号signal处理机制
- linux下mysql添加用户
- 关于指针的一些事情
- Scientific Linux 5.5 图形安装教程
- 基于 Linux 集群环境上 GPFS 的问题诊断
- 谁是桌面王者?Win PK Linux三大镇山之宝
- vivi下重新调整分区
- Linux VS Unix:Linux欲一统天下 Unix不死