您的位置:首页 > 运维架构

tornado IOLoop源码阅读

2017-12-16 18:27 393 查看
tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。因此,有必要去了解一下关于epoll的预备知识。

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

select select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

poll pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

Linux IO模式及select,poll,epoll详解

epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);


1.int epoll_create(int size)

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议

当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

函数是对指定描述符fd执行op操作。

- epfd:是epoll_create()的返回值。

- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。

- fd:是需要监听的fd(文件描述符)

- epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:

struct epoll_event {
__uint32_t events;  /* Epoll events */
epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里


3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

epoll工作模式

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

  LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

  ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

Tornado.IOLoop

了解完epoll,可以开始阅读源码了。在IOLoop.py中,主要有两个类,IOLoop和PollIOLoop。可是在IOLoop中,发现很多关键的方法都是作为了接口交由子类实现,比如PollIOLoop,而这两者的关系由一个Configurable类联系起来。另外值得注意的是IOLoop没有__init__,这一点同样可以在Configurable类中找到解释。

class Configurable(object):
__impl_class = None  # type: type
__impl_kwargs = None  # type: Dict[str, Any]

def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
impl = cls
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# initialize vs __init__ chosen for compatibility with AsyncHTTPClient
# singleton magic.  If we get rid of that we can switch to __init__
# here too.
instance.initialize(*args, **init_kwargs)
return instance

@classmethod
def configurable_base(cls):
# type: () -> Any
# TODO: This class needs https://github.com/python/typing/issues/107 # to be fully typeable.
raise NotImplementedError()

@classmethod
def configurable_default(cls):
# type: () -> type
raise NotImplementedError()

def initialize(self):
# type: () -> None

@classmethod
def configure(cls, impl, **kwargs):
# type: (Any, **Any) -> None
base = cls.configurable_base()
if isinstance(impl, (str, unicode_type)):
impl = import_object(impl)
if impl is not None and not issubclass(impl, cls):
raise ValueError("Invalid subclass of %s" % cls)
base.__impl_class = impl
base.__impl_kwargs = kwargs

@classmethod
def configured_class(cls):
# type: () -> type
"""Returns the currently configured class."""
base = cls.configurable_base()
if cls.__impl_class is None:
base.__impl_class = cls.configurable_default()
return base.__impl_class

@classmethod
def _save_configuration(cls):
# type: () -> Tuple[type, Dict[str, Any]]
base = cls.configurable_base()
return (base.__impl_class, base.__impl_kwargs)

@classmethod
def _restore_configuration(cls, saved):
# type: (Tuple[type, Dict[str, Any]]) -> None
base = cls.configurable_base()
base.__impl_class = saved[0]
base.__impl_kwargs = saved[1]


主要看Configurable类中的__new()__方法,这里base = cls.configurable_base(),但是cls.configurable_base()在这个类中是一个接口,显然是要在IOLoop中要实现的。

@classmethod
def configurable_base(cls):
return IOLoop


在IOLoop中,这个base类就是自己,于是走进了if判断中。于是关键就落在了impl这个变量上。接下来就轮到了cls.configured_class(),在configured_class()中又调用了configurable_default(),又是一个接口,再次回到IOLoop中查看

@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop


ok,到这里就清楚了,这是一个工厂函数,根据操作系统选择不同的event loop,找到EPollIOLoop

class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)


到这里已经理清了EPollIOLoop,PollIOLoop和IOLoop间的关系,可以进入关键方法阅读了。

IOLoop

from __future__ import absolute_import, division, print_function, with_statement

import datetime
import errno
import functools
import heapq # 最小堆
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math

from tornado.concurrent import TracebackFuture, is_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds

try:
import signal
except ImportError:
signal = None

if PY3:
import _thread as thread
else:
import thread

_POLL_TIMEOUT = 3600.0

class TimeoutError(Exception):
pass

class IOLoop(Configurable):
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)

# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP

# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()

_current = threading.local()

@staticmethod
def instance():
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance

@staticmethod
def initialized():
"""Returns true if the singleton instance has been
created."""
return hasattr(IOLoop, "_instance")

def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self

@staticmethod
def clear_instance():
"""Clear the global `IOLoop` instance.
.. versionadded:: 4.0
"""
if hasattr(IOLoop, "_instance"):
del IOLoop._instance

@staticmethod
def current(instance=True):
current = getattr(IOLoop._current, "instance", None)
if current is None and instance:
return IOLoop.instance()
return current

def make_current(self):
IOLoop._current.instance = self

@staticmethod
def clear_current():
IOLoop._current.instance = None

@classmethod def configurable_base(cls): return IOLoop

@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop

def initialize(self, make_current=None):
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
elif make_current:
if IOLoop.current(instance=False) is not None:
raise RuntimeError("current IOLoop already exists")
self.make_current()

def close(self, all_fds=False):
raise NotImplementedError()

def add_handler(self, fd, handler, events):
raise NotImplementedError()

def update_handler(self, fd, events):
raise NotImplementedError()

def remove_handler(self, fd):
raise NotImplementedError()

def set_blocking_signal_threshold(self, seconds, action):
raise NotImplementedError()

def set_blocking_log_threshold(self, seconds):
self.set_blocking_signal_threshold(seconds, self.log_stack)

def log_stack(self, signal, frame):
gen_log.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold,
''.join(traceback.format_stack(frame)))

def start(self):
raise NotImplementedError()

def _setup_logging(self):
if not any(
[logging.getLogger().handlers,
logging.getLogger('tornado').handlers,
logging.getLogger('tornado.application').handlers]):
logging.basicConfig()

def stop(self):
raise NotImplementedError()

def run_sync(self, func, timeout=None):
future_cell = [None]

def run():
try:
result = func()
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
except Exception:
future_cell[0] = TracebackFuture()
future_cell[0].set_exc_info(sys.exc_info())
else:
if is_future(result):
future_cell[0] = result
else:
future_cell[0] = TracebackFuture()
future_cell[0].set_result(result)
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
if timeout is not None:
timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
self.start()
if timeout is not None:
self.remove_timeout(timeout_handle)
if not future_cell[0].done():
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()

def time(self):
return time.time()
...


类中首先定义了epoll 监听的事件,并重点对EPOLLIN 、 EPOLLOUT 、 EPOLLERR定义了处理事件。

READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP


instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 这些方法都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop。

initialize,close,add_handler,update_handler,remove_handler,start,stop等比较核心的方法都在下面的PollIOLoop中实现。

值得一提的是run_sync方法,其作用是提供给利用gen.coroutine的自定义协程方法使用IOLoop执行。

tornado.ioloop.PollIOLoop

class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.

For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
`tornado.platform.select.SelectIOLoop` (all platforms).
"""
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = collections.deque()
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()

# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)

def close(self, all_fds=False):
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in list(self._handlers.values()):
self.close_fd(fd)
self._waker.close()
self._impl.close()
self._callbacks = None
self._timeouts = None

def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal, "setitimer"):
gen_log.error("set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
signal.signal(signal.SIGALRM,
action if action is not None else signal.SIG_DFL)

def start(self):
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True

# signal.set_wakeup_fd closes a race condition in event loops:
# a signal may arrive at the beginning of select/poll/etc
# before it goes into its interruptible sleep, so the signal
# will be consumed without waking the select.  The solution is
# for the (C, synchronous) signal handler to write to a pipe,
# which will then be seen by select.
#
# In python's signal handling semantics, this only matters on the
# main thread (fortunately, set_wakeup_fd only works on the main
# thread and will raise a ValueError otherwise).
#
# If someone has already set a wakeup fd, we don't want to
# disturb it.  This is an issue for twisted, which does its
# SIGCHLD processing in response to its own wakeup fd being
# written to.  As long as the wakeup fd is registered on the IOLoop,
# the loop will still wake up and everything should work.
old_wakeup_fd = None
if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
# requires python 2.6+, unix.  set_wakeup_fd exists but crashes
# the python process on windows.
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -1:
# Already set, restore previous value.  This is a little racy,
# but there's no clean get_wakeup_fd and in real use the
# IOLoop is just started once at the beginning.
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd = None
except ValueError:
# Non-main thread, or the previous value of wakeup_fd
# is no longer valid.
old_wakeup_fd = None

try:
while True:
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
ncallbacks = len(self._callbacks)

# Add any timeouts that have come due to the callback list.
# Do not run anything until we have determined which ones
# are ready, so timeouts that call add_timeout cannot
# schedule anything in this iteration.
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
# The timeout was cancelled.  Note that the
# cancellation check is repeated below for timeouts
# that are cancelled by another timeout or callback.
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
# Clean up the timeout queue when it gets large and it's
# more than half cancellations.
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)

for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
due_timeouts = timeout = None

if self._callbacks:
# If any callbacks or timeouts called add_callback,
# we don't want to wait in poll() before we run them.
poll_timeout = 0.0
elif self._timeouts:
# If there are any timeouts, schedule the first one.
# Use self.time() instead of 'now' to account for time
# spent running callbacks.
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
# No timeouts and no callbacks, so use the default.
poll_timeout = _POLL_TIMEOUT

if not self._running:
break

if self._blocking_signal_threshold is not None:
# clear alarm so it doesn't fire while poll is waiting for
# events.
signal.setitimer(signal.ITIMER_REAL, 0, 0)

try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
# Depending on python version and IOLoop implementation,
# different exception types may be thrown and there are
# two ways EINTR might be signaled:
# * e.errno == errno.EINTR
# * e.args is like (errno.EINTR, 'Interrupted system call')
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise

if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)

# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that modify self._events
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None

finally:
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)

def stop(self):
self._running = False
self._stopped = True
self._waker.wake()

def time(self):
return self.time_func()

def call_at(self, deadline, callback, *args, **kwargs):
timeout = _Timeout(
deadline,
functools.partial(stack_context.wrap(callback), *args, **kwargs),
self)
heapq.heappush(self._timeouts, timeout)
return timeout

def remove_timeout(self, timeout):
# Removing from a heap is complicated, so just leave the defunct
# timeout object in the queue (see discussion in
# http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
self._cancellations += 1

def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
# Blindly insert into self._callbacks. This is safe even
# from signal handlers because deque.append is atomic.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if thread.get_ident() != self._thread_ident:
# This will write one byte but Waker.consume() reads many
# at once, so it's ok to write even when not strictly
# necessary.
self._waker.wake()
else:
# If we're on the IOLoop's thread, we don't need to wake anyone.
pass

def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():
self.add_callback(callback, *args, **kwargs)


initialize

在PollIOLoop中,initialize方法得到了具体的实现

def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl  # 此处为epoll
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())  # fork后关闭文件描述符
self.time_func = time_func or time.time
self._handlers = {}  # 存放每个fd对应的handler
self._events = {}  # 储存 epoll 返回的活跃的 fd event pairs
self._callbacks = collections.deque()  # 回调集
self._timeouts = []  # 关于超时任务的列表,在下面会发现是一个最小堆
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()

# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()  # Waker类,里面封装了一些对管道的操作,主要用于唤醒IOLoop
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)  # 将这个Waker加入epoll监听,并关注READ事件


epoll操作

def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):
fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):
fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except Exception:
gen_log.debug("Error deleting fd from IOLoop", exc_info=True)


由前面epoll介绍可知道,epoll_create,epoll_ctl,epoll_wait,epoll_close四个api即可使用epoll。

epoll_create: epoll 的生成在EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create。

epoll_ctl: add_handler, update_handler, remove_handler三个方法对应epoll_ctl中的三个参数add 、 modify 、 del。

在add_handler中,self.split_fd方法将文件描述符或者file-like object包装成(文件描述符,object),self._handlers字段保存文件描述符对应的处理器,然后将需要监视的I/O事件注册到select中。在Tornado中只关心READ, WRITE, 和 ERROR事件,其中ERROR事件是自动添加的。

epoll_wait: epoll_wait 操作在下面要介绍的start() 中:event_pairs = self._impl.poll(poll_timeout)。

epoll_close:epoll 的 close 在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。

start

def start(self):
if self._running:
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident() # 获得当前线程ID
self._running = True

# set_wakeup_fd()用来设置当一个信号到来时要写入的fd,用于唤醒select或poll。在这里传入的是Waker管道的写端fd。
old_wakeup_fd = None
if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd = None
except ValueError:
# Non-main thread, or the previous value of wakeup_fd
# is no longer valid.
old_wakeup_fd = None
try:
while True:  # 进入循环
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
ncallbacks = len(self._callbacks)
due_timeouts = []  # 用于存放这个周期内已过期( 已超时 )的任务
if self._timeouts:  # self._timeouts是一个最小堆,按照到期时间由近到远排序
now = self.time()
while self._timeouts:  # 循环直至为空
if self._timeouts[0].callback is None:  # 无回调时,直接弹出,计数器减1
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))  # 过期任务加入过期列表
else:
break
# 当堆的大小超过一个数量后,对堆的整理将十分影响效率,于是在这里提前处理了堆,在计数器>512 并 >_timeouts长度的一半时,清零计数器,剔除无回调任务,重新整理堆
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
# Clean up the timeout queue when it gets large and it's
# more than half cancellations.
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)

for i in range(ncallbacks):  # 执行回调
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts: # 执行已过期任务的回调
if timeout.callback is not None:
self._run_callback(timeout.callback)
due_timeouts = timeout = None  # 清空本次循环的超时

# poll_timeout是epoll的阻塞等待时间。
# 当前面的回调或超时又添加了回调任务时,设置为0即立即执行;
# 有timeout需要处理时,计算第一个timeout(self._timeouts[0],
# 最先超时需要处理的timeout)距离现在的超时间隔,取poll_timeout默认值与该间隔之间的最小值;
# 什么都没有则取默认值
if self._callbacks:
# If any callbacks or timeouts called add_callback,
# we don't want to wait in poll() before we run them.
poll_timeout = 0.0
elif self._timeouts:
# If there are any timeouts, schedule the first one.
# Use self.time() instead of 'now' to account for time
# spent running callbacks.
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
# No timeouts and no callbacks, so use the default.
poll_timeout = _POLL_TIMEOUT

if not self._running:
break

# 进入poll之前调用signal.setitimer(signal.ITIMER_REAL, 0, 0)清理定时器,
# 直到poll返回后重新设置定时器。
# set_blocking_signal_threshold()方法设置了当IOLoop阻塞超过多少秒后发送信号
if self._blocking_signal_threshold is not None:
# clear alarm so it doesn't fire while poll is waiting for
# events.
signal.setitimer(signal.ITIMER_REAL, 0, 0)

try:
event_pairs = self._impl.poll(poll_timeout)  # 获取监听到的活跃事件
except Exception as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise

if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0)
self._events.update(event_pairs)  # 将活跃事件加入 _events
# 一个一个处理活跃事件
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None

finally:
# reset the stopped flag so another start/stop pair can be issued
#  I/O循环结束重置_stopped状态,清理定时器,将当前IOLoop实例从当前线程移除绑定。
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)


参考:

IOLoop模块解析

深入理解 tornado 之 底层 ioloop 实现
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息