您的位置:首页 > 运维架构

深入tornado中的ioLoop

2017-05-08 11:40 477 查看
本文所剖析的tornado源码版本为4.4.2ioloop是tornado的关键,是他的最底层。ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._instance中ioloop实现了Reactor模型,将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。另外,ioloop还被用来集中运行回调函数以及集中处理定时任务。

一 准备知识:

  1 首先我们要了解Reactor模型  2 其次,我们要了解I/O多路复用,由于本文假设系统为Linux,所以要了解epoll以及Python中的select模块  3 IOLoop类是Configurable类的子类,而Configurable类是一个工厂类,讲解在这

二 创建IOLoop实例

来看IOLoop,它的父类是Configurable类,也就是说:IOLoop是一个直属配置子类
class IOLoop(Configurable):
......
这里就要结合Configurable类进行讲解:

Configurable中的__new__方法
1 首先实例化一个该直属配置子类的'执行类对象',也就是调用该类的configurable_default方法并返回赋值给impl:
@classmethod    def configurable_default(cls):        if hasattr(select, "epoll"):     # 因为我们假设我们的系统为Linux,且支持epoll,所以这里为True
from tornado.platform.epoll import EPollIOLoop            return EPollIOLoop
if hasattr(select, "kqueue"):            # Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop            return KQueueIOLoop        from tornado.platform.select import SelectIOLoop        return SelectIOLoop

2 也就是impl是EPollIOLoop类对象,然后实例化该对象,运行其initialize方法
class EPollIOLoop(PollIOLoop):  # 该类只有这么短短的几句,可见主要的方法是在其父类PollIOLoop中实现。
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 执行了父类PollIOLoop的initialize方法,并将select.epoll()传入
  来看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())干了些啥:
class PollIOLoop(IOLoop):  # 从属配置子类

def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)                # 调用IOLoop的initialize方法
self._impl = impl                               # self._impl = select.epoll()
if hasattr(self._impl, 'fileno'):               # 文件描述符的close_on_exec属性            set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}                             # 文件描述符对应的fileno()作为key,(文件描述符对象,处理函数)作为value
self._events = {}                               # 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……}
self._callbacks = []
self._callback_lock = threading.Lock()          # 添加线程锁
self._timeouts = []                             # 存储定时任务
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None                       # 获得当前线程标识符
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()        # Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),                         lambda fd, events: self._waker.consume(),
self.READ)

  首先调用了IOLoop.initialize(self,**kwargs)方法:
initialize(self, make_current= make_current  IOLoop.current(instance=False)  IOLoop.current(instance=False)   RuntimeError( current(instance== getattr(IOLoop._current,  current  None = self

我们可以看到IOLoop.initialize()主要是对线程做了一些支持和操作。3 返回该实例

三 剖析PollIOLoop

1 处理I/O事件以及其对应handler的相关属性以及方法 使用self._handlers用来存储fd与handler的对应关系,文件描述符对应的fileno()作为key,元组(文件描述符对象,处理函数)作为value  self._events 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……} add_handler方法用来添加handler  update_handle方法用来更新handler remove_handler方法用来移除handler
def add_handler(self, fd, handler, events):        # 向epoll中注册事件 , 并在self._handlers[fd]中为该文件描述符添加相应处理函数
fd, obj = self.split_fd(fd)   # fd.fileno(),fd
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)    def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)    def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)        try:
self._impl.unregister(fd)        except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

2 处理回调函数的相关属性以及方法  self._callbacks用来存储回调函数  add_callback方法用来直接添加回调函数  add_future方法用来间接的添加回调函数,future对象详解在这
def add_callback(self, callback, *args, **kwargs):        # 因为Python的GIL的限制,导致Python线程并不算高效。加上tornado实现了多进程 + 协程的模式,所以我们略过源码中的部分线程相关的一些操作
if self._closing:            return
self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))    def add_future(self, future, callback):        # 为future对象添加经过包装后的回调函数,该回调函数会在future对象被set_done后添加至_callbacks中
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(            lambda future: self.add_callback(callback, future))

3 处理定时任务的相关属性以及方法  self._timeouts用来存储定时任务  self.add_timeout用来添加定时任务(self.call_later self.call_at都是间接调用了该方法)
def add_timeout(self, deadline, callback, *args, **kwargs):        """
``deadline``可能是一个数字,表示相对于当前时间的时间(与“IOLoop.time”通常为“time.time”相同的大小),或者是datetime.timedelta对象。
自从Tornado 4.0以来,`call_later`是一个比较方便的替代方案,因为它不需要timedelta对象。        """
if isinstance(deadline, numbers.Real):            return self.call_at(deadline, callback, *args, **kwargs)        elif isinstance(deadline, datetime.timedelta):            return self.call_at(self.time() + timedelta_to_seconds(deadline),
callback, *args, **kwargs)        else:            raise TypeError("Unsupported deadline %r" % deadline)

4 启动io多路复用器  启动也一般就意味着开始循环,那么循环什么呢?    1 运行回调函数    2 运行时间已到的定时任务    3 当某个文件描述法发生事件时,运行该事件对应的handler  使用start方法启动ioloop,看一下其简化版(去除线程相关,以及一些相对不重要的细节):
def start(self):        try:            while True:
callbacks = self._callbacks
self._callbacks = []
due_timeouts = []                # 将时间已到的定时任务放置到due_timeouts中,过程省略
for callback in callbacks:          # 执行callback
self._run_callback(callback)                for timeout in due_timeouts:        # 执行定时任务
if timeout.callback is not None:
self._run_callback(timeout.callback)
callbacks = callback = due_timeouts = timeout = None    # 释放内存
# 根据情况设置poll_timeout的值,过程省略
if not self._running:    # 终止ioloop运行时,在执行完了callback后结束循环
breaktry:
event_pairs = self._impl.poll(poll_timeout)                except Exception as e:                    if errno_from_exception(e) == errno.EINTR:  # 系统调用被信号处理函数中断,进行下一次循环
continue
else:                        raise
self._events.update(event_pairs)                while self._events:
fd, events = self._events.popitem()             # 获取一个fd以及对应事件
try:
fd_obj, handler_func = self._handlers[fd]   # 获取该fd对应的事件处理函数
handler_func(fd_obj, events)                # 运行该事件处理函数
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:     # 当客户端关闭连接时会产生EPIPE错误
pass
# 其他异常处理已经省略
fd_obj = handler_func = None       # 释放内存空间


start完整版
5 关闭io多路复用器
def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self.remove_handler(self._waker.fileno())        if all_fds:    # 该参数若为True,则表示会关闭所有文件描述符
for fd, handler in self._handlers.values():
self.close_fd(fd)
self._waker.close()
self._impl.close()
self._callbacks = None
self._timeouts = None

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Linux 处理器 复用器