Python 协程深入理解

从语法上来看,协程和生成器类似,都是定义体中包含yield关键字的函数。 yield在协程中的用法:

  • 在协程中yield通常出现在表达式的右边,例如:datum = yield,可以产出值,也可以不产出--如果yield关键字后面没有表达式,那么生成器产出None.
  • 协程可能从调用方接受数据,调用方是通过send(datum)的方式把数据提供给协程使用,而不是next(...)函数,通常调用方会把值推送给协程。
  • 协程可以把控制器让给中心调度程序,从而激活其他的协程

    所以总体上在协程中把yield看做是控制流程的方式

了解协程的过程

def simple_coroutine():
    print('coroutine start')
    x = yield
    print('coroutine received :', x)


my_coro = simple_coroutine()
next(my_coro)
my_coro.send(10)
运行结果
coroutine start
coroutine received : 10
Traceback (most recent call last):
  File "d:\Code\Python\day6\app.py", line 19, in <module>
    my_coro.send(10)
StopIteration

yield右边没有没有表达式,默认产出的值是None,刚开始调用next()方法,目的在于激活协程,程序就会运行到x = yield处,这里需要注意,这里程序运行到x = yield处,并没有将值赋值给x,而是计算yield 后面的值表达式,然后返回给next()方法,当这个生成器send()一个值给协程之后,从暂定处yieldsend的这个值赋值给x,然后继续运行,直到运行到下一个yield处。

当程序运行到最后,会自动抛出一个StopIteration的异常,当捕获异常之后,可以找到这个生成器最后的值

def simple_coroutine():
    print('coroutine start')
    x = yield
    print('coroutine received :', x)


try:
    my_coro = simple_coroutine()
    next(my_coro)
    my_coro.send(10)
except StopIteration as e:
    print('执行完毕之后的值:', e.value)
运行结果
coroutine start
coroutine received : 10
执行完毕之后的值: None

JavaScript Generator类似,看一个例子


def simple_coroutine(x, y):
    z = yield x + y
    x = yield z * x
    y = yield x + y + z
    return y


try:
    my_coro = simple_coroutine(5, 6)
    print(next(my_coro))
    print(my_coro.send(30))
    print(my_coro.send(8))
    print(my_coro.send('Done'))
except StopIteration as e:
    print('执行完毕之后的值:', e.value)
运行结果
11
150
44
执行完毕之后的值: (8, 'Done', 30)

当预激活传入x = 5,y =6时,第一次调用next()当遇到yield关键字,则交出函数的控制权,将yield后面的表达式计算出并返回给next(my_coro)中,所以当print(next(my_coro))的时候,值是x + y = 11

第二步send(30)即恢复函数的执行权,并将30赋值给第一次交出函数控制权的地方,即 z = yield x + y处,此时send的值为30,则将z = 30简单讲就是将 yield x + y 替换成 传入的值30,所以z = 30 继续执行,此时x = 5,y = 6 z= 30遇到yield关键字,交出函数的控制权,并计算yield后面的表达式返回,此时表达式为z * x,当前z = 30, x = 5,所以计算出值为150

第三步,send(8) 恢复函数执行权,并将8赋值给上一次交出函数控制权的地方,将 8 赋值给 x ,此时x = 8,y = 6, z= 30,继续运行程序,当遇到yield x + y +z继续交出函数控制权,返回x + y + z表达式的值44

第四步,send('Done') 继续恢复函数执行权,将Done 赋值给上一次交出函数控制权的地方,将Done 赋值给y ,此时x = 8, y = 'Done', z = 30,继续执行,知道执行到return处,整个控制流程结束,Python抛出StopIteration异常,捕获异常可以得到return的值(8, 'Done', 30)

运行过程

协程的运行过程中有4个状态

  • GEN_CREATE:等待开始执行
  • GEN_RUNNING:解释器正在执行,这个状态一般看不到
  • GEN_SUSPENDED:在yield表达式处暂停
  • GEN_CLOSED:执行结束

