[Python Coroutine Series] Python Native Coroutines

References

Preface

PEP 255 -- Simple Generators
Python-Version: 2.2

PEP 342 -- Coroutines via Enhanced Generators
Python-Version: 2.5

PEP 380 -- Syntax for Delegating to a Subgenerator
Python-Version: 3.3

PEP 492 -- Coroutines with async and await syntax
Python-Version: 3.5

PEP 525 -- Asynchronous Generators
Python-Version: 3.6

Well… the list above is more or less the history of coroutines in Python; see the PEPs for the details.

Terminology

Generator

def _g():
    yield 1
GeneratorType = type(_g())

Generator functions
A function or method which uses the yield statement.
Cannot yield from a native coroutine.
When called always returns an iterator object.

iterator
Calling iterator.__next__() runs the body until the next yield.

Native coroutine

async def _c():
    pass
CoroutineType = type(_c())

Coroutine functions
A function or method which is defined using async def.
When called always returns a coroutine object.

Coroutine Objects
Awaitable: they implement __await__().
Do not implement __iter__ or __next__, so they cannot be iterated.
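
The two properties of coroutine objects listed above can be checked directly. This is only a small sketch; the function name `noop` is made up for illustration.

```python
async def noop():
    return "done"

co = noop()

# A coroutine object is awaitable: it exposes __await__ ...
has_await = hasattr(co, "__await__")
# ... but it does not implement the iterator protocol.
has_iter = hasattr(co, "__iter__")
has_next = hasattr(co, "__next__")

try:
    iter(co)
    iter_error = None
except TypeError as exc:
    iter_error = exc

co.close()  # avoids the "coroutine was never awaited" warning
```

So `iter()`, `list()` and `for` all refuse a native coroutine, while `await` accepts it.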

Asynchronous generator

async def _ag():
    yield
AsyncGeneratorType = type(_ag())

Asynchronous generator functions
Use async def together with yield.
When called always returns an asynchronous iterator object.

Asynchronous iterator

Asynchronous iterable
An object that defines __aiter__ and can be used in async for.
__aiter__ returns an asynchronous iterator.

Asynchronous iterator
An object that defines __anext__, which returns an awaitable.
A class that defines both __aiter__ and __anext__ is at once an asynchronous iterable and an asynchronous iterator.

# Define the object
class AsyncIterable:
    # Implement the __aiter__ and __anext__ methods
    # (or, in the C API, tp_as_async.am_aiter / am_anext).
    # To stop the iteration, __anext__ must raise StopAsyncIteration.

    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...

# Iteration; the async for-else form is supported too
async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

# What it expands to
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2
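
For something that actually runs, here is a minimal sketch of the protocol above. The `Countdown` class is hypothetical, and `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio

class Countdown:
    """Asynchronous iterable and iterator in one class (hypothetical example)."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        self.current -= 1
        return self.current + 1

async def collect():
    seen = []
    async for value in Countdown(3):
        seen.append(value)
    else:
        seen.append("exhausted")  # runs because the loop ended without break
    return seen

result = asyncio.run(collect())
```

The else branch fires once `__anext__` raises StopAsyncIteration, matching the expansion shown above.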

Generator-based coroutine

import types

@types.coroutine
def gen():
    yield 1

Generator-based coroutine function
A generator function decorated with @types.coroutine.
Can yield from a native coroutine.
The decorator sets func.__code__.co_flags |= CO_ITERABLE_COROUTINE.
When called returns a generator-based coroutine.

Generator-based coroutine
Returned from a generator-based coroutine function.
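
A short sketch of how a generator-based coroutine behaves in practice. The names `pause` and `worker` are invented for this example, and the coroutine is driven by hand with send() instead of an event loop.

```python
import inspect
import types

@types.coroutine
def pause():
    # A bare yield hands control to whoever drives the coroutine chain.
    received = yield "paused"
    return received

async def worker():
    # A native coroutine can await a generator-based coroutine.
    return await pause()

flag_set = bool(pause.__code__.co_flags & inspect.CO_ITERABLE_COROUTINE)
gen_obj = pause()
is_generator = isinstance(gen_obj, types.GeneratorType)
is_awaitable = inspect.isawaitable(gen_obj)
gen_obj.close()

co = worker()
yielded = co.send(None)          # runs until the yield inside pause()
try:
    co.send("resumed")
    returned = None
except StopIteration as stop:
    returned = stop.value
```

The object returned by `pause()` is still a plain generator by type; only the code flag makes it awaitable.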

Coroutine

Coroutine object
Co-routine: a cooperative routine, "coroutine" for short.
Either native coroutine object or generator-based coroutine object.
An event loop or a scheduler is needed to run coroutines.

Future-like object

Future-like object
Defines an __await__ method (or tp_as_async->am_await in C) returning an iterator.

Can be consumed by await inside a coroutine.
await future_like_obj suspends the coroutine until __await__ completes, and returns the result.
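
A minimal Future-like object could look like the sketch below. The class `Ready` is hypothetical; its `__await__` is written as a generator function, which is one simple way to return an iterator.

```python
class Ready:
    """A minimal Future-like object (hypothetical): __await__ returns an iterator."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # A generator function, so calling it returns an iterator.
        yield "not ready yet"   # suspends the awaiting coroutine once
        return self.value       # becomes the result of the await expression

async def consumer():
    return await Ready(42)

co = consumer()
first = co.send(None)            # the value yielded inside __await__
try:
    co.send(None)
    result = None
except StopIteration as stop:
    result = stop.value
```

The value yielded inside `__await__` travels all the way out to whoever drives the coroutine, which is how event loops get to see pending futures.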

Awaitable

Awaitable
import collections.abc
import types
from inspect import CO_ITERABLE_COROUTINE

def isawaitable(object):
    """Adapted from inspect.isawaitable()."""

    # Native coroutine
    a = isinstance(object, types.CoroutineType)

    # Generator-based coroutine
    b = (isinstance(object, types.GeneratorType) and
         bool(object.gi_code.co_flags & CO_ITERABLE_COROUTINE))

    # Future-like object
    c = isinstance(object, collections.abc.Awaitable)

    return a or b or c

Asynchronous context manager

An asynchronous context manager defines the __aenter__ and __aexit__ methods and can be used in async with.

# Define the manager
class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')
    # __aenter__ and __aexit__. Both must return an awaitable.

# Syntax
async with EXPR as VAR:
    BLOCK

# What it expands to
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)

VAR = await aenter
try:
    BLOCK
except:
    if not await aexit(mgr, *sys.exc_info()):
        raise
else:
    await aexit(mgr, None, None, None)

# It is a SyntaxError to use async with outside of an async def function.

# demo
async def commit(session, data):
    ...

    async with session.transaction():
        ...
        await session.update(data)
        ...

async with lock:
    ...
Generators

def gen_func():
    result1 = yield 'yield-1'
    print(f'result 1: {result1}')
    result2 = yield 'yield-2'
    print(f'result 2: {result2}')

gen = gen_func()

import inspect; assert inspect.isgeneratorfunction(gen_func)
import types; assert isinstance(gen, types.GeneratorType)

print(gen.send(None))
print(gen.send('send-1'))
try:
    gen.throw(GeneratorExit)
except (StopIteration, GeneratorExit):
    pass
else:
    raise RuntimeError("generator ignored GeneratorExit")

Let's look at the data structures first.

PyGen_Type

# genobject.c.737
static PyMethodDef gen_methods[] = {
{"send",(PyCFunction)_PyGen_Send, METH_O, send_doc},
{"throw",(PyCFunction)gen_throw, METH_VARARGS, throw_doc},
{"close",(PyCFunction)gen_close, METH_NOARGS, close_doc},
{NULL, NULL} /* Sentinel */
};

PyTypeObject PyGen_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"generator", /* tp_name */
sizeof(PyGenObject), /* tp_basicsize */

0, /* tp_as_async */
PyObject_SelfIter, /* tp_iter */
(iternextfunc)gen_iternext, /* tp_iternext */
gen_methods, /* tp_methods */
};

PyGenObject

/* _PyGenObject_HEAD defines the initial segment of generator
and coroutine objects. */
#define _PyGenObject_HEAD(prefix) \
PyObject_HEAD \
/* Note: gi_frame can be NULL if the generator is "finished" */ \
struct _frame *prefix##_frame; \
/* True if generator is being executed. */ \
char prefix##_running; \
/* The code object backing the generator */ \
PyObject *prefix##_code; \
/* List of weak reference. */ \
PyObject *prefix##_weakreflist; \
/* Name of the generator. */ \
PyObject *prefix##_name; \
/* Qualified name of the generator. */ \
PyObject *prefix##_qualname;

typedef struct {
/* The gi_ prefix is intended to remind of generator-iterator. */
_PyGenObject_HEAD(gi)
} PyGenObject;

PyGen_New

// genobject.c.833
PyObject * PyGen_New(PyFrameObject *f) {
return gen_new_with_qualname(&PyGen_Type, f, NULL, NULL);
}

static PyObject *
gen_new_with_qualname(PyTypeObject *type, PyFrameObject *f,
PyObject *name, PyObject *qualname)
{
PyGenObject *gen = PyObject_GC_New(PyGenObject, type);
gen->gi_frame = f;
f->f_gen = (PyObject *) gen; // back-reference from the frame to its generator
gen->gi_code = (PyObject *)(f->f_code);
gen->gi_running = 0; // not running yet
gen->gi_weakreflist = NULL;
if (name != NULL)
gen->gi_name = name;
else
gen->gi_name = ((PyCodeObject *)gen->gi_code)->co_name;
if (qualname != NULL)
gen->gi_qualname = qualname;
else
gen->gi_qualname = gen->gi_name;
_PyObject_GC_TRACK(gen); // start tracking the object in the GC
return (PyObject *)gen;
}

PyGen_New simply wraps the frame (and its code object) in a generator object and links the generator back through frame->f_gen.
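
The same link between a generator and its frame is visible from Python. A small sketch using only the gi_* attributes and `inspect.getgeneratorstate`:

```python
import inspect

def gen_func():
    yield 1

gen = gen_func()

# The generator object keeps its own frame and shares the function's code object.
same_code = gen.gi_code is gen_func.__code__
frame_code = gen.gi_frame.f_code is gen_func.__code__
state_created = inspect.getgeneratorstate(gen)

next(gen)
state_suspended = inspect.getgeneratorstate(gen)
running_flag = gen.gi_running    # False while suspended at a yield

gen.close()
state_closed = inspect.getgeneratorstate(gen)
frame_after_close = gen.gi_frame  # the frame is dropped once the generator finishes
```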

This is a good point to review PyFrameObject:

typedef struct _frame {
    PyObject_VAR_HEAD
    struct _frame *f_back;      /* previous frame, or NULL */
    PyCodeObject *f_code;       /* the code object being executed */

    PyObject *f_builtins;       /* builtin symbol table (PyDictObject) */
    PyObject *f_globals;        /* global symbol table (PyDictObject) */
    PyObject *f_locals;         /* local symbol table (any mapping) */

    PyObject **f_valuestack;    /* bottom of the value stack */
    PyObject **f_stacktop;      /* top of the value stack */

    PyObject *f_trace;          /* Trace function */

    /* used by generators to swap exception state */
    PyObject *f_exc_type, *f_exc_value, *f_exc_traceback;
    /* Borrowed reference to a generator, or NULL */
    PyObject *f_gen;

    int f_lasti;                /* offset in f_code of the last executed bytecode instruction */
    int f_lineno;               /* source line of the current bytecode;
                                   read it through PyFrame_GetLineNumber() */
    int f_iblock;               /* index in f_blockstack */
    char f_executing;           /* whether the frame is still executing */
    PyTryBlock f_blockstack[CO_MAXBLOCKS]; /* block stack */
    PyObject *f_localsplus[1];  /* locals+stack, dynamically sized */
} PyFrameObject;

PyGen_Send

PyObject * _PyGen_Send(PyGenObject *gen, PyObject *arg) {
return gen_send_ex(gen, arg, 0, 0);
}

static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc, int closing)
{
PyThreadState *tstate = PyThreadState_GET();
PyFrameObject *f = gen->gi_frame;
PyObject *result;
/* `gen` is either:
* a generator with CO_FUTURE_GENERATOR_STOP flag;
* a coroutine;
* a generator with CO_ITERABLE_COROUTINE flag
(decorated with types.coroutine decorator);
* an async generator.
*/
if (gen->gi_running) {
// Interesting: generators, coroutines and async generators all go through this check
char *msg = "generator already executing";
if (PyCoro_CheckExact(gen)) {
msg = "coroutine already executing";
}
else if (PyAsyncGen_CheckExact(gen)) {
msg = "async generator already executing";
}
PyErr_SetString(PyExc_ValueError, msg);
return NULL;
}
if (f == NULL || f->f_stacktop == NULL) {
if (PyCoro_CheckExact(gen) && !closing) {
/* `gen` is an exhausted coroutine: raise an error,
except when called from gen_close(), which should
always be a silent method. */
PyErr_SetString(
PyExc_RuntimeError,
"cannot reuse already awaited coroutine");
}
else if (arg && !exc) {
/* `gen` is an exhausted generator:
only set exception if called from send(). */
if (PyAsyncGen_CheckExact(gen)) {
PyErr_SetNone(PyExc_StopAsyncIteration);
}
else {
PyErr_SetNone(PyExc_StopIteration);
}
}
return NULL;
}

if (f->f_lasti == -1) {
/* f_lasti is the offset of the last executed bytecode instruction in f_code; -1 means the generator has never run */
if (arg && arg != Py_None) {
// The first call must be send(None)
char *msg = "can't send non-None value to a "
"just-started generator";
if (PyCoro_CheckExact(gen)) {
msg = NON_INIT_CORO_MSG;
}
else if (PyAsyncGen_CheckExact(gen)) {
msg = "can't send non-None value to a "
"just-started async generator";
}
PyErr_SetString(PyExc_TypeError, msg);
return NULL;
}
} else {
/* Push arg onto the frame's value stack */
result = arg ? arg : Py_None;
Py_INCREF(result);
*(f->f_stacktop++) = result;
}

/* Generators always return to their most recent caller, not
* necessarily their creator. */
Py_XINCREF(tstate->frame);
assert(f->f_back == NULL);
f->f_back = tstate->frame; // link into the frame chain, ready for the call

/* Mark the generator as running and execute its frame */
gen->gi_running = 1;
result = PyEval_EvalFrameEx(f, exc); // run the generator's code
gen->gi_running = 0;

/* Don't keep the reference to f_back any longer than necessary. It
* may keep a chain of frames alive or it could create a reference
* cycle. */
assert(f->f_back == tstate->frame);
Py_CLEAR(f->f_back);

if (result && f->f_stacktop == NULL) {
/* Case 1. The generator returned instead of yielding */
if (result == Py_None) {
/* Delay exception instantiation if we can */
if (PyAsyncGen_CheckExact(gen)) {
PyErr_SetNone(PyExc_StopAsyncIteration);
}
else {
PyErr_SetNone(PyExc_StopIteration);
}
}
else {
/* Async generators can only return None */
assert(!PyAsyncGen_CheckExact(gen));
_PyGen_SetStopIterationValue(result);
}
Py_CLEAR(result);
}
else if (!result && PyErr_ExceptionMatches(PyExc_StopIteration)) {
...
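/* Case 2. StopIteration was raised inside the generator.
Checks whether `from __future__ import generator_stop` is in effect.
PEP 479:
a StopIteration raised inside a generator is converted to RuntimeError
Py3.5: opt-in through generator_stop
Py3.6: emits a warning
Py3.7: enabled by default
*/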
}
else if (PyAsyncGen_CheckExact(gen) && !result &&
PyErr_ExceptionMatches(PyExc_StopAsyncIteration)) {
/* Case 3. StopAsyncIteration was raised inside an async generator */
const char *msg = "async generator raised StopAsyncIteration";
_PyErr_FormatFromCause(PyExc_RuntimeError, "%s", msg);
}

if (!result || f->f_stacktop == NULL) {
/* The generator is finished: release the frame */
PyObject *t, *v, *tb;
t = f->f_exc_type;
v = f->f_exc_value;
tb = f->f_exc_traceback;
f->f_exc_type = NULL;
f->f_exc_value = NULL;
f->f_exc_traceback = NULL;
Py_XDECREF(t);
Py_XDECREF(v);
Py_XDECREF(tb);
gen->gi_frame->f_gen = NULL;
gen->gi_frame = NULL;
Py_DECREF(f);
}

return result;
}
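
Several branches of gen_send_ex can be triggered from plain Python. The sketch below is only an illustration with made-up function names; the behaviour in point 4 assumes Python 3.7 or later, where PEP 479 is always on.

```python
def returns_value():
    yield "first"
    return "finished"

# 1. A just-started generator only accepts send(None).
fresh = returns_value()
try:
    fresh.send("too early")
    early_error = None
except TypeError as exc:
    early_error = exc

# 2. A return value travels inside StopIteration.
fresh.send(None)
try:
    fresh.send(None)
    return_value = None
except StopIteration as stop:
    return_value = stop.value

# 3. Re-entering a generator that is already running is rejected.
def reenter():
    yield next(me)

me = reenter()
try:
    next(me)
    running_error = None
except ValueError as exc:
    running_error = exc

# 4. StopIteration raised inside the body comes out as RuntimeError.
def leaks_stop():
    raise StopIteration
    yield

try:
    next(leaks_stop())
    leak_error = None
except RuntimeError as exc:
    leak_error = exc
```

Note that the rejected send in point 1 leaves the generator untouched, so it can still be started normally afterwards.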

CodeObject

# t.py
def gen():
yield 1

>>> co = compile(open('t.py').read(),'t.py','exec'); import dis; dis.dis(co)
1 0 LOAD_CONST 0 (<code object gen at 0x0000028821955270, file "t.py", line 1>)
2 LOAD_CONST 1 ('gen')
4 MAKE_FUNCTION 0
6 STORE_NAME 0 (gen)
8 LOAD_CONST 2 (None)
10 RETURN_VALUE

>>> gen = co.co_consts[0]; dis.dis(gen)
2 0 LOAD_CONST 1 (1)
2 YIELD_VALUE
4 POP_TOP
6 LOAD_CONST 0 (None)
8 RETURN_VALUE

.. opcode:: YIELD_VALUE
Pops TOS and yields it from a :term:`generator`.

Let's see what YIELD_VALUE does.

YIELD_VALUE

PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
PyThreadState *tstate = PyThreadState_GET();
return tstate->interp->eval_frame(f, throwflag);
}

PyObject *
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
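/* f->f_lasti records where bytecode execution stopped,
f->f_stacktop records the top of the value stack;
together they make generator send/yield possible.
*/
next_instr = first_instr; // points at co->co_code
if (f->f_lasti >= 0) { // f->f_lasti is the offset of the last executed instruction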
next_instr += f->f_lasti / sizeof(_Py_CODEUNIT) + 1;
}
stack_pointer = f->f_stacktop;
f->f_stacktop = NULL; /* remains NULL unless yield suspends frame */
f->f_executing = 1;

if (co->co_flags & (CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR)) {
// The generator was suspended inside an except block
if (!throwflag && f->f_exc_type != NULL && f->f_exc_type != Py_None) {
// Swap exception state with the caller, restoring what was set aside at the yield
swap_exc_state(tstate, f);
}
else
save_exc_state(tstate, f);
}
for (;;) {
switch (opcode) {
TARGET(YIELD_VALUE) {
retval = POP();

if (co->co_flags & CO_ASYNC_GENERATOR) {
PyObject *w = _PyAsyncGenValueWrapperNew(retval);
retval = w;
}

f->f_stacktop = stack_pointer;
why = WHY_YIELD;
goto fast_yield;
}
}
fast_yield:
if (co->co_flags & (CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR)) {
/* Walk the block stack to see whether we are inside an except handler */
int i;
for (i = 0; i < f->f_iblock; i++) {
if (f->f_blockstack[i].b_type == EXCEPT_HANDLER) {
break;
}
}
if (i == f->f_iblock)
// Not in a handler: the generator does not keep the caller's exception state
restore_and_clear_exc_state(tstate, f);
else
swap_exc_state(tstate, f);
}
if (tstate->use_tracing) { ... }

/* pop frame */
exit_eval_frame:
if (PyDTrace_FUNCTION_RETURN_ENABLED())
dtrace_function_return(f);
Py_LeaveRecursiveCall();
f->f_executing = 0;
tstate->frame = f->f_back;

return _Py_CheckFunctionResult(NULL, retval, "PyEval_EvalFrameEx");
}

Summary

PyGen_New(PyFrameObject *f)
    gen->gi_frame = f;
    f->f_gen = (PyObject *) gen;    // back-reference
    gen->gi_code = (PyObject *)(f->f_code);
When a generator is created:
    the current frame is passed in as the argument
    and that frame is pinned to the newly created gen

_PyGen_Send(PyGenObject *gen, PyObject *arg)
    PyFrameObject *f = gen->gi_frame;
    result = PyEval_EvalFrameEx(f, exc);
        PyEval_EvalFrameEx -> _PyEval_EvalFrameDefault:
            if (f->f_lasti >= 0) {
                next_instr += f->f_lasti / sizeof(_Py_CODEUNIT) + 1;
            }
            yield:
                tstate->frame = f->f_back;
                return result
    if (!result || f->f_stacktop == NULL) { release resources }
    return result
When gen.send() runs:
    it takes gen->gi_frame and calls EvalFrame to execute the bytecode
    f->f_lasti records the current position
    on yield, the thread's current frame is switched back to the caller of gen
    once gen is exhausted, _PyGen_Send releases the generator's frame (gen->gi_frame)
    result is returned from _PyGen_Send to the caller
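
The point that a generator returns to its most recent caller, not to its creator, can be observed from Python. This is a sketch that relies on `sys._getframe`, a CPython-specific helper; the function names are invented.

```python
import sys

def who_resumed_me():
    while True:
        # f_back of the running generator frame is the frame that resumed it.
        yield sys._getframe().f_back.f_code.co_name

def first_caller(gen):
    return next(gen)

def second_caller(gen):
    return next(gen)

gen = who_resumed_me()
callers = [first_caller(gen), second_caller(gen), first_caller(gen)]
```

Each resumption links the generator's frame under whichever function called next(), which is the `f->f_back = tstate->frame` step in the C code.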

Native coroutines

import collections.abc, inspect

async def func():
    print(1)

async def native_co_func():
    print(0)
    await func()

native_co = native_co_func()
assert isinstance(native_co, collections.abc.Coroutine)
assert inspect.isawaitable(native_co)

try:
    native_co.send(None)
except StopIteration:
    pass

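Native coroutines come from PEP 492 and were introduced in Python 3.5. Python went as far as adding the new keywords async and await to get coroutines of its own.

As before, data structures first.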

PyCoro_Type

# genobject.c.983
static PyMethodDef coro_methods[] = {
/* Compare with gen_methods[]: the target functions are the same */
{"send",(PyCFunction)_PyGen_Send, METH_O, coro_send_doc},
{"throw",(PyCFunction)gen_throw, METH_VARARGS, coro_throw_doc},
{"close",(PyCFunction)gen_close, METH_NOARGS, coro_close_doc},
{NULL, NULL} /* Sentinel */
};

static PyAsyncMethods coro_as_async = {
(unaryfunc)coro_await, /* am_await */
0, /* am_aiter */
0 /* am_anext */
};

PyTypeObject PyCoro_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"coroutine", /* tp_name */
sizeof(PyCoroObject), /* tp_basicsize */
/* generators leave tp_as_async unset */
&coro_as_async, /* tp_as_async */
/* native coroutines do not support the iterator protocol */
0, /* tp_iter */
0, /* tp_iternext */

coro_methods, /* tp_methods */
};

As the definition shows, coroutines have the same send/throw/close methods as generators, and additionally support await through am_await.
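
Seen from Python, the shared methods behave just like their generator counterparts. A small sketch (the coroutine `answer` is made up) that also hits the "cannot reuse already awaited coroutine" branch of gen_send_ex:

```python
async def answer():
    return 42

co = answer()
shared_methods = all(hasattr(co, name) for name in ("send", "throw", "close"))

# Driving the coroutine by hand: the result arrives in StopIteration.value.
try:
    co.send(None)
    result = None
except StopIteration as stop:
    result = stop.value

# A finished coroutine cannot be driven again.
try:
    co.send(None)
    reuse_error = None
except RuntimeError as exc:
    reuse_error = exc
```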

PyCoroObject

/* _PyGenObject_HEAD defines the initial segment of generator
and coroutine objects. */

# genobject.h.51
typedef struct {
_PyGenObject_HEAD(cr)
} PyCoroObject;

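As shown above, at the C level the struct definitions of coroutines and generators are identical. Put differently, generators are the foundation of coroutines in Python.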

Chain coroutines

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Call chain

Summary

At compile time Python tells these kinds of functions apart through the func.__code__.co_flags field:

  • CO_GENERATOR is set for def + yield

    • inspect.CO_GENERATOR == 32
    • isinstance(gen(), types.GeneratorType)
  • CO_COROUTINE is set for async def

    • inspect.CO_COROUTINE == 128
    • isinstance(coro(), types.CoroutineType)
  • CO_ITERABLE_COROUTINE is set for @types.coroutine + def + yield

    • inspect.CO_ITERABLE_COROUTINE == 256
    • isinstance(co_gen(), types.GeneratorType)
    • bool(co_gen.__code__.co_flags & 256) == True
  • CO_ASYNC_GENERATOR is set for async def + yield [+ await]

    • inspect.CO_ASYNC_GENERATOR == 512
    • isinstance(async_gen(), types.AsyncGeneratorType)
    • bool(async_gen.__code__.co_flags & 512) == True
  1. Native coroutines do not implement the __iter__ and __next__ methods, therefore
    1. they cannot be passed to iter(), list(), tuple() and other built-ins;
    2. they cannot be iterated with for..in.
  2. A plain generator cannot yield from a native coroutine; doing so raises TypeError.
  3. A generator-based coroutine (i.e. @types.coroutine) can yield from a native coroutine object.
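
The flags and the yield from restriction can be checked with a few lines. This is a sketch; the function names mirror the ones used in the list above and `flag_names` is a helper invented here.

```python
import inspect
import types

def gen():
    yield

async def coro():
    pass

@types.coroutine
def co_gen():
    yield

async def async_gen():
    yield

def flag_names(func):
    """Return which of the four flags are set on func.__code__.co_flags."""
    names = ("CO_GENERATOR", "CO_COROUTINE",
             "CO_ITERABLE_COROUTINE", "CO_ASYNC_GENERATOR")
    return [n for n in names if func.__code__.co_flags & getattr(inspect, n)]

kinds = {f.__name__: flag_names(f) for f in (gen, coro, co_gen, async_gen)}

def plain_delegator():
    inner = coro()
    try:
        yield from inner     # not allowed in a plain generator
    finally:
        inner.close()

try:
    next(plain_delegator())
    delegation_error = None
except TypeError as exc:
    delegation_error = exc
```

A generator-based coroutine carries both CO_GENERATOR and CO_ITERABLE_COROUTINE, which is why it still passes the GeneratorType check.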

Asynchronous generators

import asyncio

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def run():
    async for i in ticker(1, 10):
        print(i)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

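Asynchronous generators come from PEP 525 and were introduced in Python 3.6:

  • Put simply, they are async def + yield
  • They support the asynchronous iteration protocol (__aiter__, __anext__) and can be used in async for
  • Like coroutines, they need an event loop to run
  • Performance is similar to regular generators and faster than hand-written asynchronous iterators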
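
A compact variant of the ticker example, as a sketch that assumes Python 3.7 or later for `asyncio.run`. It uses a zero delay and an async comprehension so that it finishes immediately.

```python
import asyncio

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def run():
    # An async comprehension consumes the asynchronous generator.
    return [i async for i in ticker(0, 3)]

ticks = asyncio.run(run())
```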

PyAsyncGen_Type

# genobject.h.1420
static PyMethodDef async_gen_methods[] = {
{"asend", (PyCFunction)async_gen_asend, METH_O, async_asend_doc},
{"athrow",(PyCFunction)async_gen_athrow, METH_VARARGS, async_athrow_doc},
{"aclose", (PyCFunction)async_gen_aclose, METH_NOARGS, async_aclose_doc},
{NULL, NULL} /* Sentinel */
};

static PyAsyncMethods async_gen_as_async = {
0, /* am_await */
/* supports the asynchronous iteration protocol */
PyObject_SelfIter, /* am_aiter */
(unaryfunc)async_gen_anext /* am_anext */
};

PyTypeObject PyAsyncGen_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"async_generator", /* tp_name */
&async_gen_as_async, /* tp_as_async */

/* async generators do not support the regular iterator protocol */
0, /* tp_iter */
0, /* tp_iternext */
async_gen_methods, /* tp_methods */
_PyGen_Finalize, /* tp_finalize */
};

Attributes

async def genfunc():
    yield 1
    yield 2

agen = genfunc()

assert agen.__aiter__() is agen

assert await agen.__anext__() == 1
assert await agen.__anext__() == 2

await agen.__anext__()  # This line will raise StopAsyncIteration.

# agen.__aiter__(): Returns agen.
# agen.__anext__(): Returns an awaitable.

# agen.asend(val): Returns an awaitable.
# agen.athrow(typ, [val, [tb]]): Returns an awaitable.
# agen.aclose(): Returns an awaitable.

# agen.__name__ and agen.__qualname__: readable and writable.

# agen.ag_await: the object the generator is currently awaiting on (or None). Similar to gi_yieldfrom for generators and cr_await for coroutines.
# agen.ag_frame, agen.ag_running, and agen.ag_code: same as for generators.

# StopIteration and StopAsyncIteration raised inside the generator are replaced with RuntimeError and do not propagate as-is.
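
The asend() and aclose() methods can be tried out with a small echo generator. This is a sketch: `echo` and the `closed` list are invented for illustration, and `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio

closed = []

async def echo():
    received = None
    try:
        while True:
            received = yield received
    finally:
        closed.append(True)

async def main():
    agen = echo()
    primed = await agen.asend(None)      # like send(None) on a regular generator
    reply = await agen.asend("hello")
    running = agen.ag_running            # False while suspended at a yield
    await agen.aclose()                  # throws GeneratorExit at the paused yield
    try:
        await agen.__anext__()
        after_close = None
    except StopAsyncIteration as exc:
        after_close = exc
    return primed, reply, running, after_close

primed, reply, running, after_close = asyncio.run(main())
```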

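Finalization

When an asynchronous generator uses yield inside an async with block and the generator is garbage-collected before it finishes, the resources it holds cannot be released.

To address this:

  1. An aclose() method was introduced. It is similar to close() of regular generators, except that an event loop is needed to run it.
  2. A RuntimeError is raised when an asynchronous generator executes a yield expression in its finally block (using await there is fine). Note that async with behaves like a try block here.
  3. Two functions were added to the sys module: set_asyncgen_hooks() and get_asyncgen_hooks().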
    # asyncio/base_events.py
    class BaseEventLoop:
        def run_forever(self):
            ...
            old_hooks = sys.get_asyncgen_hooks()
            # returns a namedtuple-like structure with firstiter and finalizer fields.

            sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
            # set_asyncgen_hooks is thread-specific
            try:
                ...
            finally:
                sys.set_asyncgen_hooks(*old_hooks)
                ...

        def _finalize_asyncgen(self, gen):
            self.create_task(gen.aclose())

sys.set_asyncgen_hooks() accepts two arguments:

  • firstiter: a callable which will be called when an asynchronous generator is iterated for the first time.
  • finalizer: a callable which will be called when an asynchronous generator is about to be GCed.
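
The firstiter hook can be seen in action without an event loop. The sketch below drives the awaitables by hand with send(None); the generator `numbers` and the list `first_iterated` are made up for this example.

```python
import sys

async def numbers():
    yield 1

first_iterated = []
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=lambda agen: first_iterated.append(agen.__name__))
try:
    agen = numbers()
    step = agen.__anext__()          # in CPython the firstiter hook fires here
    try:
        step.send(None)              # drive the awaitable by hand, no event loop
        first_value = None
    except StopIteration as stop:
        first_value = stop.value
    closer = agen.aclose()
    try:
        closer.send(None)
    except StopIteration:
        pass
finally:
    sys.set_asyncgen_hooks(*old_hooks)   # restore whatever was installed before
```

Restoring the previous hooks in a finally block follows the same pattern as the run_forever() excerpt above.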

The call process

def foo():
    return 1
def bar():
    print(foo())

import dis
dis.dis(bar)

  4           0 LOAD_GLOBAL              0 (print)
              2 LOAD_GLOBAL              1 (foo)
              4 CALL_FUNCTION            0
              6 CALL_FUNCTION            1
              8 POP_TOP
             10 LOAD_CONST               0 (None)
             12 RETURN_VALUE

async def foo():
    return 1
async def bar():
    print(await foo())

import dis
dis.dis(bar)

  4           0 LOAD_GLOBAL              0 (print)
              2 LOAD_GLOBAL              1 (foo)
              4 CALL_FUNCTION            0
              6 GET_AWAITABLE
              8 LOAD_CONST               0 (None)
             10 YIELD_FROM
             12 CALL_FUNCTION            1
             14 POP_TOP
             16 LOAD_CONST               0 (None)
             18 RETURN_VALUE

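Comparing the two versions, the async one adds GET_AWAITABLE and YIELD_FROM:

  • CALL_FUNCTION: calls foo(), gets a coroutine object and pushes it onto the stack
  • GET_AWAITABLE: takes the object at the top of the stack, resolves it to an awaitable and leaves that on the stack
  • YIELD_FROM: pops the top of the stack (v = POP()), calls _PyGen_Send(…, v) and returns the result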
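
The comparison can be repeated on any interpreter with `dis.get_instructions`. A sketch with invented function names follows. The exact opcode set depends on the Python version (for example, newer releases no longer emit YIELD_FROM), so only GET_AWAITABLE is relied on here.

```python
import dis

def sync_bar():
    print(len("x"))

async def async_bar(awaitable):
    print(await awaitable)

sync_ops = {ins.opname for ins in dis.get_instructions(sync_bar)}
async_ops = {ins.opname for ins in dis.get_instructions(async_bar)}

# Opcodes that only the async version needs; the list varies by Python version.
extra_ops = sorted(async_ops - sync_ops)
```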

GET_AWAITABLE

# ceval.c.2040
TARGET(GET_AWAITABLE) {
PyObject *iterable = TOP();
PyObject *iter = _PyCoro_GetAwaitableIter(iterable);

if (iter != NULL && PyCoro_CheckExact(iter)) {
PyObject *yf = _PyGen_yf((PyGenObject*)iter);
/* `iter` is a coroutine object that is being awaited,
`yf` is a pointer to the current awaitable being awaited on.

assert yf == NULL, otherwise
raise RuntimeError("coroutine is being awaited already")
*/
}
SET_TOP(iter); /* Even if it's NULL */

if (iter == NULL) { goto error; }

PREDICT(LOAD_CONST);
DISPATCH();
}

It calls _PyCoro_GetAwaitableIter on the object at the top of the stack and gets back an awaitable iterator; if that is not possible, an error is raised.

_PyCoro_GetAwaitableIter

# genobject.c.876
/*
* Core logic: returns an awaitable for `o`:
* - `o` if `o` is a coroutine-object;
* - `type(o)->tp_as_async->am_await(o)`
*
* Raises a TypeError if it's not possible to return
* an awaitable and returns NULL.
*/
PyObject * _PyCoro_GetAwaitableIter(PyObject *o)
{
unaryfunc getter = NULL;
PyTypeObject *ot;

if (PyCoro_CheckExact(o) || gen_is_coroutine(o)) {
return o; /* 'o' is a coroutine. */
}

ot = Py_TYPE(o);
if (ot->tp_as_async != NULL) {
getter = ot->tp_as_async->am_await;
}
if (getter != NULL) {
PyObject *res = (*getter)(o);
return res;
}
return NULL;
}

# genobject.c.933
static PyObject * coro_await(PyCoroObject *coro)
{
PyCoroWrapper *cw = PyObject_GC_New(PyCoroWrapper, &_PyCoroWrapper_Type);
cw->cw_coroutine = coro;
return (PyObject *)cw;
}

As shown above, a coroutine is an awaitable as it is. For any other object, type(o)->tp_as_async->am_await(o) is called to obtain one.

The types that fill in tp_as_async (am_await) are: coroutine, aiter_wrapper, async_generator_asend and async_generator_athrow.

PyCoroWrapper_Type

# genobject.c.1088
static PyMethodDef coro_wrapper_methods[] = {
{"send",(PyCFunction)coro_wrapper_send, METH_O, coro_send_doc},
{"throw",(PyCFunction)coro_wrapper_throw, METH_VARARGS, coro_throw_doc},
{"close",(PyCFunction)coro_wrapper_close, METH_NOARGS, coro_close_doc},
{NULL, NULL} /* Sentinel */
};

# genobject.c.859
typedef struct {
PyObject_HEAD
PyCoroObject *cw_coroutine;
} PyCoroWrapper;

# genobject.c.1095
PyTypeObject _PyCoroWrapper_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"coroutine_wrapper",
sizeof(PyCoroWrapper), /* tp_basicsize */

/* implements the iterator protocol */
PyObject_SelfIter, /* tp_iter */
(iternextfunc)coro_wrapper_iternext, /* tp_iternext */

/* implements the coroutine methods */
coro_wrapper_methods, /* tp_methods */
};
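
The wrapper type is reachable from Python by calling `__await__()` on a coroutine object. A short sketch (the coroutine `answer` is invented):

```python
async def answer():
    return 42

co = answer()
wrapper = co.__await__()             # am_await -> coro_await in CPython

wrapper_type = type(wrapper).__name__
is_own_iterator = iter(wrapper) is wrapper   # tp_iter is PyObject_SelfIter

# Advancing the wrapper drives the wrapped coroutine.
try:
    next(wrapper)
    result = None
except StopIteration as stop:
    result = stop.value
```

So the coroutine itself stays non-iterable, while the wrapper supplies the iterator protocol that YIELD_FROM needs.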

YIELD_FROM

# ceval.c.2076
TARGET(YIELD_FROM) {
PyObject *v = POP();
PyObject *receiver = TOP();
int err;
if (PyGen_CheckExact(receiver) || PyCoro_CheckExact(receiver)) {
// For a generator / coroutine, call _PyGen_Send() directly
retval = _PyGen_Send((PyGenObject *)receiver, v);
} else {
// Any other awaitable iterator: advance it with tp_iternext or send()
_Py_IDENTIFIER(send);
if (v == Py_None)
retval = Py_TYPE(receiver)->tp_iternext(receiver);
else
retval = _PyObject_CallMethodIdObjArgs(receiver, &PyId_send, v, NULL);
}
Py_DECREF(v);
if (retval == NULL) {
// The receiver is finished: start tearing it down and fetch its return value
PyObject *val;
if (tstate->c_tracefunc != NULL
&& PyErr_ExceptionMatches(PyExc_StopIteration))
call_exc_trace(tstate->c_tracefunc, tstate->c_traceobj, tstate, f);
err = _PyGen_FetchStopIterationValue(&val);
if (err < 0)
goto error;
Py_DECREF(receiver);
SET_TOP(val);
DISPATCH();
}
/* receiver remains on stack, retval is value to be yielded */
f->f_stacktop = stack_pointer;
why = WHY_YIELD;
/* and repeat... */
assert(f->f_lasti >= (int)sizeof(_Py_CODEUNIT));
f->f_lasti -= sizeof(_Py_CODEUNIT);
goto fast_yield;
}
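
The loop that YIELD_FROM implements can be modelled in pure Python. The sketch below is a simplified model, not the real implementation: it leaves out the forwarding of throw() and close(), and the names `delegate` and `inner` are invented.

```python
def delegate(receiver):
    """Simplified model of the YIELD_FROM loop (no throw/close forwarding)."""
    value = None
    while True:
        try:
            if value is None:
                item = next(receiver)          # the tp_iternext branch
            else:
                item = receiver.send(value)    # the send() branch
        except StopIteration as stop:
            return stop.value                  # result of the delegation
        value = yield item                     # hand the item to our own caller

def inner():
    got = yield "a"
    got = yield f"b:{got}"
    return f"done:{got}"

outer = delegate(inner())
trace = [next(outer), outer.send("x")]
try:
    outer.send("y")
    final = None
except StopIteration as stop:
    final = stop.value
```

Every value sent into the outer generator is passed straight to the receiver, and the receiver's return value becomes the result of the whole expression.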