您的位置:首页 > 其它

利用tornado使请求实现异步非阻塞

2017-04-06 16:12 477 查看
基本IO模型

网上搜了很多关于同步异步,阻塞非阻塞的说法,理解还是不能很透彻,有必要买书看下。

参考:使用异步 I/O 大大提高应用程序的性能

怎样理解阻塞非阻塞与同步异步的区别?

同步和异步:主要关注消息通信机制(重点在B?)。

同步:A调用B,B处理直到获得结果,才返回给A。

异步:A调用B,B直接返回。无需等待结果,B通过状态,通知等来通知A或回调函数来处理。

阻塞和非阻塞:主要关注程序等待(重点在A?)。

阻塞:A调用B,A被挂起直到B返回结果给A,A继续执行。

非阻塞:A调用B,A不会被挂起,A可以执行其他操作(但可能A需要轮询检查B是否返回)。

同步阻塞:A调用B,A挂起,B处理直到获得结果,返回给A,A继续执行。

同步非阻塞:A调用B,A继续执行,B处理直到获得结果,处理的同时A轮询检查B是否返回结果。

异步阻塞:异步阻塞 I/O 模型的典型流程 (select)。

异步非阻塞:A调用B,B立即返回,A继续执行,B得到结果后通过状态,通知等通知A或回调函数处理。

tornado实现异步非阻塞

参考:

1. 使用tornado让你的请求异步非阻塞

2. 官方文档

利用异步方法(回调)和
@tornado.web.asynchronous


@tornado.web.asynchronous
并不能将一个同步方法变成异步,所以修饰在同步方法上是无效的,只是告诉框架,这个方法是异步的,且只能适用于HTTP verb方法(get、post、delete、put等)。
@tornado.web.asynchronous
装饰器适用于callback-style的异步方法,如果是协程则可以用
@tornado.gen.coroutine
来修饰。对于用
@tornado.web.asynchronous
修饰的异步方法,需要主动
self.finish()
来结束该请求,普通的方法(
get()
等)会自动结束请求在方法返回的时候。

最基本的使用callback-style的例子,直接使用异步方法,并定义callback方法。

# sync blocking
class SleepHandler(BaseHandler):

# no effective
@tornado.web.asynchronous
def get(self):
time.sleep(5)
self.write('sleep for 5s')

class SleepHandler(BaseHandler):
4000

@tornado.web.asynchronous
def get(self):
tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response)

def on_response(self):
self.write('sleep for 5s')
self.finish()

# call back
class MyRequestHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
http = httpclient.AsyncHTTPClient()
http.fetch('http://www.baidu.com', self._on_download)

def _on_download(self, response):
self.write(response.body)
self.finish()


利用ThreadPoolExecutor

利用ThreadPoolExecutor的submit和future对象的
add_done_callback
方法,将一个同步方法变成异步。如下例所示,若没有无参可以不用
partial()


class SleepHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
sleep_time = 5

def callback(future):
self.write(future.result())
self.finish()

EXECUTOR.submit(partial(self.get_sleep, sleep_time)).add_done_callback(
lambda future: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, future)))

def get_sleep(self, sleep_time):
time.sleep(sleep_time)
return "Awake! %s" % time.time()


分解一下:

1.
future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))
返回了一个
future
对象。

2.
future.add_done_callback()
future
添加一个完成回调函数
callback_func


3. 实际上这个回调函数是

callback_func = lambda future: tornado.ioloop.IOLoop.instance().add_callback(partial(callback, future))```
4. 结合起来就是:


class SleepHandler(BaseHandler):

@tornado.web.asynchronous

def get(self):

sleep_time = 5

def final_callback(future_param):
self.write(future_param.result())
self.finish()

def executor_callback(future_param):
tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future_param))

future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))
future.add_done_callback(executor_callback)

def get_sleep(self, sleep_time):
time.sleep(sleep_time)
return "Awake! %s" % time.time()


5. 没看文档,源码前本认为为什么要多此一举进行两次callback,直接


tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))

不就ok了吗,为何还要再加上一层


future.add_done_callback(executor_callback)

但发现直接这样是会阻塞IOLoop的。查阅相关文档后发现,```
tornado.ioloop.IOLoop.instance().add_callback
``` 这个方法会将控制权从其他线程转到IOLoop线程上,直接用

<div class="se-preview-section-delimiter"></div>


tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))

时,
final_callback()
中获取
future_param.result()
仍然会阻塞,所以需要
future.add_done_callback()“`,在该线程完成获取结果后在回callback给IOLoop。

另一种类似的写法,将上例的方法写成了一个装饰器,见下例,但个人认为利用这种方式异步时没有必要单独写一个装饰器吧,而且也不通用吧?

def unblock(f):
@tornado.web.asynchronous
@wraps(f)
def wrapper(*args, **kwargs):
self = args[0]

def callback(future):
self.write(future.result())
self.finish()
EXECUTOR.submit(
partial(f, *args, **kwargs)
).add_done_callback(
lambda future: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, future)))
return wrapper

class SleepHandler(BaseHandler):
@unblock
def get(self):
time.sleep(5)
return "Awake! %s" % time.time()


使用
tornado.concurrent.run_on_executor
简化


上述的两例都相当于开启了新的线程池?

除此之外还可以利用
@run_on_executor
装饰器将同步阻塞函数变成异步(或者说被tornado的装饰器理解和识别)。首先阅读一下
@run_on_executor
源码:

def run_on_executor(*args, **kwargs):
"""Decorator to run a synchronous method asynchronously on an executor.

The decorated method may be called with a ``callback`` keyword
argument and returns a future.

The `.IOLoop` and executor to be used are determined by the ``io_loop``
and ``executor`` attributes of ``self``. To use different attributes,
pass keyword arguments to the decorator::

@run_on_executor(executor='_thread_pool')
def foo(self):
pass

.. versionchanged:: 4.2
Added keyword arguments to use alternative attributes.
"""
def run_on_executor_decorator(fn):
executor = kwargs.get("executor", "executor")
io_loop = kwargs.get("io_loop", "io_loop")

@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = getattr(self, executor).submit(fn, self, *args, **kwargs)
if callback:
getattr(self, io_loop).add_future(
future, lambda future: callback(future.result()))
return future
return wrapper
if args and kwargs:
raise ValueError("cannot combine positional and keyword args")
if len(args) == 1:
return run_on_executor_decorator(args[0])
elif len(args) != 0:
raise ValueError("expected 1 argument, got %d", len(args))
return run_on_executor_decorator


可以很快发现在不存在
callback
关键字参数时,该装饰器返回了一个future对象,由此并结合上述两例子可以很快的利用
@run_on_executor
写出,感觉相当于简化了上述两例的代码并使之变得通用:

class SleepHandler(BaseHandler):

executor = ThreadPoolExecutor(2)

@tornado.web.asynchronous
def get(self):
def callback(future_param):
self.write('sleep %ss' % future_param.result())
self.finish()

future = self.sleep()
future.add_done_callback(lambda f: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, f)))

@run_on_executor
def sleep(self):
time.sleep(5)
return 5


当然也可以利用callback参数:

class SleepHandler(BaseHandler):
executor = ThreadPoolExecutor(2)
io_loop = tornado.ioloop.IOLoop.instance()

@tornado.web.asynchronous
def get(self):
def callback(res):
self.write('sleep %ss' % res)
self.finish()

self.sleep(callback=callback)

@run_on_executor
def sleep(self):
time.sleep(5)
return 5


协程方式:
@tornado.gen.coroutine
yield


除了上述的用利用
@tornado.web.asynchronous
和callback的方式,还可以用
@tornado.gen.coroutine
yield
,如下例子:

class GenRequestHandler(BaseHandler):
@tornado.gen.coroutine
def get(self):
http = httpclient.AsyncHTTPClient()
res = yield http.fetch('http://www.baidu.com')
self.write(res.body)


参考官方文档

Most asynchronous functions in Tornado return a Future

; yielding this object returns its result

.

利用
tornado.gen.Task
修饰一个callback型的异步函数使其能够与
yield
使用。

gen.Task is now a function that returns a Future

看一下例子(对于sleep可以直接用
yield tornado.gen.sleep(5)
):

# gen.Task
class SleepHandler(BaseHandler):

@tornado.gen.coroutine
def get(self):
yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5)
# yield tornado.gen.sleep(5)
self.write('sleep for 5s')


还可以用结合celery使用(但并不是最好的方案)。

Last

https://github.com/tornadoweb/tornado/wiki/Links tornado的wiki上有很多异步相关支持的库。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: