您的位置:首页 > 运维架构

tornado源码阅读--ioloop篇

2014-04-14 18:40 281 查看
调用:
tornado.ioloop.IOLoop.instance().start()

ioloop
这个核心的
I/O 循环,作用不用说
就是,就是一个监听用户请求,然后映射具体的处理,
最后返回给用户想要的结果。

那首先从源码开始,看看它的定义和调用


class IOLoop(Configurable):
"""A level-triggered I/O loop.

/# 。。省略各种初始化操作。。#/

@staticmethod
def instance():
"""Returns a global `IOLoop` instance.

Most applications have a single,global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. To get the current thread's `IOLoop`,use `current()`.
"""
if not hasattr(IOLoop,"_instance"):
withIOLoop._instance_lock:
if not hasattr(IOLoop,"_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance

def start(self):


#。。。为空函数。。。#

raise NotImplementedError()

我们在上面的地方看到,每次程序运行的时候最后都掉用

tornado.ioloop.IOLoop.instance().start()

def instance():
if not hasattr(IOLoop,"_instance"):


withIOLoop._instance_lock:

if not hasattr(IOLoop,"_instance"):

# New instance after double check

IOLoop._instance = IOLoop()
#这是Tornado的一个设计比较好的地方,可以根据不同的系统选择同

#的IOLoop对象,由于我的是ubuntu系统,所以返回的是他下面生成的

#IOLoop()
#<tornado.platform.epoll.EPollIOLoop object at 0xa7c2e0c>
#EPollIOLoop 对象最终也是继承的IOLoop(),但是他的父类是
# class PollIOLoop(IOLoop)
所以真正调用的是这个类中的 def start()函数
#所以我们应该的去分析,这个函数里面的start()函数
return IOLoop._instance

然后我们来看源码的时候看到,start()函数居然是一个空函数,
难道我们看的什么地方有问题,但是也是好一阵惊吓,后面通过pdb的跟踪调试,
原来秘密就藏在IOLoop继承的Configurable类中,

在Configurable中会调用def __new__()这个函数,进行一些初始化,
tornado会根据你的系统类型自己选择异步I/O函数。我们可以来Configurable()中看一下

IOLoop()继承Configurable类,之后会调用,IOLoop继承的c()函数,得到你当前的系统类型,
然后选择你需要返回的IOLoop()

class IOLoop(Configurable):
@classmethod
def configurable_default(cls):if hasattr(select,"epoll"):from tornado.platform.epoll import EPollIOLoopreturn EPollIOLoop
if hasattr(select,"kqueue"):# Python 2.6+ on BSD or Macfrom tornado.platform.kqueue import KQueueIOLoopreturn KQueueIOLoop
from tornado.platform.select import SelectIOLoopreturn SelectIOLoop

#在这些类初始化的时候都是会先调用initialize()函数,进行初始化的操作。


class PollIOLoop(IOLoop):
def initialize(self,impl,time_func=None):

#。。。此处省略。。。#

# Create a pipe that we send bogus data to when we want to wake

# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd,events: self._waker.consume(),
self.READ)

重点来看start()函数的定义


IOLoop的start方法

IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用
start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。

超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback调用函数;如果没有超时需要重新设定 poll_timeout的值。

通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。

epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。

01
while
True
:
02
poll_timeout
=
3600.0
03
04
with
self
._callback_lock:
05
callbacks
=
self
._callbacks
06
self
._callbacks
=
[]
07
for
callback
in
callbacks:
08
self
._run_callback(callback)
09
10
#
超时处理
11
if
self
._timeouts:
12
now
=
time.time()
13
while
self
._timeouts:
14
if
self
._timeouts[
0
].callback
is
None
:
15
#
the timeoutwas cancelled
16
heapq.heappop(
self
._timeouts)
17
elif
self
._timeouts[
0
].deadline
<
=
now:
18
timeout
=
heapq.heappop(
self
._timeouts)
19
self
._run_callback(timeout.callback)
20
else
:
21
seconds
=
self
._timeouts[
0
].deadline
-
now
22
poll_timeout
=
min
(seconds,
poll_timeout)
23
break
24
25
if
self
._callbacks:
26
#
If any callbacksor timeouts called add_callback,
27
#
we don't want to wait in poll() before we run them.
28
poll_timeout
=
0.0
29
30
if
not
self
._running:
31
break
32
33
if
self
._blocking_signal_threshold
is
not
None
:
34
#
clear alarm so it doesn't fire while poll is waiting for events.
35
signal.setitimer(signal.ITIMER_REAL,
0
,
0
)
36
37
#
epoll阻塞,当有事件通知或超时返回event_pairs
38
try
:
39
event_pairs
=
self
._impl.poll(poll_timeout)
40
except
Exception,
e:
41
#
异常处理,省略
42
43
#
对epoll返回event_pairs事件的处理
44
self
._events.update(event_pairs)
45
while
self
._events:
46
fd,
events
=
self
._events.popitem()
47
try
:
48
self
._handlers[fd](fd,
events)
49
except
Exception
e:
50
#
异常处理,省略
其中最基础的接收到scoket.accecpt()函数,这里面的处理是在netutil.py里面。



可以自己跟踪。加上pdb进行调试。

这是自己第一次写技术Blog,没准有不入大神大婶发眼的,多多指正,欢迎大家多交流
QQ:625106674
Email: ky150@126.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: