【Hard Python】【第二章-异步IO】3、async/await的源码实现

说完了asyncio事件循环是如何运行异步任务的,接下来back to basic,我们一起看看asyncawait两个原语具体代表了什么含义。

首先是asyncasync通常用来修饰一个函数,表示这个函数会返回一个协程。比如说:

1
2
3
4
5
6
7
async def _coro_maker(i):
print(i + 1)


def test_async():
c = _coro_maker(1)
asyncio.run(c)

_coro_maker进行反编译,得到这样的结果:

1
2
3
4
5
6
7
8
9
10
11
Disassembly of _coro_maker:
0 GEN_START 1

7 2 LOAD_GLOBAL 0 (print)
4 LOAD_FAST 0 (i)
6 LOAD_CONST 1 (1)
8 BINARY_ADD
10 CALL_FUNCTION 1
12 POP_TOP
14 LOAD_CONST 0 (None)
16 RETURN_VALUE

可以看到,函数体内反编译的结果和普通def函数是一致的,唯一的不同是最开始加了GEN_START字节码。首先看GEN_START的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
case TARGET(GEN_START): {
PyObject *none = POP();
Py_DECREF(none);
if (!Py_IsNone(none)) {
if (oparg > 2) {
_PyErr_SetString(tstate, PyExc_SystemError,
"Illegal kind for GEN_START");
}
else {
static const char *gen_kind[3] = {
"generator",
"coroutine",
"async generator"
};
_PyErr_Format(tstate, PyExc_TypeError,
"can't send non-None value to a "
"just-started %s",
gen_kind[oparg]);
}
goto error;
}
DISPATCH();
}

GEN_START对于生成器、协程、async生成器都适用。对于async def函数其真实含义是调用send(None)以触发协程的开始。由于async def生成的协程没有yield值,因此会报StopIteration异常并给出协程的返回值。

总的来说,协程是一种特殊的generator,具备被await的效果。

那么接下来,该说await了。我们看一组代码示例:

1
2
3
4
5
6
7
8
9
10
11
async def _coro_maker(i):
print(i + 1)


async def _test_await():
c = _coro_maker(1)
await c


def test_await():
asyncio.run(_test_await())


反编译_test_await的结果,得到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Disassembly of _test_await:
0 GEN_START 1

27 2 LOAD_GLOBAL 0 (_coro_maker)
4 LOAD_CONST 1 (1)
6 CALL_FUNCTION 1
8 STORE_FAST 0 (c)

28 10 LOAD_FAST 0 (c)
12 GET_AWAITABLE
14 LOAD_CONST 0 (None)
16 YIELD_FROM
18 POP_TOP
20 LOAD_CONST 0 (None)
22 RETURN_VALUE

首先来看GET_AWAITABLE,其代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
case TARGET(GET_AWAITABLE): {
PREDICTED(GET_AWAITABLE);
PyObject *iterable = TOP();
PyObject *iter = _PyCoro_GetAwaitableIter(iterable);

if (iter == NULL) {
int opcode_at_minus_3 = 0;
if ((next_instr - first_instr) > 2) {
opcode_at_minus_3 = _Py_OPCODE(next_instr[-3]);
}
format_awaitable_error(tstate, Py_TYPE(iterable),
opcode_at_minus_3,
_Py_OPCODE(next_instr[-2]));
}

Py_DECREF(iterable);

if (iter != NULL && PyCoro_CheckExact(iter)) {
PyObject *yf = _PyGen_yf((PyGenObject*)iter);
if (yf != NULL) {
/* `iter` is a coroutine object that is being
awaited, `yf` is a pointer to the current awaitable
being awaited on. */
Py_DECREF(yf);
Py_CLEAR(iter);
_PyErr_SetString(tstate, PyExc_RuntimeError,
"coroutine is being awaited already");
/* The code below jumps to `error` if `iter` is NULL. */
}
}

SET_TOP(iter); /* Even if it's NULL */

if (iter == NULL) {
goto error;
}

PREDICT(LOAD_CONST);
DISPATCH();
}

GET_AWAITABLE做了这样几件事情:

  • 通过_PyCoro_GetAwaitableIter获取一个Awaitable对象的迭代器iter
  • 检查iter是否合法,检查当前Awaitable对象是否已经被await
  • iter置于栈顶

Awaitableiter到底是什么东西?我们来看_PyCoro_GetAwaitableIter的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
PyObject *
_PyCoro_GetAwaitableIter(PyObject *o)
{
unaryfunc getter = NULL;
PyTypeObject *ot;

if (PyCoro_CheckExact(o) || gen_is_coroutine(o)) {
/* 'o' is a coroutine. */
Py_INCREF(o);
return o;
}

ot = Py_TYPE(o);
if (ot->tp_as_async != NULL) {
getter = ot->tp_as_async->am_await;
}
if (getter != NULL) {
PyObject *res = (*getter)(o);
if (res != NULL) {
if (PyCoro_CheckExact(res) || gen_is_coroutine(res)) {
/* __await__ must return an *iterator*, not
a coroutine or another awaitable (see PEP 492) */
PyErr_SetString(PyExc_TypeError,
"__await__() returned a coroutine");
Py_CLEAR(res);
} else if (!PyIter_Check(res)) {
PyErr_Format(PyExc_TypeError,
"__await__() returned non-iterator "
"of type '%.100s'",
Py_TYPE(res)->tp_name);
Py_CLEAR(res);
}
}
return res;
}

PyErr_Format(PyExc_TypeError,
"object %.100s can't be used in 'await' expression",
ot->tp_name);
return NULL;
}

其中有重要的几句代码:

  • if (PyCoro_CheckExact(o) || gen_is_coroutine(o)) return o
  • getter = ot->tp_as_async->am_await
  • PyObject *res = (*getter)(o)

可以知晓,如果对象是协程的话会直接返回,不是协程的话看有无ot->tp_as_async->am_await接口支持。如果再追究的话,对于一般的生成器PyGen_Type,是没有这个接口的,所以是无法被await的。

GET_AWAITABLE之后,接下来是load了一个None,然后YIELD_FROMYIELD_FROM实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
case TARGET(YIELD_FROM): {
PyObject *v = POP();
PyObject *receiver = TOP();
PySendResult gen_status;
if (tstate->c_tracefunc == NULL) {
gen_status = PyIter_Send(receiver, v, &retval);
} else {
// 省略一些代码
}
Py_DECREF(v);
if (gen_status == PYGEN_ERROR) {
assert (retval == NULL);
goto error;
}
if (gen_status == PYGEN_RETURN) {
assert (retval != NULL);

Py_DECREF(receiver);
SET_TOP(retval);
retval = NULL;
DISPATCH();
}
assert (gen_status == PYGEN_NEXT);
/* receiver remains on stack, retval is value to be yielded */
/* and repeat... */
assert(f->f_lasti > 0);
f->f_lasti -= 1;
f->f_state = FRAME_SUSPENDED;
f->f_stackdepth = (int)(stack_pointer - f->f_valuestack);
goto exiting;
}

通过YIELD_FROM操作,实际上调用了PyIter_Send(coro, None, &retval)。我们来看PyIter_Send的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
PySendResult
PyIter_Send(PyObject *iter, PyObject *arg, PyObject **result)
{
_Py_IDENTIFIER(send);
assert(arg != NULL);
assert(result != NULL);
if (Py_TYPE(iter)->tp_as_async && Py_TYPE(iter)->tp_as_async->am_send) {
PySendResult res = Py_TYPE(iter)->tp_as_async->am_send(iter, arg, result);
assert(_Py_CheckSlotResult(iter, "am_send", res != PYGEN_ERROR));
return res;
}
if (arg == Py_None && PyIter_Check(iter)) {
*result = Py_TYPE(iter)->tp_iternext(iter);
}
else {
*result = _PyObject_CallMethodIdOneArg(iter, &PyId_send, arg);
}
if (*result != NULL) {
return PYGEN_NEXT;
}
if (_PyGen_FetchStopIterationValue(result) == 0) {
return PYGEN_RETURN;
}
return PYGEN_ERROR;
}

针对AwaitableIter,实际调用了Py_TYPE(iter)->tp_as_async->am_send(iter, arg, result),对应的函数是这个:

1
2
3
4
5
static PySendResult
PyGen_am_send(PyGenObject *gen, PyObject *arg, PyObject **result)
{
return gen_send_ex2(gen, arg, result, 0, 0);
}

看到这里就很令人熟悉了,没错,gen_send_ex2(coro, None, result, 0, 0)就是coro.send(None)的逻辑

gen_send_ex2中,函数体的返回值会正好赋到result上。再看YIELD_FROMif (gen_status == PYGEN_RETURN)分支,最终的返回值就会放到栈顶。

当我们调用xx = await Awaitable时,我们也就能够把Awaitable的返回值赋给xx了。这样,await原语就实现了它的作用。

版权声明
本文为博客HiKariのTechLab原创文章,转载请标明出处,谢谢~~~