通过导入from inspect import getgeneratorstate来获取协程状态

from inspect import getgeneratorstate


def simple_coroutine(x, y):
    z = yield x + y
    x = yield z * x
    y = yield x + y + z
    return (x, y, z)


try:
    my_coro = simple_coroutine(5, 6)
    print(getgeneratorstate(my_coro))
    print(next(my_coro))
    print(getgeneratorstate(my_coro))
    print(my_coro.send(30))
    print(my_coro.send(8))
    print(my_coro.send('Done'))
except StopIteration as e:
    print(getgeneratorstate(my_coro))
    print('执行完毕之后的值:', e.value)
运行结果
GEN_CREATED
11
GEN_SUSPENDED
150
44
GEN_CLOSED
执行完毕之后的值: (8, 'Done', 30)

可以看到在未调用next()方法时,协程的状态为GUN_CREATED,在开始执行的时候协程的状态为GEN_SUSPENDED,最后执行完毕之后状态为GEN_CLOSED

预激协程的装饰器

from functools import wraps


def coroutine(func):
    @wraps(func)
    def prime(*args, **kwargs):
        gen = func(*args, **kwargs)
        print(next(gen))
        return gen
    return prime


@coroutine
def simple_coroutine(x, y):
    z = yield x + y
    x = yield z * x
    y = yield x + y + z
    return (x, y, z)


try:
    coro_arg = simple_coroutine(5, 6)
    print(coro_arg.send(30))
    print(coro_arg.send(8))
    coro_arg.send(None)
except StopIteration as e:
    print(e.value)

关于预激,在使用yield from句法调用协程的时候,会自动预激活,这样其实与我们上面定义的coroutine装饰器是不兼容的,在python3.4里面的asyncio.coroutine装饰器不会预激协程,因此兼容yield from

关于yield from

yield from 是在Python3.3才出现的语法。所以这个特性在Python2中是没有的

yield from 后面需要加的是可迭代对象,它可以是普通的可迭代对象,也可以是迭代器,甚至是生成器。

简单应用:拼接可迭代对象

使用yield和使用yield from的例子来对比

myStr = 'abc'
myList = [1, 2, 3]
mydict = {'name': 'aaron', 'age': '21'}
mygen = (i for i in range(4, 9))


def gen(*args):
    for item in args:
        for i in item:
            yield i


newList = gen(myStr, myList, mydict, mygen)
print(list(newList))
# ['a', 'b', 'c', 1, 2, 3, 'name', 'age', 4, 5, 6, 7, 8]
myStr = 'abc'
myList = [1, 2, 3]
mydict = {'name': 'aaron', 'age': '21'}
mygen = (i for i in range(4, 9))


def gen(*args):
    for item in args:
        yield from item


newList = gen(myStr, myList, mydict, mygen)
print(list(newList))
# ['a', 'b', 'c', 1, 2, 3, 'name', 'age', 4, 5, 6, 7, 8]

由上面两种方式对比,可以看出,yield from后面加上可迭代对象,他可以把可迭代对象里的每个元素一个一个的yield出来,对比yield来说代码更加简洁,结构更加清晰

复杂应用:生成器的嵌套

yield from 后面加上一个生成器后,就实现了生成的嵌套。

当然实现生成器的嵌套,并不是一定必须要使用yield from,而是使用yield from可以让我们避免让我们自己处理各种料想不到的异常,而让我们专注于业务代码的实现。

  • 调用方:调用委派生成器的客户端(调用方)代码
  • 委托生成器: 包含yield from表达式的生成器函数
  • 子生成器: yield from后面加的生成器函数
# 委托生成器
def gen():
    while True:
        yield from averger_gen()

# 子生成器


def averger_gen():
    total = 0
    count = 0
    averger = 0
    while True:
        averger = yield averger
        total += averger
        count += 1
        averger = total / count


gen = gen()
next(gen)
print(gen.send(10))
print(gen.send(20))
print(gen.send(30))

委托生成器的作用是:在调用方与子生成器之间建立一个双向通道

调用方可以通过send()直接发送消息给子生成器,而子生成器yield的值,也是直接返回给调用方

委托生成器,只起一个桥梁作用,它建立的是一个双向通道,它并没有权利也没有办法,对子生成器yield回来的内容做拦截。

# 委托生成器
def gen():
    while True:
        # 只有子生成器要结束(return)了,yield from左边的变量才会被赋值,后面的代码才会执行。
        total, count, averger = yield from averger_gen()
        print('计算完成!总共计算:{}个,总和:{}分,平均分:{}'.format(count, total, averger))

# 子生成器


def averger_gen():
    total = 0
    count = 0
    averger = 0
    while True:
        term = yield averger
        if term is None:
            break
        total += term
        count += 1
        averger = total / count
    # 每一次return,都意味着当前协程结束。
    return total, count, averger


gen = gen()
next(gen)
print(gen.send(10))
print(gen.send(20))
print(gen.send(30))
gen.send(None)  # 结束协程
运行结果
10.0
15.0
20.0
计算完成!总共计算:3个,总和:60分,平均分:20.0

python3.9 的协程

通过 async/await 语法来声明 协程 是编写 asyncio 应用的推荐方式,例如,以下代码段会打印 "hello",等待 1 秒,再打印 "world":

import asyncio


async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')


if __name__ == '__main__':
    asyncio.run(main())

[!NOTE]

注意:简单地调用一个协程并不会使其被调度执行

要真正运行一个协程,asyncio 提供了三种主要机制:

  • asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数

  • 对协程执行 await。以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
started at 15:43:07
hello
world
finished at 15:43:10

Process finished with exit code 0

让我们修改以上示例,并发 运行两个 say_after 协程:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

[!NOTE]

注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

started at 15:45:11
hello
world
finished at 15:45:13

Process finished with exit code 0

可等待的协程

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务Future.

协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待:

mport asyncio


async def nested():
    return 42


async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".


asyncio.run(main())

[!TIP]

在本文档中 "协程" 可用来表示两个紧密关联的概念:

  • 协程函数: 定义形式为 async def 的函数
  • 协程对象: 调用 协程函数 所返回的对象

任务

任务 被用来“并行的”调度协程

当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象,以便允许通过 async/await 使用基于回调的代码

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()

运行asyncio程序

asyncio.run(coro, *, debug=False)

执行 coroutine coro 并返回结果。

此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用

如果 debugTrue,事件循环将以调试模式运行

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

3.7 新版功能.

在 3.9 版更改: 更新为使用 loop.shutdown_default_executor()

[!TIP]

asyncio.run() 的源代码可以在 Lib/asyncio/runners.py 中找到。

创建任务

asyncio.create_task(coro, *, name=None)

coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象

name 不为 None,它将使用 Task.set_name() 来设为任务的名称

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

[!TIP]

保存一个指向此函数的结果的引用,以避免任务在执行期间消失

3.7 新版功能.

在 3.8 版更改: 添加了 name 形参

休眠

coroutine asyncio.sleep(delay, result=None, , *loop=None

阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者

sleep() 总是会挂起当前任务,以允许其他任务运行

将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参

以下协程示例运行 5 秒,每秒显示一次当前日期

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

并发运行任务

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并发 运行 aws 序列中的 可等待对象

如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致

如果 return_exceptionsFalse (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行

如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 -- 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f


async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

[!TIP]

如果 return_exceptions 为 False,则在 gather() 被标记为已完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用 gather.cancel() 将不会取消任何其他可等待对象

在 3.7 版更改: 如果 gather 本身被取消,则无论 return_exceptions 取值为何,消息都会被传播

屏蔽取消操作

awaitable asyncio.shield(aw, *, loop=None)

保护一个 可等待对象 防止其被 取消

如果 aw 是一个协程,它将自动被作为任务调度

以下语句:

res = await shield(something())

相当于:

res = await something()

不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发CancelledError

如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消

如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示

try:
    res = await shield(something())
except CancelledError:
    res = None

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参。

超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

等待 aw 可等待对象 完成,指定 timeout 秒数后超时

如果 aw 是一个协程,它将自动被作为任务调度

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeoutNone,则等待直到完成

如果发生超时,任务将取消并引发 asyncio.TimeoutError

要避免任务 取消,可以加上 shield()

此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播

如果等待被取消,则 aw 指定的对象也会被取消

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参。

示例:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

[!TIP]

在 3.7 版更改:aw 因超时被取消,wait_for 会等待 aw 被取消。之前版本则将立即引发 asyncio.TimeoutError

简单等待

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发地运行 aws 可迭代对象中的 可等待对象 并进入阻塞状态直到满足 return_when 所指定的条件

aws 可迭代对象必须不为空

返回两个 Task/Future 集合: (done, pending)

done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常量 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

wait_for() 不同,wait() 在超时发生时不会取消可等待对象

3.8 版后已移除: 如果 aws 中的某个可等待对象为协程,它将自动被作为任务调度。直接向 wait() 传入协程对象已弃用,因为这会导致 令人迷惑的行为

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参

wait() 会自动以任务的形式调度协程,之后将以 (done, pending) 集合形式返回显式创建的任务对象。因此以下代码并不会有预期的行为

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!
async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

[!TIP]

Deprecated since version 3.8, will be removed in version 3.11: 直接向 wait() 传入协程对象的方式已弃用

asyncio.as_completed(aws, *, loop=None, timeout=None)

并发地运行 aws 可迭代对象中的 可等待对象。 返回一个协程的迭代器。 所返回的每个协程可被等待以从剩余的可等待对象的可迭代对象中获得最早的下一个结果

如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError

[!TIP]

Deprecated since version 3.8, will be removed in version 3.10: loop 形参

示例:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

在线程中运行

coroutine asyncio.to_thread(func, /, *args, **kwargs)

在不同的线程中异步地运行函数 func

向此函数提供的任何 *args**kwargs 会被直接传给 func。 并且,当前 contextvars.Context 会被传播,允许在不同的线程中访问来自事件循环的上下文变量

返回一个可被等待以获取 func 的最终结果的协程

这个协程函数主要是用于执行在其他情况下会阻塞事件循环的 IO 密集型函数/方法。 例如

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 15:59:22
# start blocking_io at 15:59:22
# blocking_io complete at 15:59:23
# finished main at 15:59:23

[!IMPORTANT]

在任何协程中直接调用 blocking_io() 将会在调用期间阻塞事件循环,导致额外的 1 秒运行时间。 而通过改用 asyncio.to_thread(),我们可以在不同的线程中运行它从而不会阻塞事件循环

由于 GIL 的存在,asyncio.to_thread() 通常只能被用来将 IO 密集型函数变为非阻塞的。 但是,对于会释放 GIL 的扩展模块或无此限制的替代性 Python 实现来说,asyncio.to_thread() 也可被用于 CPU 密集型函数

跨线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

向指定事件循环提交一个协程。(线程安全)

返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程内产生了异常,将会通知返回的 Future 对象。它也可被用来取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

参见concurrency and multithreading 部分的文档。

不同于其他 asyncio 函数,此函数要求显式地传入 loop 参数。

3.5.1 新版功能.

内省

asyncio.current_task(loop=None)

返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None

如果 loopNone 则会使用 get_running_loop() 获取当前事件循环

asyncio.all_tasks(loop=None)

返回事件循环所运行的未完成的 Task 对象的集合

如果 loopNone,则会使用 get_running_loop() 获取当前事件循环。

Copyright © aaron 2023 all right reserved,powered by Gitbook该文章修订时间: 2024-09-12 16:22:41

results matching ""

    No results matching ""