tornado IOLoop源码阅读
2017-12-16 18:27
393 查看
tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。因此,有必要去了解一下关于epoll的预备知识。
select select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
poll pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
Linux IO模式及select,poll,epoll详解
1.int epoll_create(int size)
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
主要看Configurable类中的__new()__方法,这里base = cls.configurable_base(),但是cls.configurable_base()在这个类中是一个接口,显然是要在IOLoop中要实现的。
在IOLoop中,这个base类就是自己,于是走进了if判断中。于是关键就落在了impl这个变量上。接下来就轮到了cls.configured_class(),在configured_class()中又调用了configurable_default(),又是一个接口,再次回到IOLoop中查看
ok,到这里就清楚了,这是一个工厂函数,根据操作系统选择不同的event loop,找到EPollIOLoop
到这里已经理清了EPollIOLoop,PollIOLoop和IOLoop间的关系,可以进入关键方法阅读了。
类中首先定义了epoll 监听的事件,并重点对EPOLLIN 、 EPOLLOUT 、 EPOLLERR定义了处理事件。
instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 这些方法都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop。
initialize,close,add_handler,update_handler,remove_handler,start,stop等比较核心的方法都在下面的PollIOLoop中实现。
值得一提的是run_sync方法,其作用是提供给利用gen.coroutine的自定义协程方法使用IOLoop执行。
由前面epoll介绍可知道,epoll_create,epoll_ctl,epoll_wait,epoll_close四个api即可使用epoll。
epoll_create: epoll 的生成在EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create。
epoll_ctl: add_handler, update_handler, remove_handler三个方法对应epoll_ctl中的三个参数add 、 modify 、 del。
在add_handler中,self.split_fd方法将文件描述符或者file-like object包装成(文件描述符,object),self._handlers字段保存文件描述符对应的处理器,然后将需要监视的I/O事件注册到select中。在Tornado中只关心READ, WRITE, 和 ERROR事件,其中ERROR事件是自动添加的。
epoll_wait: epoll_wait 操作在下面要介绍的start() 中:event_pairs = self._impl.poll(poll_timeout)。
epoll_close:epoll 的 close 在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。
参考:
IOLoop模块解析
深入理解 tornado 之 底层 ioloop 实现
epoll
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。select select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
poll pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
Linux IO模式及select,poll,epoll详解
epoll操作过程
epoll操作过程需要三个接口,分别如下:int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1.int epoll_create(int size)
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; //events可以是以下几个宏的集合: EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
epoll工作模式
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
Tornado.IOLoop
了解完epoll,可以开始阅读源码了。在IOLoop.py中,主要有两个类,IOLoop和PollIOLoop。可是在IOLoop中,发现很多关键的方法都是作为了接口交由子类实现,比如PollIOLoop,而这两者的关系由一个Configurable类联系起来。另外值得注意的是IOLoop没有__init__,这一点同样可以在Configurable类中找到解释。class Configurable(object): __impl_class = None # type: type __impl_kwargs = None # type: Dict[str, Any] def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(*args, **init_kwargs) return instance @classmethod def configurable_base(cls): # type: () -> Any # TODO: This class needs https://github.com/python/typing/issues/107 # to be fully typeable. raise NotImplementedError() @classmethod def configurable_default(cls): # type: () -> type raise NotImplementedError() def initialize(self): # type: () -> None @classmethod def configure(cls, impl, **kwargs): # type: (Any, **Any) -> None base = cls.configurable_base() if isinstance(impl, (str, unicode_type)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs @classmethod def configured_class(cls): # type: () -> type """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class @classmethod def _save_configuration(cls): # type: () -> Tuple[type, Dict[str, Any]] base = cls.configurable_base() return (base.__impl_class, base.__impl_kwargs) @classmethod def _restore_configuration(cls, saved): # type: (Tuple[type, Dict[str, Any]]) -> None base = cls.configurable_base() base.__impl_class = saved[0] base.__impl_kwargs = saved[1]
主要看Configurable类中的__new()__方法,这里base = cls.configurable_base(),但是cls.configurable_base()在这个类中是一个接口,显然是要在IOLoop中要实现的。
@classmethod def configurable_base(cls): return IOLoop
在IOLoop中,这个base类就是自己,于是走进了if判断中。于是关键就落在了impl这个变量上。接下来就轮到了cls.configured_class(),在configured_class()中又调用了configurable_default(),又是一个接口,再次回到IOLoop中查看
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
ok,到这里就清楚了,这是一个工厂函数,根据操作系统选择不同的event loop,找到EPollIOLoop
class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
到这里已经理清了EPollIOLoop,PollIOLoop和IOLoop间的关系,可以进入关键方法阅读了。
IOLoop
from __future__ import absolute_import, division, print_function, with_statement
import datetime
import errno
import functools
import heapq # 最小堆
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math
from tornado.concurrent import TracebackFuture, is_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds
try:
import signal
except ImportError:
signal = None
if PY3:
import _thread as thread
else:
import thread
_POLL_TIMEOUT = 3600.0
class TimeoutError(Exception):
pass
class IOLoop(Configurable):
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
_current = threading.local()
@staticmethod
def instance():
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
@staticmethod
def initialized():
"""Returns true if the singleton instance has been
created."""
return hasattr(IOLoop, "_instance")
def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self
@staticmethod
def clear_instance():
"""Clear the global `IOLoop` instance.
.. versionadded:: 4.0
"""
if hasattr(IOLoop, "_instance"):
del IOLoop._instance
@staticmethod
def current(instance=True):
current = getattr(IOLoop._current, "instance", None)
if current is None and instance:
return IOLoop.instance()
return current
def make_current(self):
IOLoop._current.instance = self
@staticmethod
def clear_current():
IOLoop._current.instance = None
@classmethod def configurable_base(cls): return IOLoop
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
def initialize(self, make_current=None):
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
elif make_current:
if IOLoop.current(instance=False) is not None:
raise RuntimeError("current IOLoop already exists")
self.make_current()
def close(self, all_fds=False):
raise NotImplementedError()
def add_handler(self, fd, handler, events):
raise NotImplementedError()
def update_handler(self, fd, events):
raise NotImplementedError()
def remove_handler(self, fd):
raise NotImplementedError()
def set_blocking_signal_threshold(self, seconds, action):
raise NotImplementedError()
def set_blocking_log_threshold(self, seconds):
self.set_blocking_signal_threshold(seconds, self.log_stack)
def log_stack(self, signal, frame):
gen_log.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold,
''.join(traceback.format_stack(frame)))
def start(self):
raise NotImplementedError()
def _setup_logging(self):
if not any(
[logging.getLogger().handlers,
logging.getLogger('tornado').handlers,
logging.getLogger('tornado.application').handlers]):
logging.basicConfig()
def stop(self):
raise NotImplementedError()
def run_sync(self, func, timeout=None):
future_cell = [None]
def run():
try:
result = func()
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
except Exception:
future_cell[0] = TracebackFuture()
future_cell[0].set_exc_info(sys.exc_info())
else:
if is_future(result):
future_cell[0] = result
else:
future_cell[0] = TracebackFuture()
future_cell[0].set_result(result)
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
if timeout is not None:
timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
self.start()
if timeout is not None:
self.remove_timeout(timeout_handle)
if not future_cell[0].done():
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()
def time(self):
return time.time()
...
类中首先定义了epoll 监听的事件,并重点对EPOLLIN 、 EPOLLOUT 、 EPOLLERR定义了处理事件。
READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP
instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 这些方法都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop。
initialize,close,add_handler,update_handler,remove_handler,start,stop等比较核心的方法都在下面的PollIOLoop中实现。
值得一提的是run_sync方法,其作用是提供给利用gen.coroutine的自定义协程方法使用IOLoop执行。
tornado.ioloop.PollIOLoop
class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = collections.deque() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in list(self._handlers.values()): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self): if self._running: raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() self._running = True # signal.set_wakeup_fd closes a race condition in event loops: # a signal may arrive at the beginning of select/poll/etc # before it goes into its interruptible sleep, so the signal # will be consumed without waking the select. The solution is # for the (C, synchronous) signal handler to write to a pipe, # which will then be seen by select. # # In python's signal handling semantics, this only matters on the # main thread (fortunately, set_wakeup_fd only works on the main # thread and will raise a ValueError otherwise). # # If someone has already set a wakeup fd, we don't want to # disturb it. This is an issue for twisted, which does its # SIGCHLD processing in response to its own wakeup fd being # written to. As long as the wakeup fd is registered on the IOLoop, # the loop will still wake up and everything should work. old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': # requires python 2.6+, unix. set_wakeup_fd exists but crashes # the python process on windows. try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: # Already set, restore previous value. This is a little racy, # but there's no clean get_wakeup_fd and in real use the # IOLoop is just started once at the beginning. signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: # Non-main thread, or the previous value of wakeup_fd # is no longer valid. old_wakeup_fd = None try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. ncallbacks = len(self._callbacks) # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that modify self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): if self._closing: return # Blindly insert into self._callbacks. This is safe even # from signal handlers because deque.append is atomic. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if thread.get_ident() != self._thread_ident: # This will write one byte but Waker.consume() reads many # at once, so it's ok to write even when not strictly # necessary. self._waker.wake() else: # If we're on the IOLoop's thread, we don't need to wake anyone. pass def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): self.add_callback(callback, *args, **kwargs)
initialize
在PollIOLoop中,initialize方法得到了具体的实现def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 此处为epoll if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) # fork后关闭文件描述符 self.time_func = time_func or time.time self._handlers = {} # 存放每个fd对应的handler self._events = {} # 储存 epoll 返回的活跃的 fd event pairs self._callbacks = collections.deque() # 回调集 self._timeouts = [] # 关于超时任务的列表,在下面会发现是一个最小堆 self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() # Waker类,里面封装了一些对管道的操作,主要用于唤醒IOLoop self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 将这个Waker加入epoll监听,并关注READ事件
epoll操作
def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
由前面epoll介绍可知道,epoll_create,epoll_ctl,epoll_wait,epoll_close四个api即可使用epoll。
epoll_create: epoll 的生成在EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create。
epoll_ctl: add_handler, update_handler, remove_handler三个方法对应epoll_ctl中的三个参数add 、 modify 、 del。
在add_handler中,self.split_fd方法将文件描述符或者file-like object包装成(文件描述符,object),self._handlers字段保存文件描述符对应的处理器,然后将需要监视的I/O事件注册到select中。在Tornado中只关心READ, WRITE, 和 ERROR事件,其中ERROR事件是自动添加的。
epoll_wait: epoll_wait 操作在下面要介绍的start() 中:event_pairs = self._impl.poll(poll_timeout)。
epoll_close:epoll 的 close 在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。
start
def start(self): if self._running: raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 获得当前线程ID self._running = True # set_wakeup_fd()用来设置当一个信号到来时要写入的fd,用于唤醒select或poll。在这里传入的是Waker管道的写端fd。 old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: # Non-main thread, or the previous value of wakeup_fd # is no longer valid. old_wakeup_fd = None try: while True: # 进入循环 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. ncallbacks = len(self._callbacks) due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务 if self._timeouts: # self._timeouts是一个最小堆,按照到期时间由近到远排序 now = self.time() while self._timeouts: # 循环直至为空 if self._timeouts[0].callback is None: # 无回调时,直接弹出,计数器减1 heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) # 过期任务加入过期列表 else: break # 当堆的大小超过一个数量后,对堆的整理将十分影响效率,于是在这里提前处理了堆,在计数器>512 并 >_timeouts长度的一半时,清零计数器,剔除无回调任务,重新整理堆 if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for i in range(ncallbacks): # 执行回调 self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: # 执行已过期任务的回调 if timeout.callback is not None: self._run_callback(timeout.callback) due_timeouts = timeout = None # 清空本次循环的超时 # poll_timeout是epoll的阻塞等待时间。 # 当前面的回调或超时又添加了回调任务时,设置为0即立即执行; # 有timeout需要处理时,计算第一个timeout(self._timeouts[0], # 最先超时需要处理的timeout)距离现在的超时间隔,取poll_timeout默认值与该间隔之间的最小值; # 什么都没有则取默认值 if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break # 进入poll之前调用signal.setitimer(signal.ITIMER_REAL, 0, 0)清理定时器, # 直到poll返回后重新设置定时器。 # set_blocking_signal_threshold()方法设置了当IOLoop阻塞超过多少秒后发送信号 if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) # 获取监听到的活跃事件 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) self._events.update(event_pairs) # 将活跃事件加入 _events # 一个一个处理活跃事件 while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued # I/O循环结束重置_stopped状态,清理定时器,将当前IOLoop实例从当前线程移除绑定。 self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd)
参考:
IOLoop模块解析
深入理解 tornado 之 底层 ioloop 实现
相关文章推荐
- 为什么要阅读Tornado的源码?
- Tornado源码阅读总览
- tornado源码阅读--开篇
- tornado源码阅读--Application篇
- Tornado源码阅读
- tornado.gen 源码阅读
- tornado源码阅读--ioloop篇
- Tornado源码分析之IOLoop
- tornado源码阅读
- tornado源码学习之ioloop
- JDK7集合框架源码阅读(二) LinkedList
- memcached源码阅读笔记一
- Spark Shuffle数据处理过程与部分调优(源码阅读七)
- 阅读源码是和大师面对面交流的机会之ArrayList检查是否有重复元素
- Android源码在线阅读
- linux内核源码情景分析阅读笔记(1)
- Android系统源码阅读(11):Android的InputManagerService的工作过程
- [置顶] 源码阅读---Activity生命周期控制
- struts1源码阅读(2)
- Spark BlockManager的通信及内存占用分析(源码阅读九)