tornado源码阅读--ioloop篇
2014-04-14 18:40
281 查看
调用:
tornado.ioloop.IOLoop.instance().start()
I/O 循环,作用不用说
就是,就是一个监听用户请求,然后映射具体的处理,
最后返回给用户想要的结果。
那首先从源码开始,看看它的定义和调用
class IOLoop(Configurable):
"""A level-triggered I/O loop.
/# 。。省略各种初始化操作。。#/
@staticmethod
def instance():
"""Returns a global `IOLoop` instance.
Most applications have a single,global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. To get the current thread's `IOLoop`,use `current()`.
"""
if not hasattr(IOLoop,"_instance"):
withIOLoop._instance_lock:
if not hasattr(IOLoop,"_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
def start(self):
#。。。为空函数。。。#
raise NotImplementedError()
我们在上面的地方看到,每次程序运行的时候最后都掉用
tornado.ioloop.IOLoop.instance().start()
def instance():
if not hasattr(IOLoop,"_instance"):
withIOLoop._instance_lock:
if not hasattr(IOLoop,"_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
#这是Tornado的一个设计比较好的地方,可以根据不同的系统选择同
#的IOLoop对象,由于我的是ubuntu系统,所以返回的是他下面生成的
#IOLoop()
#<tornado.platform.epoll.EPollIOLoop object at 0xa7c2e0c>
#EPollIOLoop 对象最终也是继承的IOLoop(),但是他的父类是
# class PollIOLoop(IOLoop)
所以真正调用的是这个类中的 def start()函数
#所以我们应该的去分析,这个函数里面的start()函数
return IOLoop._instance
然后我们来看源码的时候看到,start()函数居然是一个空函数,
难道我们看的什么地方有问题,但是也是好一阵惊吓,后面通过pdb的跟踪调试,
原来秘密就藏在IOLoop继承的Configurable类中,
在Configurable中会调用def __new__()这个函数,进行一些初始化,
tornado会根据你的系统类型自己选择异步I/O函数。我们可以来Configurable()中看一下
IOLoop()继承Configurable类,之后会调用,IOLoop继承的c()函数,得到你当前的系统类型,
然后选择你需要返回的IOLoop()
class IOLoop(Configurable):
@classmethod
def configurable_default(cls):if hasattr(select,"epoll"):from tornado.platform.epoll import EPollIOLoopreturn EPollIOLoop
if hasattr(select,"kqueue"):# Python 2.6+ on BSD or Macfrom tornado.platform.kqueue import KQueueIOLoopreturn KQueueIOLoop
from tornado.platform.select import SelectIOLoopreturn SelectIOLoop
#在这些类初始化的时候都是会先调用initialize()函数,进行初始化的操作。
class PollIOLoop(IOLoop):
def initialize(self,impl,time_func=None):
#。。。此处省略。。。#
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd,events: self._waker.consume(),
self.READ)
重点来看start()函数的定义
IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用
start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。
为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback调用函数;如果没有超时需要重新设定 poll_timeout的值。
通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
其中最基础的接收到scoket.accecpt()函数,这里面的处理是在netutil.py里面。
可以自己跟踪。加上pdb进行调试。
这是自己第一次写技术Blog,没准有不入大神大婶发眼的,多多指正,欢迎大家多交流
QQ:625106674
Email: ky150@126.com
tornado.ioloop.IOLoop.instance().start()
ioloop这个核心的
I/O 循环,作用不用说
就是,就是一个监听用户请求,然后映射具体的处理,
最后返回给用户想要的结果。
那首先从源码开始,看看它的定义和调用
class IOLoop(Configurable):
"""A level-triggered I/O loop.
/# 。。省略各种初始化操作。。#/
@staticmethod
def instance():
"""Returns a global `IOLoop` instance.
Most applications have a single,global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. To get the current thread's `IOLoop`,use `current()`.
"""
if not hasattr(IOLoop,"_instance"):
withIOLoop._instance_lock:
if not hasattr(IOLoop,"_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
def start(self):
#。。。为空函数。。。#
raise NotImplementedError()
我们在上面的地方看到,每次程序运行的时候最后都掉用
tornado.ioloop.IOLoop.instance().start()
def instance():
if not hasattr(IOLoop,"_instance"):
withIOLoop._instance_lock:
if not hasattr(IOLoop,"_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
#这是Tornado的一个设计比较好的地方,可以根据不同的系统选择同
#的IOLoop对象,由于我的是ubuntu系统,所以返回的是他下面生成的
#IOLoop()
#<tornado.platform.epoll.EPollIOLoop object at 0xa7c2e0c>
#EPollIOLoop 对象最终也是继承的IOLoop(),但是他的父类是
# class PollIOLoop(IOLoop)
所以真正调用的是这个类中的 def start()函数
#所以我们应该的去分析,这个函数里面的start()函数
return IOLoop._instance
然后我们来看源码的时候看到,start()函数居然是一个空函数,
难道我们看的什么地方有问题,但是也是好一阵惊吓,后面通过pdb的跟踪调试,
原来秘密就藏在IOLoop继承的Configurable类中,
在Configurable中会调用def __new__()这个函数,进行一些初始化,
tornado会根据你的系统类型自己选择异步I/O函数。我们可以来Configurable()中看一下
IOLoop()继承Configurable类,之后会调用,IOLoop继承的c()函数,得到你当前的系统类型,
然后选择你需要返回的IOLoop()
class IOLoop(Configurable):
@classmethod
def configurable_default(cls):if hasattr(select,"epoll"):from tornado.platform.epoll import EPollIOLoopreturn EPollIOLoop
if hasattr(select,"kqueue"):# Python 2.6+ on BSD or Macfrom tornado.platform.kqueue import KQueueIOLoopreturn KQueueIOLoop
from tornado.platform.select import SelectIOLoopreturn SelectIOLoop
#在这些类初始化的时候都是会先调用initialize()函数,进行初始化的操作。
class PollIOLoop(IOLoop):
def initialize(self,impl,time_func=None):
#。。。此处省略。。。#
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd,events: self._waker.consume(),
self.READ)
重点来看start()函数的定义
IOLoop的start方法
IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。
为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback调用函数;如果没有超时需要重新设定 poll_timeout的值。
通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
01 | while True : |
02 | poll_timeout = 3600.0 |
03 |
04 | with self ._callback_lock: |
05 | callbacks = self ._callbacks |
06 | self ._callbacks = [] |
07 | for callback in callbacks: |
08 | self ._run_callback(callback) |
09 |
10 | # 超时处理 |
11 | if self ._timeouts: |
12 | now = time.time() |
13 | while self ._timeouts: |
14 | if self ._timeouts[ 0 ].callback is None : |
15 | # the timeoutwas cancelled |
16 | heapq.heappop( self ._timeouts) |
17 | elif self ._timeouts[ 0 ].deadline < = now: |
18 | timeout = heapq.heappop( self ._timeouts) |
19 | self ._run_callback(timeout.callback) |
20 | else : |
21 | seconds = self ._timeouts[ 0 ].deadline - now |
22 | poll_timeout = min (seconds, poll_timeout) |
23 | break |
24 |
25 | if self ._callbacks: |
26 | # |
27 | # we don't want to wait in poll() before we run them. |
28 | poll_timeout = 0.0 |
29 |
30 | if not self ._running: |
31 | break |
32 |
33 | if self ._blocking_signal_threshold is not None : |
34 | # clear alarm so it doesn't fire while poll is waiting for events. |
35 | signal.setitimer(signal.ITIMER_REAL, 0 , 0 ) |
36 |
37 | # epoll阻塞,当有事件通知或超时返回event_pairs |
38 | try : |
39 | event_pairs = self ._impl.poll(poll_timeout) |
40 | except Exception, e: |
41 | # 异常处理,省略 |
42 |
43 | # 对epoll返回event_pairs事件的处理 |
44 | self ._events.update(event_pairs) |
45 | while self ._events: |
46 | fd, events = self ._events.popitem() |
47 | try : |
48 | self ._handlers[fd](fd, events) |
49 | except Exception e: |
50 | # 异常处理,省略 |
可以自己跟踪。加上pdb进行调试。
这是自己第一次写技术Blog,没准有不入大神大婶发眼的,多多指正,欢迎大家多交流
QQ:625106674
Email: ky150@126.com
相关文章推荐
- tornado源码阅读
- 为什么要阅读Tornado的源码?
- Tornado源码阅读总览
- tornado源码阅读--开篇
- tornado源码阅读--Application篇
- tornado.gen 源码阅读
- Tornado源码阅读
- tornado IOLoop源码阅读
- Struts源码阅读心得之html:cancel篇
- 源码阅读分析 - Window底层原理与系统架构
- [hadoop源码阅读][6]-org.apache.hadoop.ipc-ipc总体结构和RPC
- Spring源码阅读4.2-Aspecjt AOP之代理对象的创建
- 源码阅读--Retrofit
- java阅读源码工具
- 阅读Sofia-SIP源码 - su模块 - su_configure.h/su_config.h/su_types.h
- tomcat源码阅读5
- 快速阅读源码方法
- nsq源码阅读 nsqlookupd源码三 tcp.go tcp_server.go
- tomcat源码阅读17
- 献给新手,如何阅读Linux源码(转)