您的位置:首页 > 运维架构

muduo::EventLoop分析

2015-08-03 23:57 393 查看
EventLoop是整个Reactor的核心。其类图如下:



one loop per thread意味着每个线程只能有一个EventLoop对象,用变量

__thread EventLoop* t_loopInThisThread = 0;


表示,在创建EventLoop对象时将t_loopInThisThread赋值,以后再创建时就可以检查这个变量,如果已经赋值就说明当前线程已经创建过EventLoop对象了。线程调用静态函数EventLoop::getEventLoopOfCurrentThread就可以获得当前线程的EventLoop对象的指针了。

EventLoop有许多变量,几个bool变量,looping_:是否正在执行loop循环;quit_:是否已经调用quit()函数退出loop循环;eventHandling是否正在处理event事件;callingPendingFunctors是否正在调用pendingFunctors_的函数对象。

其他变量,poller_是用来调用pool或epool的,activeChannels_记录这激活事件的集合,currentActiveChannel_是当前正在处理的channel事件,pendingFunctors_是当前线程要执行任务的集合。可以在loop()函数中看到这一点:

void EventLoop::loop()//EventLoop在这里循环
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();//清空激活事件集合
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//pool_wait或epoll_wait。阻塞在这里
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);//事件处理I/O
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}


线程在poller_->poll等待监听事件的到来,当poller_->poll返回后,监听到的事件放到了activeChannel中,随后一一处理激活事件。

最后面调用doPendingFunctors()是执行pendingFunctors_中的任务。

void EventLoop::doPendingFunctors()//执行任务队列中的任务
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
  MutexLockGuard lock(mutex_);//尽量让临界区小
  functors.swap(pendingFunctors_);
  }

  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}


EventLoop的owner线程除了等待poll、执行poll返回的激活事件,还可以处理一些其他任务,例如调用某一个回调函数,处理其他EventLoop对象的,调用void EventLoop::runInLoop(const Functor& cb)即可让EventLoop的owner线程执行cb函数。

void EventLoop::runInLoop(const Functor& cb)//可以夸线程调用
{
  if (isInLoopThread())//当前线程是ower线程则立即执行,否则放到ower线程的任务队列,异步执行
  {
    cb();
  }
  else
  {
    queueInLoop(cb);
  }
}

void EventLoop::queueInLoop(const Functor& cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(cb);
  }
//不是EventLoop的owner线程,或者是当前线程,但是正在执行任务队列中的任务
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();//唤醒owner线程
  }
}


如果时EventLoop的owner线程,会调用runInLoop会立即执行回调函数cb,否则会把回调函数放到任务队列(其实时vector),即调用queueInLoop函数。如果不是当前线程调用,或者正在执行pendingFunctors_中的任务,都要唤醒EventLoop的owner线程,让其执行pendingFunctors_中的任务。如果正在执行pendingFunctors_中的任务,添加新任务后不会执行新的任务,因为functors.swap(pendingFunctors_)后,执行的时functors中的任务。

这里的唤醒wakeup()用了eventfd。这是2.6内核新增的一个技术。

#include <sys/eventfd.h>  
int eventfd(unsigned int initval, int flags);


创建一个eventfd的fd后,就可以对它进行read、write等操作。eventfd相当于一个计数器,read以后计数器清零,write递增计数器;fd可以进行如下操作:select(poll、epoll)、close操作。

EventLoop中的wakeupFd_就是eventfd,wakeupChannel_和wakeupFd_相关联,EventLoop关注了wakeupChannel_的读事件,当要唤醒(即poller_->poll)时,写wakeupFd_即可。

void EventLoop::wakeup()//wakeupFd写,唤醒读wakeupFd的线程
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}


EventLoop中还有定时器,可以在某一时刻(runAt),未来多久(runAfter),每隔多久(runEvery)执行某一函数。定时用到了TimerQueue,稍后分析它。

TimerId EventLoop::runAt(const Timestamp& time, TimerCallback&& cb)
{
  return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

TimerId EventLoop::runAfter(double delay, TimerCallback&& cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, std::move(cb));
}

TimerId EventLoop::runEvery(double interval, TimerCallback&& cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(std::move(cb), time, interval);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: