您的位置:首页 > 其它

tornado.gen 源码阅读

2014-03-26 00:00 387 查看
<pre class="brush:python;toolbar: true; auto-links: false;">'''
import tornado.gen
import tornado.ioloop
import time
import pdb
import tornado.httpclient


@tornado.gen.engine
# @tornado.gen.coroutine
def genFun():
    # Minimal example: fetch a page asynchronously and print the response.
    http_client = tornado.httpclient.AsyncHTTPClient()
    # Equivalent explicit Callback/Wait form of the Task call below:
    # http_client.fetch('www.google.com', callback=(yield tornado.gen.Callback('key')))
    # ret = yield tornado.gen.Wait('key')
    ret = yield tornado.gen.Task(http_client.fetch, 'www.google.com')
    # Parenthesised so the line is valid under both Python 2 and Python 3.
    print(ret)


genFun()
# Blocks forever: nothing in this example stops the IOLoop.
tornado.ioloop.IOLoop.instance().start()
'''
'''
当调用 genFun() 时,实际调用的是 engine 返回的 wrapper,也就是执行 wrapper()。
wrapper 并不是 gen 包的关键,Runner 和 YieldPoint 才是最重要的。

'''
from __future__ import absolute_import, division, print_function, with_statement

import collections
import functools
import itertools
import sys
import types

from tornado.concurrent import Future, TracebackFuture
from tornado.ioloop import IOLoop
from tornado.stack_context import ExceptionStackContext, wrap
class KeyReuseError(Exception):
    """Raised by ``Runner.register_callback`` when a key is already pending."""

class UnknownKeyError(Exception):
    """Raised by ``Runner.is_ready`` when asked about a key that is not pending."""

class LeakedCallbackError(Exception):
    """Raised by ``Runner.run`` when the generator finishes cleanly while
    registered callbacks are still pending."""

class BadYieldError(Exception):
    """Thrown into the generator by ``Runner.run`` when it yields an object
    that is not a list, dict, ``Future`` or ``YieldPoint``."""

class ReturnValueIgnoredError(Exception):
    """Raised by ``engine`` when the decorated function produces a value
    other than ``None``, since ``@gen.engine`` has nowhere to deliver it."""

def engine(func):
    """Decorator that drives a generator function with a ``Runner``.

    The returned wrapper calls ``func``; if the result is a generator it is
    handed to a ``Runner`` and run.  The wrapper itself always returns
    ``None``.

    Raises:
        ReturnValueIgnoredError: if ``func`` (or its generator) produces a
            value other than ``None``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None

        def handle_exception(typ, value, tb):
            # if the function throws an exception before its first "yield"
            # (or is not a generator at all), the Runner won't exist yet.
            # However, in that case we haven't reached anything asynchronous
            # yet, so we can just let the exception propagate.
            if runner is not None:
                return runner.handle_exception(typ, value, tb)
            return False

        with ExceptionStackContext(handle_exception) as deactivate:
            try:
                # Calling a generator function only creates the generator
                # object (e.g. the one for genFun in the example); it is
                # wrapped in a Runner below and started with run().
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, 'value', None)
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        if value is not None:
                            raise ReturnValueIgnoredError(
                                "@gen.engine functions cannot return values: "
                                "%r" % (value,))
                        assert value is None
                        deactivate()
                    runner = Runner(result, final_callback)
                    runner.run()
                    return
            if result is not None:
                raise ReturnValueIgnoredError(
                    "@gen.engine functions cannot return values: %r" %
                    (result,))
            deactivate()
            # no yield, so we're done
    return wrapper

def coroutine(func):
    """Decorator that runs a generator function and returns a future.

    The wrapper returns a ``TracebackFuture`` that is resolved with the
    function's result (the ``value`` of a raised ``Return``/``StopIteration``
    or a plain return value) or with the exception info if it fails.

    If the caller passes a ``callback`` keyword argument, it is removed from
    ``kwargs`` and invoked with ``future.result()`` once the future is done.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None
        future = TracebackFuture()

        if 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        def handle_exception(typ, value, tb):
            try:
                # Give the runner the first chance to feed the exception
                # back into the generator.
                if runner is not None and runner.handle_exception(typ, value, tb):
                    return True
            except Exception:
                # The generator raised while handling it: report that error.
                typ, value, tb = sys.exc_info()
            future.set_exc_info((typ, value, tb))
            return True

        with ExceptionStackContext(handle_exception) as deactivate:
            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, 'value', None)
            except Exception:
                deactivate()
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        deactivate()
                        future.set_result(value)
                    runner = Runner(result, final_callback)
                    runner.run()
                    return future
            # Not a generator: resolve the future with the plain result.
            deactivate()
            future.set_result(result)
        return future
    return wrapper

class Return(Exception):
    """Exception raised inside a generator to hand back a result.

    ``engine``, ``coroutine`` and ``Runner.run`` catch it and read the
    result from the ``value`` attribute.
    """

    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
class YieldPoint(object):
    """Base class for objects a generator may yield to its ``Runner``.

    Subclasses must override all three methods; the defaults raise
    ``NotImplementedError``.
    """

    def start(self, runner):
        """Called with the ``Runner`` once this object has been yielded."""
        raise NotImplementedError()

    def is_ready(self):
        """Return whether the generator can be resumed now."""
        raise NotImplementedError()

    def get_result(self):
        """Return the value to send back into the generator."""
        raise NotImplementedError()

class Callback(YieldPoint):
    """Yield point that registers ``key`` and produces a callback for it.

    It is always ready, so the generator is resumed immediately; the value
    sent back is ``runner.result_callback(key)``.
    """

    def __init__(self, key):
        self.key = key

    def start(self, runner):
        self.runner = runner
        runner.register_callback(self.key)

    def is_ready(self):
        return True

    def get_result(self):
        return self.runner.result_callback(self.key)

class Wait(YieldPoint):
    """Yield point that waits for the result stored under ``key``.

    Readiness and the result are both delegated to the runner
    (``is_ready`` / ``pop_result``).
    """

    def __init__(self, key):
        self.key = key

    def start(self, runner):
        self.runner = runner

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)

class WaitAll(YieldPoint):
    """Yield point that waits until every key in ``keys`` has a result.

    The value sent back is a list of the results, in the same order as
    ``keys``.
    """

    def __init__(self, keys):
        self.keys = keys

    def start(self, runner):
        self.runner = runner

    def is_ready(self):
        return all(self.runner.is_ready(key) for key in self.keys)

    def get_result(self):
        return [self.runner.pop_result(key) for key in self.keys]

class Task(YieldPoint):
    """Yield point that calls ``func`` with a runner-supplied ``callback``.

    ``func(*args, callback=<result callback>, **kwargs)`` is invoked from
    ``start``; the generator is resumed with whatever is passed to that
    callback.  ``kwargs`` must not already contain ``callback``.
    """

    def __init__(self, func, *args, **kwargs):
        assert "callback" not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        self.runner = runner
        # A fresh object() is a key that cannot collide with any other.
        self.key = object()
        runner.register_callback(self.key)
        self.kwargs["callback"] = runner.result_callback(self.key)
        self.func(*self.args, **self.kwargs)

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)

class YieldFuture(YieldPoint):
    """Yield point that waits for a future and sends back its result.

    ``io_loop`` defaults to ``IOLoop.current()`` at construction time.
    """

    def __init__(self, future, io_loop=None):
        self.future = future
        self.io_loop = io_loop or IOLoop.current()

    def start(self, runner):
        if not self.future.done():
            # Not finished yet: have the IOLoop call back into the runner
            # (with the future as the argument) when it completes.
            self.runner = runner
            self.key = object()
            runner.register_callback(self.key)
            self.io_loop.add_future(self.future, runner.result_callback(self.key))
        else:
            # Already finished: take the result now; runner=None marks
            # this fast path for is_ready()/get_result().
            self.runner = None
            self.result = self.future.result()

    def is_ready(self):
        if self.runner is not None:
            return self.runner.is_ready(self.key)
        else:
            return True

    def get_result(self):
        if self.runner is not None:
            # The stored value is the completed future itself.
            return self.runner.pop_result(self.key).result()
        else:
            return self.result

class Multi(YieldPoint):
    """Yield point that runs several yield points in parallel.

    ``children`` is a list or dict of ``YieldPoint``/``Future`` objects
    (futures are wrapped in ``YieldFuture``).  The value sent back is a list
    of results, or a dict with the original keys when a dict was given.
    """

    def __init__(self, children):
        self.keys = None
        if isinstance(children, dict):
            # keys() and values() of an unmodified dict iterate in matching
            # order, so results can be zipped back to their keys later.
            self.keys = list(children.keys())
            children = children.values()
        self.children = []
        for i in children:
            if isinstance(i, Future):
                i = YieldFuture(i)
            self.children.append(i)
        assert all(isinstance(i, YieldPoint) for i in self.children)
        self.unfinished_children = set(self.children)

    def start(self, runner):
        for i in self.children:
            i.start(runner)

    def is_ready(self):
        # Drop the leading run of finished children; takewhile stops at the
        # first unfinished one, and the rest are rechecked on the next call.
        finished = list(itertools.takewhile(
            lambda i: i.is_ready(), self.unfinished_children))
        self.unfinished_children.difference_update(finished)
        return not self.unfinished_children

    def get_result(self):
        result = (i.get_result() for i in self.children)
        if self.keys is not None:
            return dict(zip(self.keys, result))
        else:
            return list(result)

class _NullYieldPoint(YieldPoint):
    """Placeholder yield point: always ready, with a result of ``None``."""

    def start(self, runner):
        pass

    def is_ready(self):
        return True

    def get_result(self):
        return None


# Shared instance used by Runner before the first yield and after finishing.
_null_yield_point = _NullYieldPoint()

