【Hard Python】【第二章-异步IO】2、异步任务在事件循环中的执行

接续第一话的内容,事件循环在创建之后,又是如何运行协程任务以及异步IO任务的?

由asyncio.run的代码可知,loop.run_until_complete是运行协程的方法。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        """Run the event loop until *future* (coroutine or Future) completes.

        Returns the future's result, or raises its exception.
        """
        self._check_closed()
        self._check_running()
        # A bare coroutine must first be wrapped into a Task.
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no point bothering with a "destroy pending task" warning.
            future._log_destroy_pending = False

        # Stop the loop as soon as the future completes.
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # Consume the exception so the internally-created task
                # doesn't log a "never retrieved" warning.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()


# tasks.py
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a Task scheduled on *loop*.

    If the argument is already a Future, it is returned unchanged.
    """
    return _ensure_future(coro_or_future, loop=loop)


def _ensure_future(coro_or_future, *, loop=None):
    # Case 1: already a Future/Task — just validate the loop and return it.
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    # Case 2: a non-coroutine awaitable is adapted into a coroutine first.
    if not coroutines.iscoroutine(coro_or_future):
        if inspect.isawaitable(coro_or_future):
            coro_or_future = _wrap_awaitable(coro_or_future)
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

    # Case 3: wrap the coroutine into a Task on the (running) loop.
    if loop is None:
        loop = events._get_event_loop(stacklevel=4)
    return loop.create_task(coro_or_future)

run_until_complete方法传入的协程会通过tasks.ensure_future方法被封装成一个task实例,从上述的代码可以看到,最终落实到了loop.create_task方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro, *, name=None):
        """Schedule *coro* for execution; return the created Task."""
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name)
            if task._source_traceback:
                # Hide this frame from the task's creation traceback.
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
            tasks._set_task_name(task, name)

        return task


# task.py
class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        # Kick off the coroutine: __step drives coro.send()/throw() and
        # is queued onto the loop's _ready deque via call_soon.
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)


# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def call_soon(self, callback, *args, context=None):
        """Arrange for *callback* to run on the next loop iteration."""
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args, context):
        # Wrap the callback in a Handle and append it to the ready queue;
        # _run_once() will pop and execute it.
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

loop.create_task方法最终会生成一个Task实例。Task实例封装了协程以及其它一系列变量,最终调用loop的call_soon方法,传入了实例的__step函数。call_soon方法传入的函数,会通过events.Handle封装生成一个handle实例,并加入到事件循环的_ready队列中。

__step方法会通过coro.send(None)或是coro.throw(exc)方法启动Task实例内部的协程并获取协程的返回结果,对于一般协程而言coro.send(None)会直接throw一个StopIteration异常,并在异常结果里附上协程返回值。当然,也有其它情况(比如await了一个yield多次的Awaitable实例)可能要多次call_soon协程Task的__step函数,相关的例子可以查看stackoverflow的这篇文章。

在这之后,我们再回到run_until_complete方法,在ensure_future后,便调用loop.run_forever方法,启动事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# windows_events.py
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    def run_forever(self):
        try:
            assert self._self_reading_future is None
            # Start the self-pipe reader before entering the loop so the
            # loop can be woken up from other threads.
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            if self._self_reading_future is not None:
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                # Unregister the overlapped op so IOCP stops tracking it.
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None


# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
    def _loop_self_reading(self, f=None):
        """Keep an outstanding recv() on the self-pipe socket.

        Re-registers itself as the done-callback of each recv future, so
        reading continues for the lifetime of the loop.
        """
        try:
            if f is not None:
                f.result()  # may raise
            if self._self_reading_future is not f:
                # The loop started to close while this callback was queued;
                # don't schedule another read.
                return
            f = self._proactor.recv(self._ssock, 4096)
        except exceptions.CancelledError:
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)


# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_forever(self):
        """Run the loop until stop() is called."""
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)

    def _run_once(self):
        """Run one full iteration of the event loop.

        Processes cancelled timers, polls for I/O events, moves expired
        timers to the ready queue, then runs every ready callback.
        """
        # 1) Purge cancelled timer handles from the _scheduled heap.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Too many cancelled timers: rebuild the heap without them.
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Otherwise just pop cancelled timers off the top of the heap.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # 2) Poll for I/O: block only when nothing is ready to run.
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # 3) Move timers whose deadline has arrived onto the ready queue.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # 4) Run every handle that is currently ready.  Only iterate over
        # handles scheduled *before* this point; callbacks scheduled by
        # the callbacks themselves run on the next iteration.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # Debug mode wraps the run with timing statistics
                # (elided in the original article's excerpt).
                handle._run()
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

ProactorEventLoop在调用run_forever时,首先会用call_soon方法将_loop_self_reading方法加入排期。_loop_self_reading方法会读取proactor中的future,并且将自己加入future的完成时回调,实现不间断地读取future实例。

之后,ProactorEventLoop调用了BaseEventLoop的run_forever方法,在其中会不断执行_run_once方法去一遍遍地迭代事件循环。一轮_run_once会做以下几件事情:

  • 清理_scheduled中被取消的定时任务
  • select出事件列表并进行处理
  • 从_scheduled取出到时的任务,加入到_ready列表中
    • 由上面的逻辑也可知,call_soon的任务也会被加入到_ready列表中
  • 从_ready列表中依次取出所有handle,调用_run方法运行

通过这种机制,事件循环就能持续不断地运行任务。

由上述_run_once的定义也可知,在select事件列表一步会出现IOCP的身影,这是因为BaseProactorEventLoop的selector就是proactor,实际传入的就是IOCP实例,因此最终就是调用了IOCP实例的select方法。也是只有在这一步,才会去处理一些IO操作。

所以问题来了,针对IO操作,asyncio是如何进行调度的呢?我们首先看IocpProactor.select的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# windows_events.py
class IocpProactor:
    def select(self, timeout=None):
        """Return completed-IO futures, polling the port if none are cached."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _poll(self, timeout=None):
        """Drain completed operations from the I/O completion port."""
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first wait may block; subsequent calls just drain.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })
                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            elif not f.done():
                # Run the finish-callback and publish the outcome on the
                # future; select() will hand it back to the event loop.
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

IocpProactor._poll中,会调用GetQueuedCompletionStatus去查询完成端口的结果。直到有结果出现,才会根据结果中缓存的address数据pop出缓存的回调并且执行。

我们通过剖析一个IO操作的例子,来观察其中具体的奥秘:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from multiprocessing import Process
import asyncio
import time


HOST, PORT = '127.0.0.1', 31077


async def _svr_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Read one message from the client and send back its reverse."""
    data = await reader.read(1024)
    msg = data.decode()
    print(f'[Server] recv: {msg}')

    # msg[::-1] reverses the whole string.  The original index-based
    # comprehension used range(len(msg) - 1, 0, -1), which stopped before
    # index 0 and silently dropped the first character.
    msg_back = msg[::-1]
    print(f'[Server] send: {msg_back}')
    writer.write(msg_back.encode())
    await writer.drain()

    writer.close()


async def _svr_task():
    """Serve connections forever with _svr_handler."""
    svr = await asyncio.start_server(_svr_handler, host=HOST, port=PORT)
    async with svr:
        await svr.serve_forever()


def _svr():
    # Entry point for the server child process.
    asyncio.run(_svr_task())


async def _test_cli(msg: str):
    """Send *msg* to the server and print the echoed reply."""
    reader, writer = await asyncio.open_connection(HOST, PORT)

    print(f'[Client] send: {msg}')
    writer.write(msg.encode())
    await writer.drain()

    data = await reader.read(1024)
    print(f'[Client] recv: {data.decode()}')

    writer.close()
    await writer.wait_closed()


def test_cli():
    """Spawn the server in a child process, then run the client once."""
    p = Process(target=_svr, daemon=True)
    p.start()
    time.sleep(0.5)  # crude wait for the server to start listening
    _msg = 'helloworld'
    asyncio.run(_test_cli(_msg))
    p.kill()


if __name__ == '__main__':
    test_cli()

这是一个很简单的echo server的实现,client发送给server信息,server返回信息的reverse。我们以client的写操作writer.write为例,看下IO事件是如何在事件循环里被处理的。

首先,open_connection函数创建了对特定host和port的连接,并返回连接流的reader和writer:

1
2
3
4
5
6
7
8
9
async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    """Connect to (host, port) and return a (StreamReader, StreamWriter) pair."""
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    # The protocol feeds received data into the reader.
    protocol = StreamReaderProtocol(reader, loop=loop)
    # create_connection builds the transport and binds it to the protocol.
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

针对reader,首先会初始化一个StreamReader实例,再用StreamReaderProtocolreader做进一步的封装。

针对writer,首先会通过loop的create_connection方法,针对本次连接创建transport实例,相当于一个通信管道的封装。transport实例会与先前创建的StreamReaderProtocol实例进行绑定。然后,再将创建的transport实例和writer绑定。

ProactorEventLoop中,会这样创建transport实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
    # NOTE: the excerpt lost the enclosing method header; in CPython this
    # return statement lives in _make_socket_transport().
    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)


class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets: combines read and write pipes."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(loop, sock, protocol, waiter, extra, server)
        # Disable Nagle's algorithm for lower latency.
        base_events._set_nodelay(sock)

_ProactorSocketTransport实例会同时执行_ProactorReadPipeTransport以及_ProactorBaseWritePipeTransport的初始化逻辑,因此会提供对管道读写的功能。其继承链如下:

1
2
3
4
5
6
7
8
9
10
(<class 'asyncio.proactor_events._ProactorSocketTransport'>,
<class 'asyncio.proactor_events._ProactorReadPipeTransport'>,
<class 'asyncio.proactor_events._ProactorBaseWritePipeTransport'>,
<class 'asyncio.proactor_events._ProactorBasePipeTransport'>,
<class 'asyncio.transports._FlowControlMixin'>,
<class 'asyncio.transports.Transport'>,
<class 'asyncio.transports.ReadTransport'>,
<class 'asyncio.transports.WriteTransport'>,
<class 'asyncio.transports.BaseTransport'>,
<class 'object'>)

之后,当客户端开始写操作,调用writer.write时,实质是进行了以下操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# proactor_events.py
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    def write(self, data):
        # (some validation logic elided in this excerpt)
        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the previous write future *f* and start the next write."""
        try:
            if f is not None and self._write_fut is None and self._closing:
                # The transport was force-closed while this callback was
                # pending; there is nothing left to do.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()  # re-raise any send error
            if data is None:
                # Flush whatever was buffered while the last write ran.
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Buffer drained: let a paused protocol resume writing
                # (this is what eventually wakes up writer.drain()).
                self._maybe_resume_protocol()
            else:
                # Start an asynchronous send and chain this method as its
                # done-callback, forming the write loop.
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
            if self._empty_waiter is not None and self._write_fut is None:
                self._empty_waiter.set_result(None)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

第一次write时,write_future以及buffer为空,因此触发了_loop_writing逻辑。在_loop_writing中,调用了self._loop._proactor.send(self._sock, data)生成了一个写操作的future。而_proactor,也就是在ProactorEventLoop里的IocpProactor实例了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# windows_events.py
class IocpProactor:
    def send(self, conn, buf, flags=0):
        """Start an overlapped send on *conn*; return an _OverlappedFuture."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            # Called when the completion port reports the send finished.
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)

    def _register_with_iocp(self, obj):
        # A file handle needs to be associated with the completion port
        # only once; remember the ones already registered.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)

    def _register(self, ov, obj, callback):
        """Wrap overlapped op *ov* in a future and cache it for _poll()."""
        self._check_closed()

        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation completed synchronously: resolve the future now.
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)

        # _poll() looks the (future, callback) tuple up by ov.address when
        # GetQueuedCompletionStatus() reports completion.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

send方法中,做了如下操作:

  • 通过CreateIoCompletionPort注册socket到完成端口
  • 创建一个Overlapped实例,通过WSASend发送数据到socket
  • 创建一个关联Overlapped实例的future,并且判断Overlapped实例如果不在pending状态就直接执行回调。之后,缓存这个future实例到_cache中。


在先前已经提到,事件循环执行时,IocpProactor实例会调用_poll方法,其中会采用GetQueuedCompletionStatus查询IO操作完成结果。如果发现有IO操作完成,就会从操作中提取ov.address并从_cache中pop出回调然后执行。这样通过IOCP模型加上事件循环(事件循环实质就是IOCP里头的worker),就把writer.write的操作从开始到完成的步骤给串起来了。

之后就是await writer.drain,实质是做了如下操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# streams.py
class StreamWriter:
    async def drain(self):
        """Wait until it is appropriate to resume writing to the stream."""
        await self._protocol._drain_helper()


class FlowControlMixin(protocols.Protocol):  # inherited by StreamReaderProtocol
    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            # Flow control not engaged: nothing to wait for.
            return
        # Park on a fresh future; resume_writing() will resolve it once the
        # transport's write buffer drains below the low-water mark.
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        await waiter

writer.drain实质上是await了StreamReaderProtocol实例的_drain_helper协程,其中做了一些前置检查,然后依据当前事件循环设置了一个_drain_waiter的future实例,并await。为什么要这么做呢?

首先,我们可以观察得知,在_run_once的逻辑中,如果_ready队列有任务,或者是有_scheduled里头的定时任务,那么之后IocpProactor._poll里头的GetQueuedCompletionStatus就会有timeout,否则GetQueuedCompletionStatus对应的timeout就是INFINITE,会一直阻塞直到有IO事件完成。有兴趣的同学可以创建一个协程任务,里头create_future之后await下,一试便知。

然后,回到_ProactorBaseWritePipeTransport的_loop_writing方法。_write_fut被创建后,会直接添加_loop_writing为自己的完成回调。当IocpProactor实例由GetQueuedCompletionStatus获得一个完成事件之后,会取出来执行ov.getresult()(在send方法的finish_send里头)来获取结果,这个结果就会被放到_write_fut作为其最终的返回结果。此时_write_fut由于完成了,因此会调用自己的回调_loop_writing,但这个时候因为buffer里没有数据了,所以就会走到_maybe_resume_protocol:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# transports.py
class _FlowControlMixin(Transport):
    def _maybe_resume_protocol(self):
        # Resume the protocol only if it was paused and the write buffer
        # has drained below the low-water mark.
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })


# streams.py
class FlowControlMixin(protocols.Protocol):
    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Wake up any coroutine blocked in drain().
        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

在writer.drain中,我们实际上是一直在await这个_drain_waiter。在调用_maybe_resume_protocol之后,实际是走到了StreamReaderProtocol实例的resume_writing方法,该方法在FlowControlMixin类中被定义。这个方法执行了两个操作:

  • _paused状态置为False
    • _loop_writing中,如果数据没发完,就会另外走到_maybe_pause_protocol,会把这个状态置为true。此时调用await writer.drain,就正好会走到了await _drain_waiter
  • _drain_waiter设置完成。这样,await writer.drain就能完成了

客户端的写操作,在事件循环里就是通过如上复杂的方式调度的。总的来说,是如下的步骤:

  • 用户调用writer.write将数据传进transport的写buffer
  • transport_loop_writing发现buffer有数据,创建一个写future
    • 通过CreateIoCompletionPort绑定socket跟完成端口
    • 通过WSASend发送数据
    • 设置一个回调用来取发送数据的结果
    • 取出来的结果给到写future
    • future预先设置_loop_writing为完成回调,得到结果后执行下一轮_loop_writing
  • 用户调用await writer.drain
    • future在创建后,发现写future没有到完成状态,先调用_maybe_pause_protocol设置protocol_pausedTrue
    • writer.drain里判断protocol_paused,重置_drain_waiter为新的实例并await
    • 写操作完成,触发写future的回调_loop_writing。下一轮_loop_writing发现没有数据发送,调用_maybe_resume_protocol,设置protocol_pausedFalse,并设置_drain_waiter为完成
    • _drain_waiter完成,退出await writer.drain

针对读操作,以及其它的IO操作,有兴趣的小伙伴可以深入研究^_^

版权声明
本文为博客HiKariのTechLab原创文章,转载请标明出处,谢谢~~~