Muduo库源码分析(7):线程池
2017-08-07 21:26
218 查看
线程池的本质
生产者与消费者模型,往线程池添加任务相当于生产者,从线程池取出任务相当于消费者,线程池容量相当于有界的缓冲区,所以实现类似于有界缓冲区
生产者与消费者模型,往线程池添加任务相当于生产者,从线程池取出任务相当于消费者,线程池容量相当于有界的缓冲区,所以实现类似于有界缓冲区
class ThreadPool : noncopyable { public: typedef std::function<void ()> Task; explicit ThreadPool(const string& nameArg = string("ThreadPool")); ~ThreadPool(); void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; } void setThreadInitCallback(const Task& cb) { threadInitCallback_ = cb; } void start(int numThreads);// 启动线程 void stop();// 停止线程 const string& name() const { return name_; } size_t queueSize() const; // Could block if maxQueueSize > 0 void run(const Task& f);// 添加任务 void run(Task&& f); private: bool isFull() const; void runInThread();// 子线程运行 Task take();// 取出任务 mutable MutexLock mutex_;// 互斥锁 Condition notEmpty_;// 条件变量标记可消费的数量 Condition notFull_;// 条件变量标记可生产的数量 string name_;// 线程名字 Task threadInitCallback_;// 初始化线程回调函数 std::vector<std::unique_ptr<muduo::Thread>> threads_;// 线程数组 std::deque<Task> queue_;// 队列实现线程池 size_t maxQueueSize_;// 线程池容量 bool running_;// 标记线程是否运行 };
ThreadPool::ThreadPool(const string& nameArg) : mutex_(), notEmpty_(mutex_), notFull_(mutex_), name_(nameArg), maxQueueSize_(0), running_(false) { } ThreadPool::~ThreadPool() { if (running_) { stop(); } } void ThreadPool::start(int numThreads) { assert(threads_.empty()); running_ = true; threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i+1); threads_.push_back(std::unique_ptr<muduo::Thread>(new muduo::Thread( std::bind(&ThreadPool::runInThread, this), name_+id))); threads_[i]->start(); } if (numThreads == 0 && threadInitCallback_) { threadInitCallback_(); } } // 线程终止,将所有的线程调用join方法,避免僵尸线程 void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; notEmpty_.notifyAll(); } std::for_each(threads_.begin(), threads_.end(), std::bind(&muduo::Thread::join, std::placeholders::_1)); } size_t ThreadPool::queueSize() const { MutexLockGuard lock(mutex_); return queue_.size(); } // 往线程池添加任务 void ThreadPool::run(const Task& task) { if (threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while (isFull())// 线程池满,等待 { notFull_.wait(); } assert(!isFull()); queue_.push_back(task); notEmpty_.notify();// 添加任务成功,唤醒阻塞的消费者线程 } } void ThreadPool::run(Task&& task) { if (threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while (isFull()) { notFull_.wait(); } assert(!isFull()); queue_.push_back(std::move(task)); notEmpty_.notify(); } } ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup while (queue_.empty() && running_)// 线程启动并且线程池没有任务,等待 { notEmpty_.wait(); } Task task; if (!queue_.empty()) { task = queue_.front(); queue_.pop_front(); if (maxQueueSize_ > 0) { notFull_.notify(); } } return task; } bool ThreadPool::isFull() const { mutex_.assertLocked(); return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_; } // 子线程执行函数 void ThreadPool::runInThread() { try { if (threadInitCallback_) { threadInitCallback_(); } while (running_) { Task task(take());// 线程池中没有任务阻塞 if (task) { task(); } } } catch (const Exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTrace()); abort(); } catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); } catch (...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str()); throw; // rethrow } }
相关文章推荐
- Java 线程池 ThreadPoolExecutor 源码分析
- Muduo库源码分析(5):互斥锁,条件变量类
- Java并发之线程池ThreadPoolExecutor源码分析学习
- Android中的线程和线程池及其源码分析:
- 线程池(ThreadPoolExecutor)源码分析之如何保证核心线程不被销毁的
- Java 线程池 ThreadPoolExecutor 源码分析
- 【网络编程】半同步--半异步线程池源码分析之任务队列(基于C++11)
- 【网络编程】半同步--半异步线程池源码分析之线程池(基于C++11)
- UE4线程池源码分析和线程池的封装
- Java 线程池 ThreadPoolExecutor 源码分析
- JUC源码分析26-线程池-ThreadPoolExecutor
- 【图灵学院10】高并发之java线程池源码分析
- Java线程池及其底层源码实现分析
- [图解tensorflow源码] 线程池模块分析 (CPU thread pool device)
- memcached源码分析之线程池机制(一)
- Jetty源码分析之线程池:QueuedThreadPool
- memcached源码分析之线程池机制(二)
- [转载] Java线程池框架源码分析
- java ThreadPoolExecutor 线程池源码分析
- JUC源码分析27-线程池-FutureTask