您的位置:首页 > 其它

Muduo库源码分析(7):线程池

2017-08-07 21:26 218 查看
线程池的本质

生产者与消费者模型,往线程池添加任务相当于生产者,从线程池取出任务相当于消费者,线程池容量相当于有界的缓冲区,所以实现类似于有界缓冲区

class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;

explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool();

void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }

void start(int numThreads);// 启动线程
void stop();// 停止线程

const string& name() const
{ return name_; }

size_t queueSize() const;

// Could block if maxQueueSize > 0
void run(const Task& f);// 添加任务
void run(Task&& f);

private:
bool isFull() const;
void runInThread();// 子线程运行
Task take();// 取出任务

mutable MutexLock mutex_;// 互斥锁
Condition notEmpty_;// 条件变量标记可消费的数量
Condition notFull_;// 条件变量标记可生产的数量
string name_;// 线程名字
Task threadInitCallback_;// 初始化线程回调函数
std::vector<std::unique_ptr<muduo::Thread>> threads_;// 线程数组
std::deque<Task> queue_;// 队列实现线程池
size_t maxQueueSize_;// 线程池容量
bool running_;// 标记线程是否运行
};


ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
maxQueueSize_(0),
running_(false)
{
}

ThreadPool::~ThreadPool()
{
if (running_)
{
stop();
}
}

void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.push_back(std::unique_ptr<muduo::Thread>(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id)));
threads_[i]->start();
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
// 线程终止,将所有的线程调用join方法,避免僵尸线程
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
notEmpty_.notifyAll();
}
std::for_each(threads_.begin(),
threads_.end(),
std::bind(&muduo::Thread::join, std::placeholders::_1));
}

size_t ThreadPool::queueSize() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
// 往线程池添加任务
void ThreadPool::run(const Task& task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())// 线程池满,等待
{
notFull_.wait();
}
assert(!isFull());

queue_.push_back(task);
notEmpty_.notify();// 添加任务成功,唤醒阻塞的消费者线程
}
}

void ThreadPool::run(Task&& task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())
{
notFull_.wait();
}
assert(!isFull());

queue_.push_back(std::move(task));
notEmpty_.notify();
}
}

ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)// 线程启动并且线程池没有任务,等待
{
notEmpty_.wait();
}
Task task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}

bool ThreadPool::isFull() const
{
mutex_.assertLocked();
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

// 子线程执行函数
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
Task task(take());// 线程池中没有任务阻塞
if (task)
{
task();
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Muduo