class Runner(object):
    """Drives a generator, resuming it whenever its yield point is ready.

    ``gen`` is the generator to run; ``final_callback`` is called once with
    the generator's result (the ``value`` attribute of the terminating
    ``StopIteration``/``Return``, or ``None``) when it finishes.
    """

    def __init__(self, gen, final_callback):
        self.gen = gen
        self.final_callback = final_callback
        self.yield_point = _null_yield_point
        self.pending_callbacks = set()
        self.results = {}
        self.running = False
        self.finished = False
        self.exc_info = None
        self.had_exception = False

    def register_callback(self, key):
        """Adds ``key`` to the list of callbacks."""
        if key in self.pending_callbacks:
            raise KeyReuseError("key %r is already pending" % (key,))
        self.pending_callbacks.add(key)

    def is_ready(self, key):
        """Returns true if a result is available for ``key``."""
        if key not in self.pending_callbacks:
            raise UnknownKeyError("key %r is not pending" % (key,))
        # A result is available once set_result() has stored it in results.
        return key in self.results

    def set_result(self, key, result):
        """Sets the result for ``key`` and attempts to resume the generator."""
        self.results[key] = result
        self.run()

    def pop_result(self, key):
        """Returns the result for ``key`` and unregisters it."""
        self.pending_callbacks.remove(key)
        return self.results.pop(key)

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        # Not reentrant: callbacks fired while already running just return.
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                if self.exc_info is None:
                    try:
                        # On the first pass yield_point is the null yield
                        # point, whose is_ready() is always True, so this
                        # early return is never taken at that point.
                        if not self.yield_point.is_ready():
                            return
                        # The null yield point's result is None, which is
                        # what starts a fresh generator.
                        send_value = self.yield_point.get_result()
                        self.yield_point = None
                    except Exception:
                        self.exc_info = sys.exc_info()
                try:
                    if self.exc_info is not None:
                        self.had_exception = True
                        exc_info = self.exc_info
                        self.exc_info = None
                        yielded = self.gen.throw(*exc_info)
                    else:
                        # Resume the generator and collect what it yields
                        # next (a Task instance in the genFun example).
                        yielded = self.gen.send(send_value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.final_callback(getattr(e, 'value', None))
                    self.final_callback = None
                    return
                except Exception:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    raise
                if isinstance(yielded, (list, dict)):
                    yielded = Multi(yielded)
                elif isinstance(yielded, Future):
                    yielded = YieldFuture(yielded)
                if isinstance(yielded, YieldPoint):
                    self.yield_point = yielded
                    try:
                        # start() is where the yield point kicks off its
                        # actual work (e.g. Task calls the wrapped function).
                        self.yield_point.start(self)
                    except Exception:
                        self.exc_info = sys.exc_info()
                else:
                    # One-element tuple: thrown into the generator on the
                    # next pass via gen.throw(*exc_info).
                    self.exc_info = (BadYieldError(
                        "yielded unknown object %r" % (yielded,)),)
        finally:
            self.running = False

    def result_callback(self, key):
        """Returns a callback that stores its arguments as the result for
        ``key``.

        No arguments stores ``None``, a single positional argument stores
        that argument, and anything else stores an ``Arguments`` tuple.  The
        callback is passed through ``wrap`` before being returned.
        """
        def inner(*args, **kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return wrap(inner)

    def handle_exception(self, typ, value, tb):
        """Feeds an exception into the generator if it is currently idle.

        Returns True if the exception was recorded and ``run`` was called,
        False if the runner is already running or has finished.
        """
        if not self.running and not self.finished:
            self.exc_info = (typ, value, tb)
            self.run()
            return True
        else:
            return False

# Result type stored by Runner.result_callback when a callback receives
# keyword arguments or more than one positional argument.
Arguments = collections.namedtuple('Arguments', 'args kwargs')

'''
整个核心的部分应该是 Runner,还有 YieldPoint。
在 Runner.run 方法中,拿到生成器 yield 出来的 YieldPoint 子类对象(list/dict/Future 会先被包装成 Multi/YieldFuture),然后调用它的 start 方法。以 Task 的 start 方法为例:

def start(self, runner):
self.runner = runner
self.key = object()
runner.register_callback(self.key)  ----------------------------->  runner.pending_callbacks.add(key)
self.kwargs["callback"] = runner.result_callback(self.key)-------------------------->生成一个callback函数,然后将callback传给真正的工作函数func,也就是开始例子中的fetch。fetch 完成后调用该 callback,进而调用 runner.set_result()
self.func(*self.args, **self.kwargs)

class  Runner:
def result_callback(self, key):
def inner(*args, **kwargs):
if kwargs or len(args) > 1:
result = Arguments(args, kwargs)
elif args:
result = args[0]
else:
result = None
self.set_result(key, result)
return wrap(inner)

执行完以上操作后,接着进入 while 的第二次循环,检查 if not self.yield_point.is_ready():
如果条件为 True(结果尚未就绪),run 方法直接返回。
等 yield_point 的 func 执行完毕并调用回调时,会触发 runner.set_result(),进而再次调用 runner.run();这次进入 while 循环后取得结果 next,send 给 genFun 的 yield,genFun 继续执行。示例中的 genFun 随后执行结束并抛出 StopIteration,run 调用 final_callback 后返回。

如果条件为 False,则表明结果已经就绪,直接取得结果 send 给 genFun 的 yield,之后的流程同上。
'''</pre>
<p>
<br/>
</p>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: