您的位置:首页 > 编程语言 > C语言/C++

C++ 线程池

2015-07-09 15:11 513 查看

简单的线程池实现

下面这段代码摘自CppCMS thread_pool.cpp 。

CppCMS是一个C++ 的Web开发框架,用于可发展的网站,性能极高。推荐给大家使用。

///////////////////////////////////////////////////////////////////////////////
//
//  Copyright (C) 2008-2012  Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>
//
//  See accompanying file COPYING.TXT file for licensing details.
//
///////////////////////////////////////////////////////////////////////////////
#define CPPCMS_SOURCE
#include <cppcms/thread_pool.h>
#include <booster/backtrace.h>
#include <booster/log.h>
#include <ostream>
#include <list>
#include <vector>
#include <cppcms/config.h>
#ifdef CPPCMS_USE_EXTERNAL_BOOST
#   include <boost/bind.hpp>
#else // Internal Boost
#   include <cppcms_boost/bind.hpp>
namespace boost = cppcms_boost;
#endif

#include <booster/shared_ptr.h>
#include <booster/thread.h>

#if defined(CPPCMS_POSIX)
#include <signal.h>
#endif

namespace cppcms {
namespace impl {
class thread_pool : public booster::noncopyable {
public:

bool cancel(int id) {
booster::unique_lock<booster::mutex> lock(mutex_);
queue_type::iterator p;
for(p=queue_.begin();p!=queue_.end();++p) {
if(p->first==id) {
queue_.erase(p);
return true;
}
}
return false;
}
int post(booster::function<void()> const &job)
{
booster::unique_lock<booster::mutex> lock(mutex_);
int id=job_id_++;
queue_.push_back(std::make_pair(id,job));
cond_.notify_one();
return id;
}
thread_pool(int threads) :
shut_down_(false),
job_id_(0)
{
workers_.resize(threads);
#if defined(CPPCMS_POSIX)
sigset_t set,old;
sigfillset(&set);
pthread_sigmask(SIG_BLOCK,&set,&old);
#endif
for(int i=0;i<threads;i++) {
workers_[i].reset(new booster::thread(boost::bind(&thread_pool::worker,this)));
}

#if defined(CPPCMS_POSIX)
pthread_sigmask(SIG_SETMASK,&old,0);
#endif

}
void stop()
{
{
booster::unique_lock<booster::mutex> lock(mutex_);
shut_down_=true;
cond_.notify_all();
}

for(unsigned i=0;i<workers_.size();i++) {
booster::shared_ptr<booster::thread> thread=workers_[i];
workers_[i].reset();
if(thread)
thread->join();
}
}
~thread_pool()
{
try {
stop();
}
catch(...)
{
}
}

private:

void worker()
{
for(;;) {
booster::function<void()> job;

{
booster::unique_lock<booster::mutex> lock(mutex_);
if(shut_down_)
return;
if(!queue_.empty()) {
queue_.front().second.swap(job);
queue_.pop_front();
}
else {
cond_.wait(lock);
}
}

if(job) {
try {
job();
}
catch(std::exception const &e) {
BOOSTER_ERROR("cppcms") << "Catched exception in thread pool" << e.what() <<'\n'
<< booster::trace(e);

}
catch(...) {
BOOSTER_ERROR("cppcms") << "Catched unknown exception in thread pool";
}
}
}
}

booster::mutex mutex_;
booster::condition_variable cond_;

bool shut_down_;
int job_id_;
typedef std::list<std::pair<int,booster::function<void()> > > queue_type;
queue_type queue_;
std::vector<booster::shared_ptr<booster::thread> > workers_;

};

}

thread_pool::thread_pool(int n) :
impl_(new impl::thread_pool(n))
{
}

int thread_pool::post(booster::function<void()>  const &job)
{
return impl_->post(job);
}

void thread_pool::stop()
{
impl_->stop();
}

bool thread_pool::cancel(int id)
{
return impl_->cancel(id);
}

thread_pool::~thread_pool()
{
}

} // cppcms
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: