muduo_net库源码分析(26-1
2013-11-07 09:21
375 查看
Channel是selectable IO channel,负责注册与响应IO 事件,它不拥有file descriptor。
Channel是Acceptor、Connector、EventLoop、TimerQueue、TcpConnection的成员,生命期由后者控制。
时序图
EventLoop头文件
eventloop.h// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is a public header file, it must only include public header files. #ifndef MUDUO_NET_EVENTLOOP_H #define MUDUO_NET_EVENTLOOP_H #include <vector> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> #include <muduo/base/CurrentThread.h> #include <muduo/base/Thread.h> #include <muduo/base/Timestamp.h> namespace muduo { namespace net { class Channel; //聚合 class Poller; //组合 /// /// Reactor, at most one per thread. /// /// This is an interface class, so don't expose too much details. class EventLoop : boost::noncopyable { public: EventLoop(); ~EventLoop(); // force out-line dtor, for scoped_ptr members. /// /// Loops forever. /// /// Must be called in the same thread as creation of the object. /// void loop(); void quit(); /// /// Time when poll returns, usually means data arrivial. /// Timestamp pollReturnTime() const { return pollReturnTime_; } // internal usage void updateChannel(Channel* channel); // 在Poller中添加(注册)或者更新通道 void removeChannel(Channel* channel); // 从Poller中移除通道 /*断言是否在Loop 线程中*/ void assertInLoopThread() { //如果不是在LoopThread中,终止程序 if (!isInLoopThread()) { abortNotInLoopThread(); } } //判断是在当前线程是否在LoopThread中 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } static EventLoop* getEventLoopOfCurrentThread(); private: void abortNotInLoopThread(); void printActiveChannels() const; // DEBUG typedef std::vector<Channel*> ChannelList; bool looping_; /*是否处于循环状态 atomic */ bool quit_; /*是否退出Loop atomic */ bool eventHandling_; /*当前是否处于事件处理的状态 atomic */ const pid_t threadId_; // 当前对象所属线程ID Timestamp pollReturnTime_; //调用poller时候的返回时间 boost::scoped_ptr<Poller> poller_; //poller对象,生命周期由EventLoop控制 ChannelList activeChannels_; // Poller返回的活动通道 , //就是活动状态的socket///typedef std::vector<Channel*> ChannelList; Channel* currentActiveChannel_; // 当前正在处理的活动通道 }; } } #endif // MUDUO_NET_EVENTLOOP_H
[/code]
EventLoop源文件
EventLoop.cc// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/EventLoop.h> #include <muduo/base/Logging.h> #include <muduo/net/Channel.h> #include <muduo/net/Poller.h> //#include <poll.h> using namespace muduo; using namespace muduo::net; namespace { // 当前线程EventLoop对象指针 // 线程局部存储 __thread EventLoop* t_loopInThisThread = 0; const int kPollTimeMs = 10000; } EventLoop* EventLoop::getEventLoopOfCurrentThread() { return t_loopInThisThread; } /* 事件循环,该函数不能夸线程调用 只能在创建该对象的线程中调用 **/ EventLoop::EventLoop() : looping_(false), quit_(false), eventHandling_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), currentActiveChannel_(NULL) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; // 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL) if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } //如果当前线程没有创建EventLoop对象,则创建EventLoop对象,并且进行绑定 else { t_loopInThisThread = this; } } EventLoop::~EventLoop() { t_loopInThisThread = NULL; } //这是IO线程 // 事件循环,该函数不能跨线程调用 // 只能在创建该对象的线程中调用 void EventLoop::loop() { //断言是否处于非循环状态 assert(!looping_); // 断言当前处于创建该对象的线程中 assertInLoopThread(); looping_ = true; LOG_TRACE << "EventLoop " << this << " start looping"; //::poll(NULL, 0, 5*1000); //循环直到退出 while (!quit_) { //先清除活动通道 activeChannels_.clear(); //调用poller_->poll返回活动的通道 &activeChannels_以及poll返回的时间 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //++iteration_; //输出 处于活动状态的channel 到日志中 if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority //当前事件处理状态设为true eventHandling_ = true; //循环处理活动通道的事件 for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; currentActiveChannel_->handleEvent(pollReturnTime_); } //把当前的活动通道设为 NULL currentActiveChannel_ = NULL; //当前事件处理状态设为false eventHandling_ = false; //doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } //该函数可以跨线程调用,这也是防止程序 void EventLoop::quit() { quit_ = true; //由于是多线程访问这个变量,那么是否需要保护这个变量??,要的。但是quit_是布尔类型,是一种对布尔类型的变量操作是一种原子操作,所以我们不用显式的进行保护了 if (!isInLoopThread()) { //wakeup(); } } // 用于注册或者更新 channel的事件 void EventLoop::updateChannel(Channel* channel) { // 断言Channel 是否属于当前的EventLoop对象 assert(channel->ownerLoop() == this); // 断言EventLoop所属的线程是当前的线程 assertInLoopThread(); // 更新channel的事件 poller_->updateChannel(channel); } void EventLoop::removeChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); } //如果不是在LoopThread线程中将终止线程 void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); } /*打印处于活动状态的channel 到日志中*/ void EventLoop::printActiveChannels() const { for (ChannelList::const_iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { const Channel* ch = *it; LOG_TRACE << "{" << ch->reventsToString() << "} "; } }
[/code]
Poller头文件
poller.h// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this. #ifndef MUDUO_NET_POLLER_H #define MUDUO_NET_POLLER_H #include <vector> #include <boost/noncopyable.hpp> #include <muduo/base/Timestamp.h> #include <muduo/net/EventLoop.h> namespace muduo { namespace net { class Channel; /// /// Base class for IO Multiplexing /// /// This class doesn't own the Channel objects. class Poller : boost::noncopyable { public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); virtual ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0; /// Changes the interested I/O events. /// Must be called in the loop thread. virtual void updateChannel(Channel* channel) = 0; /// Remove the channel, when it destructs. /// Must be called in the loop thread. virtual void removeChannel(Channel* channel) = 0; static Poller* newDefaultPoller(EventLoop* loop); void assertInLoopThread() { ownerLoop_->assertInLoopThread(); } private: EventLoop* ownerLoop_; // Poller所属EventLoop }; } } #endif // MUDUO_NET_POLLER_H
[/code]
Poller源文件
poller.cc// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/Poller.h> using namespace muduo; using namespace muduo::net; Poller::Poller(EventLoop* loop) : ownerLoop_(loop) { } Poller::~Poller() { }
[/code]
PollPoller头文件
pollpoller.h// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this. #ifndef MUDUO_NET_POLLER_POLLPOLLER_H #define MUDUO_NET_POLLER_POLLPOLLER_H #include <muduo/net/Poller.h> #include <map> #include <vector> struct pollfd; namespace muduo { namespace net { /// /// IO Multiplexing with poll(2). /// class PollPoller : public Poller { public: PollPoller(EventLoop* loop); virtual ~PollPoller(); /*返回活动的通道列表*/ virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels); virtual void updateChannel(Channel* channel); virtual void removeChannel(Channel* channel); private: void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap; // key是文件描述符,value是Channel* PollFdList pollfds_; // 事件结构 /*被关注的通道列表 , 不是活动的通道列表*/ ChannelMap channels_; }; } } #endif // MUDUO_NET_POLLER_POLLPOLLER_H
[/code]
PollPoller源文件
pollpoller.cc// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/poller/PollPoller.h> #include <muduo/base/Logging.h> #include <muduo/base/Types.h> #include <muduo/net/Channel.h> #include <assert.h> #include <poll.h> using namespace muduo; using namespace muduo::net; PollPoller::PollPoller(EventLoop* loop) : Poller(loop) { } PollPoller::~PollPoller() { } /* 真正的poll 函数,终于到主角了^<>^ /// **/ Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels) { // XXX pollfds_ shouldn't change int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs); Timestamp now(Timestamp::now()); //如果有活动事件 , 则填充到activeChannels 中去 if (numEvents > 0) { LOG_TRACE << numEvents << " events happended"; fillActiveChannels(numEvents, activeChannels); } else if (numEvents == 0) { LOG_TRACE << " nothing happended"; } else { LOG_SYSERR << "PollPoller::poll()"; } return now; } //填充活动通道 // activeChannels 活动通道 // pollfds_ 已注册的事件集合 /*被关注的通道列表 , 不是活动的通道列表*/ void PollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { for (PollFdList::const_iterator pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd) { //返回活动的文件描述符 if (pfd->revents > 0) { --numEvents; // 从关注列表channels_中找出 已产生事件 的fd---》channel ChannelMap::const_iterator ch = channels_.find(pfd->fd); // 断言是否已近到了channels_的尾部 assert(ch != channels_.end()); // 获取活动channel Channel* channel = ch->second; // 断言channel的文件描述符是否和pfd 的描述符一致 assert(channel->fd() == pfd->fd); // 设置channel的事件 channel->set_revents(pfd->revents); // pfd->revents = 0; // 把活动channel 加入activeChannel容器 activeChannels->push_back(channel); } } } /* 更行channel通道, **/ void PollPoller::updateChannel(Channel* channel) { //断言实在LoopThread 当中 Poller::assertInLoopThread(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events(); //如果新的通道,还没有注册事件 if (channel->index() < 0) { // index < 0说明是一个新的通道 // a new one, add to pollfds_ //断言是新的通道 assert(channels_.find(channel->fd()) == channels_.end()); //注册事件 struct pollfd pfd; pfd.fd = channel->fd(); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; pollfds_.push_back(pfd); //channel在pollfds中索引 int idx = static_cast<int>(pollfds_.size())-1; //更新channel的索引 channel->set_index(idx); //加入channels_ 的map中 channels_[pfd.fd] = channel; } else { // update existing one //断言channel 已存在--》 channels_ ,index >0 < pollfds_.size() assert(channels_.find(channel->fd()) != channels_.end()); assert(channels_[channel->fd()] == channel); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(pollfds_.size())); struct pollfd& pfd = pollfds_[idx]; assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1); //下面才是真正的更新 pfd.events = static_cast<short>(channel->events()); //revents 貌似不用清零??? pfd.revents = 0; // 将一个通道暂时更改为不关注事件,但不从Poller中移除该通道 if (channel->isNoneEvent()) { // ignore this pollfd // 暂时忽略该文件描述符的事件 // 这里pfd.fd 可以直接设置为-1 , -1是为了排除0描述符的干扰 pfd.fd = -channel->fd()-1; // 这样子设置是为了removeChannel优化 } } } // 把channel 真正的从pollfds 中移除 void PollPoller::removeChannel(Channel* channel) { Poller::assertInLoopThread(); LOG_TRACE << "fd = " << channel->fd(); assert(channels_.find(channel->fd()) != channels_.end()); assert(channels_[channel->fd()] == channel); //要移除的channel一定是没有关注事件了 assert(channel->isNoneEvent()); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(pollfds_.size())); const struct pollfd& pfd = pollfds_[idx]; (void)pfd; assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events()); //1-----------》先把channel 从channels_中移除 size_t n = channels_.erase(channel->fd()); assert(n == 1); (void)n; //2-----------》如果是最后一个,则直接移除 if (implicit_cast<size_t>(idx) == pollfds_.size()-1) { pollfds_.pop_back(); } else { // 2----------》这里移除的算法复杂度是O(1),将待删除元素与最后一个元素交换再pop_back int channelAtEnd = pollfds_.back().fd; iter_swap(pollfds_.begin()+idx, pollfds_.end()-1); //channelAtEnd 是从 pollfds_ 里面拿出来的,所以可能是负数 if (channelAtEnd < 0) { channelAtEnd = -channelAtEnd-1; } channels_[channelAtEnd]->set_index(idx); pollfds_.pop_back(); } }
[/code]
Channel头文件
channel.h// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this. #ifndef MUDUO_NET_CHANNEL_H #define MUDUO_NET_CHANNEL_H #include <boost/function.hpp> #include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> #include <muduo/base/Timestamp.h> namespace muduo { namespace net { class EventLoop; /// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel : boost::noncopyable //不可拷贝的 { public: /*事件回调处理函数*/ typedef boost::function<void()> EventCallback; /*读事件的回调处理*/ typedef boost::function<void(Timestamp)> ReadEventCallback; /*一个EventLoop 包含多个Channel , 一个channel只属于一个EventLoop Channel(n)-----> EventLoop */ Channel(EventLoop* loop, int fd); //虚构函数没干什么事^V^ ~Channel(); void handleEvent(Timestamp receiveTime); /*回调函数的注册*/ // 读的回调函数 void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; } // 写的回调函数 void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; } // 关闭的回调函数 void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; } // 出错的回调函数 void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; } ////////////////////////////////////////////////// /* 这个函数下一节才开始分析^V^ ... 这个函数跟TcpConnection 生命周期是有关系的,为了防止在事件处理的时候 **/ /// Tie this channel to the owner object managed by shared_ptr, /// prevent the owner object being destroyed in handleEvent. void tie(const boost::shared_ptr<void>&); //channel 的描述符 int fd() const { return fd_; } // channel注册的事件(关注的事件) int events() const { return events_; } // poller 实际返回的事件, void set_revents(int revt) { revents_ = revt; } // used by pollers //清空channel的事件 // int revents() const { return revents_; } bool isNoneEvent() const { return events_ == kNoneEvent; } void enableReading() { events_ |= kReadEvent; update(); } // void disableReading() { events_ &= ~kReadEvent; update(); } void enableWriting() { events_ |= kWriteEvent; update(); } void disableWriting() { events_ &= ~kWriteEvent; update(); } void disableAll() { events_ = kNoneEvent; update(); } bool isWriting() const { return events_ & kWriteEvent; } // for Poller ; 在Poller数组中的索引 int index() { return index_; } // 设置channel在poller的索引 void set_index(int idx) { index_ = idx; } // for debug 事件的调试信息 string reventsToString() const; void doNotLogHup() { logHup_ = false; } EventLoop* ownerLoop() { return loop_; } void remove(); private: void update(); void handleEventWithGuard(Timestamp receiveTime); //事件常量 //没有事件就是说等于零 static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop* loop_; // 所属EventLoop const int fd_; // 文件描述符,但不负责关闭该文件描述符 int events_; // 关注的事件 int revents_; // poll/epoll返回的事件 int index_; // used by Poller.表示在poll的事件数组中的序号 bool logHup_; // for POLLHUP // 下面两个是负责生存期的控制 boost::weak_ptr<void> tie_; bool tied_; bool eventHandling_; // 是否处于处理事件中 ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; }; } } #endif // MUDUO_NET_CHANNEL_H
[/code]
Channel源文件
channel.cc// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/base/Logging.h> #include <muduo/net/Channel.h> #include <muduo/net/EventLoop.h> #include <sstream> #include <poll.h> using namespace muduo; using namespace muduo::net; const int Channel::kNoneEvent = 0; //没有事件 const int Channel::kReadEvent = POLLIN | POLLPRI; //产生量可读事件或者紧急数据 const int Channel::kWriteEvent = POLLOUT; Channel::Channel(EventLoop* loop, int fd__) : loop_(loop), fd_(fd__), events_(0), revents_(0), index_(-1), //没加入poller是,channel的索引为 -1 logHup_(true), tied_(false), eventHandling_(false) //channel的事件处理状态初始化为 false { } Channel::~Channel() { assert(!eventHandling_); } void Channel::tie(const boost::shared_ptr<void>& obj) { tie_ = obj; tied_ = true; } //更新channel的事件 void Channel::update() { loop_->updateChannel(this); } // 调用这个函数之前确保调用disableAll // void disableAll() { events_ = kNoneEvent; update()-->assert(isNoneEvent()); } void Channel::remove() { assert(isNoneEvent()); loop_->removeChannel(this); } /** 当时到达时,调用事件处理函数handleEvent进行处理 */ void Channel::handleEvent(Timestamp receiveTime) { boost::shared_ptr<void> guard; if (tied_) { guard = tie_.lock(); if (guard) { handleEventWithGuard(receiveTime); } } else { handleEventWithGuard(receiveTime); } } /** 事件处理 **/ void Channel::handleEventWithGuard(Timestamp receiveTime) {//把channel的事件处理状态设为 true eventHandling_ = true; //如果poller返回的channel 事件是pollhup挂断(写时才会产生),并且不是可读的 if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { // 如果挂断了 if (logHup_) { LOG_WARN << "Channel::handle_event() POLLHUP"; } //如果挂断了,就返回closeCallback if (closeCallback_) closeCallback_(); } //文件描述符不合法或者文件描述符没有打开 if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handle_event() POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_(); } //可读事件 , //POLLRDHUP :stream socket peer closed connection ,or shutdown writing half of connection //POLLPRI : there is urgent data to read if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) readCallback_(receiveTime); } //可写事件 if (revents_ & POLLOUT) { if (writeCallback_) writeCallback_(); } //事件处理完后,把事件处理状态恢复为false状态 eventHandling_ = false; } string Channel::reventsToString() const { std::ostringstream oss; oss << fd_ << ": "; if (revents_ & POLLIN) oss << "IN "; if (revents_ & POLLPRI) oss << "PRI "; if (revents_ & POLLOUT) oss << "OUT "; if (revents_ & POLLHUP) oss << "HUP "; if (revents_ & POLLRDHUP) oss << "RDHUP "; if (revents_ & POLLERR) oss << "ERR "; if (revents_ & POLLNVAL) oss << "NVAL "; return oss.str().c_str(); }
[/code]
相关文章推荐
- GPIO模拟I2C程序实现.
- div css 实现tabs标签
- 开箱即用!Android四款系统架构工具
- 清华同方台试机Bioss默认密码
- 汇编语言程序设计读书笔记(3)- 程序范例
- 安全软件应该具备的,你具备了吗?
- TCP连接状态详解
- Makefile详解
- 调整VirtualBox的VDI硬盘文件大小
- 多线程与并发服务器设计(23 - 2 )
- muduo_net库源码分析(25)
- LINK : warning LNK4068: /MACHINE not specified; defaulting to IX86
- 关于Oracle EBS R12 表格XLA_TRANSACTION_ENTITIES 的安全策略(VPD)组研究
- ORACLE 等待事件的分类
- 多线程与并发服务器设计(23-1)
- ora_00845
- PHP识别电脑还是手机访问网站
- eclipse3.6默认指向WebContent目录修改为webRoot 设置说明 .
- tomcat6版本虚拟目录详细配置
- LogFile (22)