美文网首页
2.4、User’s guide (Coroutines)

2.4、User’s guide (Coroutines)

作者: 宝宝家的隔壁老王 | 来源:发表于2018-03-29 21:56 被阅读33次
Coroutines
协程在 tornado 的异步代码中是被推荐使用的。协程使用 python 的 yield 关键字去暂停和恢复执行,来替代回调链。(合作轻量级线程如 gevent 有时也被叫做协程,但是在 tornado 中,所有的协程使用显示的上下文切换被称为异步函数。)
协程和同步代码一样简单,但是不用花费一个线程的代价。通过减少上下文切换发生地方的数量,使并发更容易思考。

Example

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    return response.body
Python 3.5:async and wait
python 3.5 介绍了 async 和 await 关键字(使用这两个关键字的也被称为原生协程)。

从 tornado 4.3 版本,你也可以使用它们替代绝大多数的 yield-based 协程。简单的使用 async def foo() 代替使用 @gen.coroutine 装饰器,并且 await 代替 yield。

剩下的文档仍然使用 yield 风格以兼容旧版的 Python,但是 async 和 await 会运行的更快。
async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body
yield 关键字比 await 更通用;

例如在一个 yield-based 的协程,可以 yield 一个 Futures 列表,但是在原生的协程,你必须用 tornado.gen.multi 将他们包装起来。这也消除了同 concurrent.futures 的集成。

你可以使用 tornado.gen.convert_yielded 将 yield 的用法转换为 await 进行使用。
async def f():
    executor = concurrent.futures.ThreadPoolExecutor()
    await tornado.gen.convert_yielded(executor.submit(g))

How it works

一个函数包含了 yield,那么这个函数就是一个生成器。所有的生成器都是异步的,当调用时,他们返回一个生成器对象而不是执行完成。

@gen.coroutine 装饰器和生成器通过 yield 表达式,给协程的调用者返回一个 Future。
  • 简单版本的协程内部循环
# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)
装饰器从生成器获取了 Future 对象,等待(没有阻塞) Future 执行完成,然后将结果作为 yield 表达式传送给生成器。

大多数异步代码从未直接接触 Future 类,除了通过异步函数的 yield 表达式获取的 Future。

How to call a coroutine

协程不是通过正常的方式报出异常,他们提出的任何异常直到他被 yielded 将会被包装在 Future 里。

这意味着需要使用正确的方式去调用协程,否则可能忽略异常错误。
@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    # 这里应该要报 ZeroDivisionError 错误
    divide(1, 0)
几乎所有的情况下,任何调用协程的函数本身也是协程,并且使用 yield 调用。

当你覆盖一个父类中定义的方法时,查阅文档确认协程是否被允许(文档应该说这个方法可能是一个协程或可能返回一个 Future)
@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)
有时候你想运行后放任不管一个协程(不等待它的运行结果),这种情况下,建议使用 IOLoop.spawn_callback,使 IOLoop 负责调用。

如果运行失败了,会将日志堆栈跟踪。
IOLoop.current().spawn_callback(divide, 1, 0)
使用 @gen.coroutine 的函数推荐使用 IOLoop.spawn_callback。

使用 async 的函数只能使用 IOLoop.spawn_callback (否则协程不会运行)
最后,在程序的顶层,如果 IOLoop 还没有运行,你可以通过 IOLoop.run_sync 开始 IOLoop,运行协程,然后停止 IOLoop 。通常用于启动一个面向批处理程序的主函数。
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

Coroutine patterns

  • Calling blocking functions
在协程里面调用阻塞函数最简单的方式是使用 IOLoop.run_in_executor,返回值是 Futures 兼容协程。
@gen.coroutine
def call_blocking():
    yield IOLoop.current().run_in_executor(blocking_func, args)
  • Parallelism
协程装饰器可以识别 value 值是 Futures 的列表和字典,并且等待所有的 Futures 并行
@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}
当使用 await 时,列表和字典必须使用 tornado.gen.multi 包装
async def parallel_fetch(url1, url2):
    resp1, resp2 = await gen.multi([http_client.fetch(url1),
                                    http_client.fetch(url2)])
  • Interleaving
有时候保存一个 Future 而不是马上使用 yield 是非常有效的,这样你可以在等待前开始其他的操作。
@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()
这种模式常在 @gen.coroutine 中使用。

如果 fetch_next_chunk() 是 async 函数,那么上述写法需变更为 fetch_future = tornado.gen.convert_yielded(self.fetch_next_chunk()) 来启动后台处理。
  • Looping
在原生协程中,async for 可以使用。

在旧版的 python 中,循环是棘手的,因为没有办法使用 yield 去迭代并且捕捉 for 或者 while 的循环的结果,你需要将循环条件拆解来获取结果,举例如下:
import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()
  • Running in the background
PeriodCallback 通常不用在协程,相反的,协程可以使用 while Ture: 循环并且使用 tornado.gen.sleep
@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
上一个例子中,循环每 60+N 秒执行一次,N 是 do_something() 消耗的时间。如果需要每 60 秒执行一次,使用如下模式:
@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.

上一篇: 2.3、User’s guide (Queue)
下一篇: 2.5、User’s guide (Structure of a Tornado web application)

相关文章

网友评论

      本文标题:2.4、User’s guide (Coroutines)

      本文链接:https://www.haomeiwen.com/subject/bvivcftx.html