接续第一话的内容,事件循环在创建之后,又是如何运行协程任务以及异步IO任务的?
由asyncio.run
的代码可知,loop.run_until_complete
是运行协程的方法。其定义如下:
1 | # base_events.py |
run_until_complete
方法传入的协程会通过tasks.ensure_future
方法被封装成一个task
实例,从上述的代码可以看到,最终落实到了loop.create_task
方法。
1 | # base_events.py |
loop.create_task
方法最终会生成一个Task
实例。Task
实例封装了协程以及其它一系列变量,最终调用loop
的call_soon
方法,传入了实例的__step
函数。call_soon
方法传入的函数,会通过events.Handle
封装生成一个handle
实例,并加入到事件循环的_ready
队列中。
__step
方法会通过coro.send(None)
或是coro.throw(exc)
方法启动Task
实例内部的协程并获取协程的返回结果,对于一般协程而言coro.send(None)
会直接throw
一个StopIteration
异常,并在异常结果里附上协程返回值。当然,也有其它情况(比如await
了一个yield
多次的Awaitable
实例)可能要多次call_soon
协程Task
的__step
函数,相关的例子可以查看stackoverflow的这篇文章。
在这之后,我们再回到run_until_complete
方法,在ensure_future
后,便调用loop.run_forever
方法,启动事件循环。
1 | # windows_events.py |
ProactorEventLoop
在调用run_forever
时,首先会用call_soon
方法将_loop_self_reading
方法加入排期。_loop_self_reading
方法会读取proactor
中的future
,并且将自己加入future
的完成时回调,实现不间断地读取future
实例。
之后,ProactorEventLoop
调用了BaseEventLoop
的run_forever
方法,在其中会不断执行_run_once
方法去一遍遍地迭代事件循环。一轮_run_once
会做以下几件事情:
- 清理
_scheduled
中被取消的定时任务 select
出事件列表并进行处理- 从
_scheduled
取出到时的任务,加入到_ready
列表中- 由上面的逻辑也可知,
call_soon
的任务也会被加入到_ready
列表中
- 由上面的逻辑也可知,
- 从
_ready
列表中依次取出所有handle
,调用_run
方法运行
通过这种机制,事件循环就能持续不断地运行任务。
由上述_run_once
的定义也可知,在select
事件列表一步会出现IOCP
的身影,这是因为BaseProactorEventLoop
的selector
就是proactor
,实际传入的就是IOCP
实例,因此最终就是调用了IOCP
实例的select
方法。也是只有在这一步,才会去处理一些IO操作。
所以问题来了,针对IO操作,asyncio
是如何进行调度的呢?我们首先看IocpProactor.select
的实现:
1 | # windows_events.py |
在IocpProactor._poll
中,会调用GetQueuedCompletionStatus
去查询完成端口的结果。直到有结果出现,才会根据结果中缓存的address
数据pop
出缓存的回调并且执行。
我们通过剖析一个IO操作的例子,来观察其中具体的奥秘:
1 | from multiprocessing import Process |
这是一个很简单的echo server
的实现,client
发送给server
信息,server
返回信息的reverse
。我们以client
的写操作writer.write
为例,看下IO事件是如何在事件循环里被处理的。
首先,open_connection
函数创建了对特定host
、port
的连接,并返回连接流的reader
跟writer
。
1 | async def open_connection(host=None, port=None, *, |
针对reader
,首先会初始化一个StreamReader
实例,再用StreamReaderProtocol
对reader
做进一步的封装。
针对writer
,首先会通过loop
的create_connection
方法,针对本次连接创建transport
实例,相当于一个通信管道的封装。transport
实例会与先前创建的StreamReaderProtocol
实例进行绑定。然后,再将创建的transport
实例和writer
绑定。
在ProactorEventLoop
中,会这样创建transport
实例:
1 | # proactor_events.py |
_ProactorSocketTransport
实例会同时对_ProactorReadPipeTransport
以及_ProactorBaseWritePipeTransport
的初始化,因此会提供对管道读写的功能。其继承链如下:
1 | (<class 'asyncio.proactor_events._ProactorSocketTransport'>, |
之后,当客户端开始写操作,调用writer.write
时,实质是进行了以下操作:
1 | # proactor_events.py |
第一次write
时,write_future
以及buffer
为空,因此触发了_loop_writing
逻辑。在_loop_writing
中,调用了self._loop._proactor.send(self._sock, data)
生成了一个写操作的future
。而_proactor
,也就是在ProactorEventLoop
里的IocpProactor
实例了。
1 | # windows_events.py |
在send
方法中,做了如下操作:
- 通过
CreateIoCompletionPort
注册socket
到完成端口 - 创建一个
Overlapped
实例,通过WSASend
发送数据到socket
- 创建一个关联
Overlapped
实例的future
,并且判断Overlapped
实例如果不在pending
状态就直接执行回调。之后,缓存这个future
实例到_cache
中。
在先前已经提到,事件循环执行时,IocpProactor
实例会调用_poll
方法,其中会采用GetQueuedCompletionStatus
查询IO操作完成结果。如果发现有IO操作完成,就会从操作中提取ov.address
并在_cache
中pop
出回调然后执行。这样通过IOCP
模型加上事件循环(事件循环实质就是IOCP
里头的worker
),就把writer.write
的操作从开始到完成的步骤给串起来了。
之后就是await writer.drain
,实质是做了如下操作:
1 | # streams.py |
writer.drain
实质上是await
了StreamReaderProtocol
实例的_drain_helper
协程,其中做了一些前置检查,然后依据当前事件循环设置了一个_drain_waiter
的future
实例,并await
。为什么要这么做呢?
首先,我们可以观察得知,在_run_once
的逻辑中,如果_ready
队列有任务,或者是有_scheduled
里头的定时任务,那么之后IocpProactor._poll
里头的GetQueuedCompletionStatus
就会有timeout
,否则GetQueuedCompletionStatus
对应的timeout
就是INFINITE
,会一直阻塞直到有IO事件完成。有兴趣的同学可以创建一个协程任务,里头create_future
之后await
下,一试便知。
然后,回到_ProactorBaseWritePipeTransport
的_loop_writing
方法。_write_fut
被创建后,会直接添加_loop_writing
为自己的完成回调。当IocpProactor
实例由GetQueuedCompletionStatus
获得一个完成事件之后,会取出来执行ov.getresult()
(在send
方法的finish_send
里头)来获取结果,这个结果就会被放到_write_fut
作为其最终的返回结果。此时_write_fut
由于完成了,因此会调用自己的回调_loop_writing
,但这个时候因为buffer
里没有数据了,所以就会走到_maybe_resume_protocol
1 | # transports.py |
在writer.drain
中,我们实际上是一直在await
这个_drain_waiter
。在调用_maybe_resume_protocol
之后,实际是走到了StreamReaderProtocol
实例的resume_writing
方法,在FlowControlMixin
类被定义。这个方法执行了两个操作:
- 将
_paused
状态置为False
_loop_writing
中,如果数据没发完,就会另外走到_maybe_pause_protocol
,会把这个状态置为true
。此时调用await writer.drain
,就正好会走到了await _drain_waiter
- 将
_drain_waiter
设置完成。这样,await writer.drain
就能完成了
客户端的写操作,在事件循环里就是通过如上复杂的方式调度的。总的来说,是如下的步骤:
- 用户调用
writer.write
将数据传进transport
的写buffer
transport
的_loop_writing
发现buffer
有数据,创建一个写future
- 通过
CreateIoCompletionPort
绑定socket
跟完成端口 - 通过
WSASend
发送数据 - 设置一个回调用来取发送数据的结果
- 取出来的结果给到写
future
- 写
future
预先设置_loop_writing
为完成回调,得到结果后执行下一轮_loop_writing
- 通过
- 用户调用
await writer.drain
- 写
future
在创建后,发现写future
没有到完成状态,先调用_maybe_pause_protocol
设置protocol
的_paused
为True
- 在
writer.drain
里判断protocol
为_paused
,重置_drain_waiter
为新的实例并await
- 写操作完成,触发写
future
的回调_loop_writing
。下一轮_loop_writing
发现没有数据发送,调用_maybe_resume_protocol
,设置protocol
的_paused
为False
,并设置_drain_waiter
为完成 _drain_waiter
完成,退出await writer.drain
- 写
针对读操作,以及其它的IO操作,有兴趣的小伙伴可以深入研究^_^