Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装
2017-06-20 19:21
417 查看
muduo的并发模型为one loop per thread+ threadpool。
为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。所以这篇博文并没有涉及到新鲜的技术,但是也有一些封装和逻辑方面的注意点需要我们去分析和理解。
EventLoopThread
任何一个线程,只要创建并运行了EventLoop,就是一个IO线程。 EventLoopThread类就是一个封装好了的IO线程。
EventLoopThread的工作流程为:
1、在主线程创建EventLoopThread对象。
2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),并且主线程要等待IO线程创建完成EventLoop对象。
3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。
4、主线程返回创建的EventLoop对象。
EventLoopThread.h
测试程序:
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
EventLoopThread loopThread;
EventLoop* loop = loopThread.startLoop();
// 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
loop->runInLoop(runInThread);
sleep(1);
// runAfter内部也调用了runInLoop,所以这里也是异步调用
loop->runAfter(2, runInThread);
sleep(3);
loop->quit();
printf("exit main().\n");
}
对调用过程进行分析:(查看日志)
主线程调用 loop->runInLoop(runInThread); 由于主线程(不是IO线程)调用runInLoop, 故调用queueInLoop() 将runInThead 添加到队列,然后wakeup() IO线程,IO线程在doPendingFunctors() 中取loop->runAfter() 要唤醒一下,此时只是执行runAfter() 添加了一个2s的定时器, 2s超时,timerfd_ 可读,先handleRead()一下然后执行回调函数runInThread()。
那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是因为EventLoopThead 栈上对象析构,在析构函数内 loop_ ->quit(), 由于不是在IO线程调用quit(),故也需要唤醒一下,IO线程才能从poll 返回,这样再次循环判断 while (!quit_) 就能退出IO线程。
muduo的线程模型:
![](https://img-blog.csdn.net/20170620192002971?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYW5keWxhdTAwag==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
muduo的思想时eventLoop+thread pool,为了更方便使用,将EventLoopThread做了封装。main reactor可以创建sub reactor,并发一些任务分发到sub reactor中去。EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,可以返回EventLoop对象来使用EventLoopThreadPool中的Thread。
EventLoopThreadPool.h
mainReactor关注监听事件,已连接套接字事件轮询给线程池中的subReactors 处理,一个新的连接对应一个subReactor
我们采用round-robin(RR,轮叫)的调度方式选择一个EventLoop,也就是getNextLoop函数。极端情况下,线程池中个数为0时,那么新的连接交给mainReactor,这样就退化成单线程的模式。
为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。所以这篇博文并没有涉及到新鲜的技术,但是也有一些封装和逻辑方面的注意点需要我们去分析和理解。
EventLoopThread
任何一个线程,只要创建并运行了EventLoop,就是一个IO线程。 EventLoopThread类就是一个封装好了的IO线程。
EventLoopThread的工作流程为:
1、在主线程创建EventLoopThread对象。
2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),并且主线程要等待IO线程创建完成EventLoop对象。
3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。
4、主线程返回创建的EventLoop对象。
EventLoopThread.h
class EventLoopThread : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback()); ~EventLoopThread(); EventLoop* startLoop(); // 启动线程,该线程就成为了IO线程 private: void threadFunc(); // 线程函数 EventLoop* loop_; // loop_指针指向一个EventLoop对象 bool exiting_; Thread thread_; MutexLock mutex_; Condition cond_; ThreadInitCallback callback_; // 回调函数在EventLoop::loop事件循环之前被调用 };
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb) : loop_(NULL), exiting_(false), thread_(boost::bind(&EventLoopThread::threadFunc, this)), mutex_(), cond_(mutex_), callback_(cb) { } EventLoopThread::~EventLoopThread() { exiting_ = true; loop_->quit(); // 退出IO线程,让IO线程的loop循环退出,从而退出了IO线程 thread_.join(); //等待线程退出 } EventLoop* EventLoopThread::startLoop() { assert(!thread_.started()); thread_.start();//线程启动,调用threadFunc() { MutexLockGuard lock(mutex_); while (loop_ == NULL) { cond_.wait();//需要等待EventLoop对象的创建 } } return loop_; } void EventLoopThread::threadFunc() { EventLoop loop; if (callback_) { callback_(&loop); } { MutexLockGuard lock(mutex_); // loop_指针指向了一个栈上的对象,threadFunc函数退出之后,这个指针就失效了 // threadFunc函数退出,就意味着线程退出了,EventLoopThread对象也就没有存在的价值了。 // 因而不会有什么大的问题 loop_ = &loop; cond_.notify(); //创建好,发送通知 } loop.loop();// 会在这里循环,直到EventLoopThread析构。此后不再使用loop_访问EventLoop了 //assert(exiting_); }
测试程序:
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
EventLoopThread loopThread;
EventLoop* loop = loopThread.startLoop();
// 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
loop->runInLoop(runInThread);
sleep(1);
// runAfter内部也调用了runInLoop,所以这里也是异步调用
loop->runAfter(2, runInThread);
sleep(3);
loop->quit();
printf("exit main().\n");
}
对调用过程进行分析:(查看日志)
主线程调用 loop->runInLoop(runInThread); 由于主线程(不是IO线程)调用runInLoop, 故调用queueInLoop() 将runInThead 添加到队列,然后wakeup() IO线程,IO线程在doPendingFunctors() 中取loop->runAfter() 要唤醒一下,此时只是执行runAfter() 添加了一个2s的定时器, 2s超时,timerfd_ 可读,先handleRead()一下然后执行回调函数runInThread()。
那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是因为EventLoopThead 栈上对象析构,在析构函数内 loop_ ->quit(), 由于不是在IO线程调用quit(),故也需要唤醒一下,IO线程才能从poll 返回,这样再次循环判断 while (!quit_) 就能退出IO线程。
EventLoopThreadPool
muduo的线程模型:muduo的思想时eventLoop+thread pool,为了更方便使用,将EventLoopThread做了封装。main reactor可以创建sub reactor,并发一些任务分发到sub reactor中去。EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,可以返回EventLoop对象来使用EventLoopThreadPool中的Thread。
EventLoopThreadPool.h
class EventLoopThreadPool : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; EventLoopThreadPool(EventLoop* baseLoop); ~EventLoopThreadPool(); void setThreadNum(int numThreads) { numThreads_ = numThreads; } void start(const ThreadInitCallback& cb = ThreadInitCallback()); EventLoop* getNextLoop(); private: EventLoop* baseLoop_; // 与Acceptor所属EventLoop相同 bool started_; int numThreads_; // 线程数 int next_; // 新连接到来,所选择的EventLoop对象下标 boost::ptr_vector<EventLoopThread> threads_; // IO线程列表 std::vector<EventLoop*> loops_; // EventLoop列表 };
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop) : baseLoop_(baseLoop), started_(false), numThreads_(0), next_(0) { } EventLoopThreadPool::~EventLoopThreadPool() { // Don't delete loop, it's stack variable } void EventLoopThreadPool::start(const ThreadInitCallback& cb) { assert(!started_); baseLoop_->assertInLoopThread(); started_ = true; for (int i = 0; i < numThreads_; ++i) { EventLoopThread* t = new EventLoopThread(cb); threads_.push_back(t); loops_.push_back(t->startLoop()); // 启动EventLoopThread线程,在进入事件循环之前,会调用cb } if (numThreads_ == 0 && cb) { // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb cb(baseLoop_); } } EventLoop* EventLoopThreadPool::getNextLoop() { baseLoop_->assertInLoopThread(); EventLoop* loop = baseLoop_; // 如果loops_为空,则loop指向baseLoop_ // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop if (!loops_.empty()) { // round-robin loop = loops_[next_]; ++next_; if (implicit_cast<size_t>(next_) >= loops_.size()) { next_ = 0; } } return loop; }
mainReactor关注监听事件,已连接套接字事件轮询给线程池中的subReactors 处理,一个新的连接对应一个subReactor
我们采用round-robin(RR,轮叫)的调度方式选择一个EventLoop,也就是getNextLoop函数。极端情况下,线程池中个数为0时,那么新的连接交给mainReactor,这样就退化成单线程的模式。
相关文章推荐
- Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装
- muduo::EventLoopThread、EventLoopThreadPool分析
- muduo网络库学习之EventLoop(四):EventLoopThread 类、EventLoopThreadPool 类
- Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
- python threadpool 源码分析以及自己封装的简易版线程池
- muduo源码分析---EventLoopThread
- muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool
- WPA_SUPPLICANT源码分析(1):EVENT LOOP的实现
- muduo源码分析:线程类Thread封装
- muduo源码解析之EventLoopThread
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- 【Netty4.X】Netty源码分析之NioEventLoop(六)
- muduo 30 支持多线程 EventLoopThreadPool
- 【Netty4.X】Netty源码分析之NioEventLoopGroup(五)
- Netty线程模型源码分析之NioEventLoopGroup的初始化
- jetty源码分析:QueuedThreadPool
- muduo库阅读(36)——Net部分:事件循环线程池EventLoopThreadPool
- 【QEMU-KVM代码分析之三】IO thread源码浅析之main loop
- Muduo网络库源码分析(三)线程间使用eventfd通信和EventLoop::runInLoop系列函数