从epoll构建muduo-9 加入onWriteComplate回调和Buffer
2015-12-05 11:18
676 查看
mini-muduo版本传送门
version 0.00 从epoll构建muduo-1 mini-muduo介绍
version 0.01 从epoll构建muduo-2 最简单的epoll
version 0.02 从epoll构建muduo-3 加入第一个类,顺便介绍reactor
version 0.03 从epoll构建muduo-4 加入Channel
version 0.04 从epoll构建muduo-5 加入Acceptor和TcpConnection
version 0.05 从epoll构建muduo-6 加入EventLoop和Epoll
version 0.06 从epoll构建muduo-7 加入IMuduoUser
version 0.07 从epoll构建muduo-8 加入发送缓冲区和接收缓冲区
version 0.08 从epoll构建muduo-9 加入onWriteComplate回调和Buffer
version 0.09 从epoll构建muduo-10 Timer定时器
version 0.11 从epoll构建muduo-11 单线程Reactor网络模型成型
version 0.12 从epoll构建muduo-12 多线程代码入场
version 0.13 从epoll构建muduo-13 Reactor + ThreadPool 成型
mini-muduo v0.08版本,这个版本完善了缓冲区的实现。mini-muduo完整可运行的示例可从github下载,使用命令git checkout v0.08可切换到此版本,在线浏览此版本到这里
本版本有两处重要修改,首先实现了IMuduoUser的onWriteComplate回调,这样当用户一次传送大量数据到网络库时,用户会在数据发送完成后收到通知。当然了发送小量数据完成时也会收到通知。其次本版本实现了专门用于表示缓冲区的Buffer类,只不过这个Buffer类的细节实现非常简单。
下面的条目1 ~ 条目5讲解onWireteComplate的实现细节,条目6讲解Buffer。
1 先来看看IMuduoUser的onWriteComplate回调,首先要明确的是onWriteComplate回调什么时候会被调用,那就是用户递交数据给网络库,当网络库把这些信息全部传送给操作系统后(通过::write()调用),用户对象的onWriteComplate会被回调到。muduo使用的是level triger的Epoll,所以应该在下面两个位置调用onWriteComplate。注意这两处不是直接通过_pUser->onWriteComplate(this);的方式调用的,而是使用了EventLoop::queueLoop方法来异步调用。下条说明会对queueLoop方法做详细介绍。在原始muduo里关于这个回调的讲解可参见<<LInux多线程服务端编程>>P322页。
位置1:第一次调用size_t n = ::write(...len...) 后,n和len相等的时候,这表明一次系统调用已经将数据全部发送完了,需要通知用户了。调用点位于TcpConnection.cc 91行
[cpp] view
plaincopyprint?
82 void TcpConnection::send(const string& message)
83 {
84 int n = 0;
85 if(_outBuf.readableBytes() == 0)
86 {
87 n = ::write(_sockfd, message.c_str(), message.size());
88 if(n < 0)
89 cout << "write error" << endl;
90 if(n == static_cast<int>(message.size()))
91 _pLoop->queueLoop(this); //invoke onWriteComplate
92 }
93
94 if( n < static_cast<int>(message.size()))
95 {
96 _outBuf.append(message.substr(n, message.size()));
97 if(_pChannel->isWriting())
98 {
99 _pChannel->enableWriting(); //add EPOLLOUT
100 }
101 }
102 }
位置2: 当某次调用size_t n = ::write(...len...) 后,n < len,表明操作系统无法全部接收本次递交给他的数据。当操作系统的发送缓冲区有更多可用空间时,通过让epoll_wait返回来通知我们发生了EPOLLOUT事件,这时网络库会把自己保存缓冲区数据继续送给操作系统,如果全部数据操作系统都接收完毕,这时也需要通知用户了。调用点位于TcpConnection.cc 76行。
[cpp] view
plaincopyprint?
63 void TcpConnection::handleWrite()
64 {
65 int sockfd = _pChannel->getSockfd();
66 if(_pChannel->isWriting())
67 {
68 int n = ::write(sockfd, _outBuf.peek(), _outBuf.readableBytes());
69 if( n > 0)
70 {
71 cout << "write " << n << " bytes data again" << endl;
72 _outBuf.retrieve(n);
73 if(_outBuf.readableBytes() == 0)
74 {
75 _pChannel->disableWriting(); //remove EPOLLOUT
76 _pLoop->queueLoop(this); //invoke onWriteComplate
77 }
78 }
79 }
80 }
在这两处设置好回调后,即可保证用户得到准确的通知。
2 EventLoop::queueLoop()方法,本版本新添加的方法,这是一个非常非常重要的方法。在没有这个方法之前,我们使用epoll_wait()接收的所有事件,都是来自操作系统的,比如EPOLLIN/EPOLLOUT,我们使用epollfd只是用来接收操作系统的网络通知。现在我们需要让epollfd多做点事情,让他能接收网络库自己发送的通知。这种通知有两个重要的价值
价值1: 使得网络库内部在单线程的情况下具备异步处理事件的能力。
价值2: 使得网络库的IO线程(跑epoll_wait的那个线程),可以接收来自非本线程的发送请求。
这种通知正是通过eventfd机制实现的,eventfd由Linux 2.6.22新引入,可以像socket一样被epollfd所监听,如果向eventfd写点东西,epoll就会获得这个通知并返回。EventLoop正是通过封装eventfd才获得了异步处理能力。
EventLoop::wakeup()里调用了::wirte()
[cpp] view
plaincopyprint?
53 void EventLoop::wakeup()
54 {
55 uint64_t one = 1;
56 ssize_t n = ::write(_eventfd, &one, sizeof one);
57 if (n != sizeof one)
58 {
59 cout << "EventLoop::wakeup() writes " << n << " bytes instead of 8" << endl;
60 }
61 }
EventLoop::handleRead()里调用了::read()
[cpp] view
plaincopyprint?
73 void EventLoop::handleRead()
74 {
75 uint64_t one = 1;
76 ssize_t n = ::read(_eventfd, &one, sizeof one);
77 if (n != sizeof one)
78 {
79 cout << "EventEventLoop::handleRead() reads " << n << " bytes instead of 8" << endl;
80 }
81 }
真正的事件处理过程位于EventLoop::loop()方法中,新添加的EventLoop::doPendingFunctors()方法正是用来触发异步调用的。
现在重新理顺一下EventLoop::queueLoop()方法的实现,这个方法其实就是先将一个代表回调的IRun*放入到EventLoop的vector中保存,然后就触发eventfd的事件,本次循环完毕,当下次EventLoop::loop循环到epoll_wait时,会因为eventfd的触发而返回,这时eventfd对应的Channel会被通知,进而通知到EventLoop::handleRead方法,我们在里面把事件读出来,这样确保事件只触发一次。EventLoop::loop循环会继续调用到doPendingFunctors()方法,这里面遍历保存IRun*的vector,于是所有异步事件就开始处理了。
EventLoop::loop()方法
[cpp] view
plaincopyprint?
25 void EventLoop::loop()
26 {
27 while(!_quit)
28 {
29 vector<Channel*> channels;
30 _pPoller->poll(&channels);
31
32 vector<Channel*>::iterator it;
33 for(it = channels.begin(); it != channels.end(); ++it)
34 {
35 (*it)->handleEvent();
36 }
37
38 doPendingFunctors();
39 }
40 }
EventLoop::queueLoop()方法
[cpp] view
plaincopyprint?
47 void EventLoop::queueLoop(IRun* pRun)
48 {
49 _pendingFunctors.push_back(pRun);
50 wakeup();
51 }
3 注意doPendingFunctors方法的实现,这里不是通过简单的遍历vector来调用回调,而是新建了一个vector,然后调用vector::swap方法将数组交换出来后再调用,这么做的目的是“减小临界区的长度和避免死锁”,在<<Linux多线程服务器端编程>>P295页有详细介绍。当然我们的mini-muduo目前还是单线程,影响不大。
4 之所以通过EventLoop::queueLoop()来异步触发onWriteComplate而不是直接在TcpConnection里触发onWriteComplate,我想是为了防止回调嵌套,因为我们在onMessage里调用了TcpConnection::send()方法,如果onWriteComplate又是直接在send里被调用的话,就会导致onMessage嵌套调用了onWriteComplate,这样事件的序列性被打破,会引入一堆问题。
5 因为目前程序都跑在一个进程的唯一线程中,muduo中的所有线程相关代码还未加入,当后面版本多线程被加入进来后,一些关键数据要被mutex保护起来。
6 将const string& 和 string* 都换成Buffer,保持和muduo一致,Buffer里只实现了几个基本方法,比如append()只实现了const string&版本而没有(const char* data, int len)版本,Buffer使用了std::string作为自己的存储介质,方法实现也比较粗糙,效率比较差,好处是简单易懂而且和原始muduo有相同的接口。muduo里的Buffer设计作者花费了一些心思,使用了栈和堆结合的方法,在书中7.4节已经进行了详细的介绍。Buffer::retrieve方法的作用是丢弃掉缓冲区里前n个字节。mini-muduo没有使用定制的string类而是直接使用了std::string。
7 muduo里另外的关于缓冲区的回调我在mini-muduo里没有实现,个人认为这不会影响到对基础框架的理解。比如“高水位回调”HighWaterMarkCallback,如果输出缓冲的长度超过用户制定的大小会触发。对这个回调的实现有兴趣的同学可参看muduo代码。
version 0.00 从epoll构建muduo-1 mini-muduo介绍
version 0.01 从epoll构建muduo-2 最简单的epoll
version 0.02 从epoll构建muduo-3 加入第一个类,顺便介绍reactor
version 0.03 从epoll构建muduo-4 加入Channel
version 0.04 从epoll构建muduo-5 加入Acceptor和TcpConnection
version 0.05 从epoll构建muduo-6 加入EventLoop和Epoll
version 0.06 从epoll构建muduo-7 加入IMuduoUser
version 0.07 从epoll构建muduo-8 加入发送缓冲区和接收缓冲区
version 0.08 从epoll构建muduo-9 加入onWriteComplate回调和Buffer
version 0.09 从epoll构建muduo-10 Timer定时器
version 0.11 从epoll构建muduo-11 单线程Reactor网络模型成型
version 0.12 从epoll构建muduo-12 多线程代码入场
version 0.13 从epoll构建muduo-13 Reactor + ThreadPool 成型
mini-muduo v0.08版本,这个版本完善了缓冲区的实现。mini-muduo完整可运行的示例可从github下载,使用命令git checkout v0.08可切换到此版本,在线浏览此版本到这里
本版本有两处重要修改,首先实现了IMuduoUser的onWriteComplate回调,这样当用户一次传送大量数据到网络库时,用户会在数据发送完成后收到通知。当然了发送小量数据完成时也会收到通知。其次本版本实现了专门用于表示缓冲区的Buffer类,只不过这个Buffer类的细节实现非常简单。
下面的条目1 ~ 条目5讲解onWireteComplate的实现细节,条目6讲解Buffer。
1 先来看看IMuduoUser的onWriteComplate回调,首先要明确的是onWriteComplate回调什么时候会被调用,那就是用户递交数据给网络库,当网络库把这些信息全部传送给操作系统后(通过::write()调用),用户对象的onWriteComplate会被回调到。muduo使用的是level triger的Epoll,所以应该在下面两个位置调用onWriteComplate。注意这两处不是直接通过_pUser->onWriteComplate(this);的方式调用的,而是使用了EventLoop::queueLoop方法来异步调用。下条说明会对queueLoop方法做详细介绍。在原始muduo里关于这个回调的讲解可参见<<LInux多线程服务端编程>>P322页。
位置1:第一次调用size_t n = ::write(...len...) 后,n和len相等的时候,这表明一次系统调用已经将数据全部发送完了,需要通知用户了。调用点位于TcpConnection.cc 91行
[cpp] view
plaincopyprint?
82 void TcpConnection::send(const string& message)
83 {
84 int n = 0;
85 if(_outBuf.readableBytes() == 0)
86 {
87 n = ::write(_sockfd, message.c_str(), message.size());
88 if(n < 0)
89 cout << "write error" << endl;
90 if(n == static_cast<int>(message.size()))
91 _pLoop->queueLoop(this); //invoke onWriteComplate
92 }
93
94 if( n < static_cast<int>(message.size()))
95 {
96 _outBuf.append(message.substr(n, message.size()));
97 if(_pChannel->isWriting())
98 {
99 _pChannel->enableWriting(); //add EPOLLOUT
100 }
101 }
102 }
位置2: 当某次调用size_t n = ::write(...len...) 后,n < len,表明操作系统无法全部接收本次递交给他的数据。当操作系统的发送缓冲区有更多可用空间时,通过让epoll_wait返回来通知我们发生了EPOLLOUT事件,这时网络库会把自己保存缓冲区数据继续送给操作系统,如果全部数据操作系统都接收完毕,这时也需要通知用户了。调用点位于TcpConnection.cc 76行。
[cpp] view
plaincopyprint?
63 void TcpConnection::handleWrite()
64 {
65 int sockfd = _pChannel->getSockfd();
66 if(_pChannel->isWriting())
67 {
68 int n = ::write(sockfd, _outBuf.peek(), _outBuf.readableBytes());
69 if( n > 0)
70 {
71 cout << "write " << n << " bytes data again" << endl;
72 _outBuf.retrieve(n);
73 if(_outBuf.readableBytes() == 0)
74 {
75 _pChannel->disableWriting(); //remove EPOLLOUT
76 _pLoop->queueLoop(this); //invoke onWriteComplate
77 }
78 }
79 }
80 }
在这两处设置好回调后,即可保证用户得到准确的通知。
2 EventLoop::queueLoop()方法,本版本新添加的方法,这是一个非常非常重要的方法。在没有这个方法之前,我们使用epoll_wait()接收的所有事件,都是来自操作系统的,比如EPOLLIN/EPOLLOUT,我们使用epollfd只是用来接收操作系统的网络通知。现在我们需要让epollfd多做点事情,让他能接收网络库自己发送的通知。这种通知有两个重要的价值
价值1: 使得网络库内部在单线程的情况下具备异步处理事件的能力。
价值2: 使得网络库的IO线程(跑epoll_wait的那个线程),可以接收来自非本线程的发送请求。
这种通知正是通过eventfd机制实现的,eventfd由Linux 2.6.22新引入,可以像socket一样被epollfd所监听,如果向eventfd写点东西,epoll就会获得这个通知并返回。EventLoop正是通过封装eventfd才获得了异步处理能力。
EventLoop::wakeup()里调用了::wirte()
[cpp] view
plaincopyprint?
53 void EventLoop::wakeup()
54 {
55 uint64_t one = 1;
56 ssize_t n = ::write(_eventfd, &one, sizeof one);
57 if (n != sizeof one)
58 {
59 cout << "EventLoop::wakeup() writes " << n << " bytes instead of 8" << endl;
60 }
61 }
EventLoop::handleRead()里调用了::read()
[cpp] view
plaincopyprint?
73 void EventLoop::handleRead()
74 {
75 uint64_t one = 1;
76 ssize_t n = ::read(_eventfd, &one, sizeof one);
77 if (n != sizeof one)
78 {
79 cout << "EventEventLoop::handleRead() reads " << n << " bytes instead of 8" << endl;
80 }
81 }
真正的事件处理过程位于EventLoop::loop()方法中,新添加的EventLoop::doPendingFunctors()方法正是用来触发异步调用的。
现在重新理顺一下EventLoop::queueLoop()方法的实现,这个方法其实就是先将一个代表回调的IRun*放入到EventLoop的vector中保存,然后就触发eventfd的事件,本次循环完毕,当下次EventLoop::loop循环到epoll_wait时,会因为eventfd的触发而返回,这时eventfd对应的Channel会被通知,进而通知到EventLoop::handleRead方法,我们在里面把事件读出来,这样确保事件只触发一次。EventLoop::loop循环会继续调用到doPendingFunctors()方法,这里面遍历保存IRun*的vector,于是所有异步事件就开始处理了。
EventLoop::loop()方法
[cpp] view
plaincopyprint?
25 void EventLoop::loop()
26 {
27 while(!_quit)
28 {
29 vector<Channel*> channels;
30 _pPoller->poll(&channels);
31
32 vector<Channel*>::iterator it;
33 for(it = channels.begin(); it != channels.end(); ++it)
34 {
35 (*it)->handleEvent();
36 }
37
38 doPendingFunctors();
39 }
40 }
EventLoop::queueLoop()方法
[cpp] view
plaincopyprint?
47 void EventLoop::queueLoop(IRun* pRun)
48 {
49 _pendingFunctors.push_back(pRun);
50 wakeup();
51 }
3 注意doPendingFunctors方法的实现,这里不是通过简单的遍历vector来调用回调,而是新建了一个vector,然后调用vector::swap方法将数组交换出来后再调用,这么做的目的是“减小临界区的长度和避免死锁”,在<<Linux多线程服务器端编程>>P295页有详细介绍。当然我们的mini-muduo目前还是单线程,影响不大。
4 之所以通过EventLoop::queueLoop()来异步触发onWriteComplate而不是直接在TcpConnection里触发onWriteComplate,我想是为了防止回调嵌套,因为我们在onMessage里调用了TcpConnection::send()方法,如果onWriteComplate又是直接在send里被调用的话,就会导致onMessage嵌套调用了onWriteComplate,这样事件的序列性被打破,会引入一堆问题。
5 因为目前程序都跑在一个进程的唯一线程中,muduo中的所有线程相关代码还未加入,当后面版本多线程被加入进来后,一些关键数据要被mutex保护起来。
6 将const string& 和 string* 都换成Buffer,保持和muduo一致,Buffer里只实现了几个基本方法,比如append()只实现了const string&版本而没有(const char* data, int len)版本,Buffer使用了std::string作为自己的存储介质,方法实现也比较粗糙,效率比较差,好处是简单易懂而且和原始muduo有相同的接口。muduo里的Buffer设计作者花费了一些心思,使用了栈和堆结合的方法,在书中7.4节已经进行了详细的介绍。Buffer::retrieve方法的作用是丢弃掉缓冲区里前n个字节。mini-muduo没有使用定制的string类而是直接使用了std::string。
7 muduo里另外的关于缓冲区的回调我在mini-muduo里没有实现,个人认为这不会影响到对基础框架的理解。比如“高水位回调”HighWaterMarkCallback,如果输出缓冲的长度超过用户制定的大小会触发。对这个回调的实现有兴趣的同学可参看muduo代码。
相关文章推荐
- 元素水平居中的几个方法
- AngularJS ng-model在ng-if里面无效
- Jsp语法结构
- [译]脱离jQuery,使用原生Ajax
- json与对象之间的转化(方案一)
- jquery-ui datepicker使用
- 【 D3.js 入门系列 --- 9.1 】 生产饼图
- 失物招领发布-HTML5调摄像头
- 使用CDN和AJAX加速WordPress中jQuery的加载
- 失物招领发布界面-表单设计
- CSS复习总结(4)
- jQuery的学习过程
- 在HTML中用Javascript接收参数
- AngularJs Cookies 操作
- js精度丢失解决办法
- js精度丢失解决办法
- js实现数字分页
- JS中的两个规范 CMD和AMD
- Windows10+Caffe+CUDA7.5+VS2013环境配置
- LintCode-剑指Offer-(56)两数之和