您的位置:首页 > Web前端

从epoll构建muduo-9 加入onWriteComplate回调和Buffer

2015-12-05 11:18 676 查看
mini-muduo版本传送门

version 0.00 从epoll构建muduo-1 mini-muduo介绍

version 0.01 从epoll构建muduo-2 最简单的epoll

version 0.02 从epoll构建muduo-3 加入第一个类,顺便介绍reactor

version 0.03 从epoll构建muduo-4 加入Channel

version 0.04 从epoll构建muduo-5 加入Acceptor和TcpConnection

version 0.05 从epoll构建muduo-6 加入EventLoop和Epoll

version 0.06 从epoll构建muduo-7 加入IMuduoUser

version 0.07 从epoll构建muduo-8 加入发送缓冲区和接收缓冲区

version 0.08 从epoll构建muduo-9 加入onWriteComplate回调和Buffer

version 0.09 从epoll构建muduo-10 Timer定时器

version 0.11 从epoll构建muduo-11 单线程Reactor网络模型成型

version 0.12 从epoll构建muduo-12 多线程代码入场

version 0.13 从epoll构建muduo-13 Reactor + ThreadPool 成型

mini-muduo v0.08版本,这个版本完善了缓冲区的实现。mini-muduo完整可运行的示例可从github下载,使用命令git checkout v0.08可切换到此版本,在线浏览此版本到这里

 本版本有两处重要修改,首先实现了IMuduoUser的onWriteComplate回调,这样当用户一次传送大量数据到网络库时,用户会在数据发送完成后收到通知。当然了发送小量数据完成时也会收到通知。其次本版本实现了专门用于表示缓冲区的Buffer类,只不过这个Buffer类的细节实现非常简单。

下面的条目1 ~ 条目5讲解onWireteComplate的实现细节,条目6讲解Buffer。

1 先来看看IMuduoUser的onWriteComplate回调,首先要明确的是onWriteComplate回调什么时候会被调用,那就是用户递交数据给网络库,当网络库把这些信息全部传送给操作系统后(通过::write()调用),用户对象的onWriteComplate会被回调到。muduo使用的是level triger的Epoll,所以应该在下面两个位置调用onWriteComplate。注意这两处不是直接通过_pUser->onWriteComplate(this);的方式调用的,而是使用了EventLoop::queueLoop方法来异步调用。下条说明会对queueLoop方法做详细介绍。在原始muduo里关于这个回调的讲解可参见<<LInux多线程服务端编程>>P322页。

    位置1:第一次调用size_t n = ::write(...len...) 后,n和len相等的时候,这表明一次系统调用已经将数据全部发送完了,需要通知用户了。调用点位于TcpConnection.cc 91行

[cpp] view
plaincopyprint?





 82 void TcpConnection::send(const string& message)  

 83 {  

 84     int n = 0;  

 85     if(_outBuf.readableBytes() == 0)  

 86     {  

 87         n = ::write(_sockfd, message.c_str(), message.size());  

 88         if(n < 0)  

 89             cout << "write error" << endl;  

 90         if(n == static_cast<int>(message.size()))  

 91             _pLoop->queueLoop(this); //invoke onWriteComplate  

 92     }  

 93   

 94     if( n < static_cast<int>(message.size()))  

 95     {  

 96         _outBuf.append(message.substr(n, message.size()));  

 97         if(_pChannel->isWriting())  

 98         {  

 99             _pChannel->enableWriting(); //add EPOLLOUT  

100         }  

101     }  

102 }  

    位置2: 当某次调用size_t n = ::write(...len...) 后,n < len,表明操作系统无法全部接收本次递交给他的数据。当操作系统的发送缓冲区有更多可用空间时,通过让epoll_wait返回来通知我们发生了EPOLLOUT事件,这时网络库会把自己保存缓冲区数据继续送给操作系统,如果全部数据操作系统都接收完毕,这时也需要通知用户了。调用点位于TcpConnection.cc 76行。

[cpp] view
plaincopyprint?





63 void TcpConnection::handleWrite()  

64 {  

65     int sockfd = _pChannel->getSockfd();  

66     if(_pChannel->isWriting())  

67     {  

68         int n = ::write(sockfd, _outBuf.peek(), _outBuf.readableBytes());  

69         if( n > 0)  

70         {  

71             cout << "write " << n << " bytes data again" << endl;  

72             _outBuf.retrieve(n);  

73             if(_outBuf.readableBytes() == 0)  

74             {  

75                 _pChannel->disableWriting(); //remove EPOLLOUT  

76                 _pLoop->queueLoop(this); //invoke onWriteComplate  

77             }  

78         }  

79     }  

80 }  

在这两处设置好回调后,即可保证用户得到准确的通知。

2 EventLoop::queueLoop()方法,本版本新添加的方法,这是一个非常非常重要的方法。在没有这个方法之前,我们使用epoll_wait()接收的所有事件,都是来自操作系统的,比如EPOLLIN/EPOLLOUT,我们使用epollfd只是用来接收操作系统的网络通知。现在我们需要让epollfd多做点事情,让他能接收网络库自己发送的通知。这种通知有两个重要的价值

    价值1: 使得网络库内部在单线程的情况下具备异步处理事件的能力。

    价值2: 使得网络库的IO线程(跑epoll_wait的那个线程),可以接收来自非本线程的发送请求。

这种通知正是通过eventfd机制实现的,eventfd由Linux 2.6.22新引入,可以像socket一样被epollfd所监听,如果向eventfd写点东西,epoll就会获得这个通知并返回。EventLoop正是通过封装eventfd才获得了异步处理能力。

EventLoop::wakeup()里调用了::wirte()

[cpp] view
plaincopyprint?





 53 void EventLoop::wakeup()  

 54 {  

 55     uint64_t one = 1;  

 56     ssize_t n = ::write(_eventfd, &one, sizeof one);  

 57     if (n != sizeof one)  

 58     {  

 59         cout << "EventLoop::wakeup() writes " << n << " bytes instead of 8" << endl;  

 60     }  

 61 }  

EventLoop::handleRead()里调用了::read()

[cpp] view
plaincopyprint?





73 void EventLoop::handleRead()  

74 {  

75     uint64_t one = 1;  

76     ssize_t n = ::read(_eventfd, &one, sizeof one);  

77     if (n != sizeof one)  

78     {  

79         cout << "EventEventLoop::handleRead() reads " << n << " bytes instead of 8" << endl;  

80     }  

81 }  

真正的事件处理过程位于EventLoop::loop()方法中,新添加的EventLoop::doPendingFunctors()方法正是用来触发异步调用的。

现在重新理顺一下EventLoop::queueLoop()方法的实现,这个方法其实就是先将一个代表回调的IRun*放入到EventLoop的vector中保存,然后就触发eventfd的事件,本次循环完毕,当下次EventLoop::loop循环到epoll_wait时,会因为eventfd的触发而返回,这时eventfd对应的Channel会被通知,进而通知到EventLoop::handleRead方法,我们在里面把事件读出来,这样确保事件只触发一次。EventLoop::loop循环会继续调用到doPendingFunctors()方法,这里面遍历保存IRun*的vector,于是所有异步事件就开始处理了。

EventLoop::loop()方法

[cpp] view
plaincopyprint?





25 void EventLoop::loop()  

26 {  

27     while(!_quit)  

28     {  

29         vector<Channel*> channels;  

30         _pPoller->poll(&channels);  

31   

32         vector<Channel*>::iterator it;  

33         for(it = channels.begin(); it != channels.end(); ++it)  

34         {  

35             (*it)->handleEvent();  

36         }  

37   

38         doPendingFunctors();  

39     }  

40 }  

EventLoop::queueLoop()方法

[cpp] view
plaincopyprint?





47 void EventLoop::queueLoop(IRun* pRun)  

48 {  

49     _pendingFunctors.push_back(pRun);  

50     wakeup();  

51 }  

3 注意doPendingFunctors方法的实现,这里不是通过简单的遍历vector来调用回调,而是新建了一个vector,然后调用vector::swap方法将数组交换出来后再调用,这么做的目的是“减小临界区的长度和避免死锁”,在<<Linux多线程服务器端编程>>P295页有详细介绍。当然我们的mini-muduo目前还是单线程,影响不大。

4 之所以通过EventLoop::queueLoop()来异步触发onWriteComplate而不是直接在TcpConnection里触发onWriteComplate,我想是为了防止回调嵌套,因为我们在onMessage里调用了TcpConnection::send()方法,如果onWriteComplate又是直接在send里被调用的话,就会导致onMessage嵌套调用了onWriteComplate,这样事件的序列性被打破,会引入一堆问题。

5 因为目前程序都跑在一个进程的唯一线程中,muduo中的所有线程相关代码还未加入,当后面版本多线程被加入进来后,一些关键数据要被mutex保护起来。

6 将const string& 和 string* 都换成Buffer,保持和muduo一致,Buffer里只实现了几个基本方法,比如append()只实现了const string&版本而没有(const char* data, int len)版本,Buffer使用了std::string作为自己的存储介质,方法实现也比较粗糙,效率比较差,好处是简单易懂而且和原始muduo有相同的接口。muduo里的Buffer设计作者花费了一些心思,使用了栈和堆结合的方法,在书中7.4节已经进行了详细的介绍。Buffer::retrieve方法的作用是丢弃掉缓冲区里前n个字节。mini-muduo没有使用定制的string类而是直接使用了std::string。

7 muduo里另外的关于缓冲区的回调我在mini-muduo里没有实现,个人认为这不会影响到对基础框架的理解。比如“高水位回调”HighWaterMarkCallback,如果输出缓冲的长度超过用户制定的大小会触发。对这个回调的实现有兴趣的同学可参看muduo代码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: