美文网首页
Pythton 3.11中如何限制asyncio.gather的

Pythton 3.11中如何限制asyncio.gather的

作者: GloriousFool | 来源:发表于2023-08-01 21:45 被阅读0次
import asyncio
import time


async def say_hello_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def run_with_limited_number_of_tasks(concurrency):
    semaphore = asyncio.Semaphore(concurrency)

    async def sem_task(task):
        async with semaphore:
            return await task

    print(f"starts at {time.strftime('%X')}")
    await asyncio.gather(*(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)]))
    print(f"ends at {time.strftime('%X')}")

解析

这段代码通过使用Python的asyncio.Semaphore来限制同时运行的协程数量。Semaphore是一种计数信号量,用于控制对共享资源的并发访问。
在代码中,run_with_limited_number_of_tasks函数接受一个concurrency参数,它代表允许同时运行的协程数量。semaphore = asyncio.Semaphore(concurrency)创建了一个信号量对象,并设置了允许的最大计数为concurrency

当调用say_hello_after函数时,它会创建一个asyncio.sleep异步任务,并在延迟后打印一条消息。然后,run_with_limited_number_of_tasks通过调用asyncio.gather来同时运行多个say_hello_after协程。在这里,它通过列表推导式创建了6个say_hello_after协程,然后用sem_task函数将它们包装在信号量上下文管理器中。

现在,让我们详细解释为什么这段代码可以限制在concurrency个协程以内同时运行:

asyncio.gather开始运行时,它会依次调用sem_task来运行协程,并且每次调用都会获得一个信号量资源。由于初始时信号量对象的计数为concurrency,因此最多可以同时有concurrency个协程在运行。

如果有空闲的信号量资源(计数大于零),则sem_task会立即获得一个信号量资源,并立即启动相应的say_hello_after协程。

如果当前正在运行的协程数量已经达到concurrency个,那么当有新的say_hello_after协程要运行时,它将等待,直到有一个信号量资源可用。这就实现了对同时运行协程数量的限制。

一旦某个say_hello_after协程完成(asyncio.sleep延迟结束),它将释放占用的信号量资源,使得另一个等待中的sem_task可以获得该资源,并启动新的say_hello_after协程。

总结来说,asyncio.Semaphore的作用在于控制同时运行的协程数量。这在某些情况下是很有用的,特别是当你需要限制对共享资源或外部服务的并发访问时,以避免资源竞争和提高效率。

提问:(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])里在调用say_hello_after(1, f"hello from {i}")时不就应该已经执行了这个函数了吗?为什么它还能被传入sem_task并且被await呢?而且此时await tasktask后面并没有加括号,说明它此时已经是一个asyncio.Task了,为什么呢?

在表达式(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])中,的确会立即调用say_hello_after(1, f"hello from {i}"),这将返回一个coroutine对象而不是执行该协程。这是因为协程函数(函数定义中包含async关键字的函数)返回的是一个协程对象,而不是立即执行协程。

当我们用asyncio.gather调用这些协程时,它实际上会将这些协程对象包装在一个Task对象中。在Python的asyncio库中,Task是一个表示正在进行的协程的对象,它封装了协程对象的执行状态。

所以,即使在asyncio.gather的参数中调用了say_hello_after(1, f"hello from {i}"),实际上是在创建一个Task对象并将其包含在asyncio.gather中。asyncio.gather会同时运行这些Task对象,并在它们完成时返回结果(或抛出异常)。

总结起来,在这段代码中:

  • 列表推导式创建了6个say_hello_after协程对象并以他们为入参传递给了协程函数sem_task
  • asyncio.gather将这些协程对象封装为Task对象,并同时运行它们。
  • 通过await task等待每个协程对象的完成,实现了限制同时运行的协程数量在concurrency以内的目的。
  • 这种使用列表推导式和asyncio.gather来同时运行多个协程并限制并发数的方式在asyncio中是一种常见的技术。

提问:“当我们用asyncio.gather调用这些协程时,它实际上会将这些协程对象包装在一个Task对象中。”那也应该是sem_task的返回值被包装成Task,而不是sem_task函数的入参say_hello_after(1, f"hello from {i}")被转化为Task,为什么还是可以被await呢?

Python3的asyncio库并没有被要求被await的一定要是asyncio.Task对象,协程对象也是可以被await的。不过当一个Task对象被await的时候,事件循环(event loop)并没有被阻塞;而当一个协程对象被await的时候,包围此await语句的所有代码都将暂停执行直到await的协程对象执行完毕(可理解代码段此刻变为串行执行)。所以,当我们基于上面定义的两个函数来运行下面的代码时:

if __name__ == '__main__':
    asyncio.run(run_with_limited_number_of_tasks(3))

我们会得到下面的结果:

starts at 21:38:25
hello from 0
hello from 1
hello from 2
hello from 3
hello from 4
hello from 5
ends at 21:38:27

从上面的结果可以看出,我们的代码是在一定基础上“按顺序”执行的,但是实际执行所花的时间并不是6秒(所有任务各1秒),而是按3个一组将任务分为2组来运行的,因此耗时为2秒。

如果我们不是await的一个协程对象,而是一个Task对象:

if __name__ == '__main__':
    async def await_tasks():
        print(f"starts at {time.strftime('%X')}")
        await asyncio.gather(*[say_hello_after(1, f"hello from {i}") for i in range(6)])
        print(f"ends at {time.strftime('%X')}")
    asyncio.run(await_tasks())

那么它的结果则更为随机,而且6个任务的执行总时间为1秒:

starts at 21:45:11
hello from 0
hello from 2
hello from 5
hello from 4
hello from 1
hello from 3
ends at 21:45:12

相关文章

网友评论

      本文标题:Pythton 3.11中如何限制asyncio.gather的

      本文链接:https://www.haomeiwen.com/subject/pwdupdtx.html