接续第一话的内容,事件循环在创建之后,又是如何运行协程任务以及异步IO任务的?
由asyncio.run的代码可知,loop.run_until_complete是运行协程的方法。其定义如下:
1 | # base_events.py |
run_until_complete方法传入的协程会通过tasks.ensure_future方法被封装成一个task实例,从上述的代码可以看到,最终落实到了loop.create_task方法。
1 | # base_events.py |
loop.create_task方法最终会生成一个Task实例。Task实例封装了协程以及其它一系列变量,最终调用loop的call_soon方法,传入了实例的__step函数。call_soon方法传入的函数,会通过events.Handle封装生成一个handle实例,并加入到事件循环的_ready队列中。
__step方法会通过coro.send(None)或是coro.throw(exc)方法启动Task实例内部的协程并获取协程的返回结果,对于一般协程而言coro.send(None)会直接throw一个StopIteration异常,并在异常结果里附上协程返回值。当然,也有其它情况(比如await了一个yield多次的Awaitable实例)可能要多次call_soon协程Task的__step函数,相关的例子可以查看stackoverflow的这篇文章。
在这之后,我们再回到run_until_complete方法,在ensure_future后,便调用loop.run_forever方法,启动事件循环。
1 | # windows_events.py |
ProactorEventLoop在调用run_forever时,首先会用call_soon方法将_loop_self_reading方法加入排期。_loop_self_reading方法会读取proactor中的future,并且将自己加入future的完成时回调,实现不间断地读取future实例。
之后,ProactorEventLoop调用了BaseEventLoop的run_forever方法,在其中会不断执行_run_once方法去一遍遍地迭代事件循环。一轮_run_once会做以下几件事情:
- 清理
_scheduled中被取消的定时任务 select出事件列表并进行处理- 从
_scheduled取出到时的任务,加入到_ready列表中- 由上面的逻辑也可知,
call_soon的任务也会被加入到_ready列表中
- 由上面的逻辑也可知,
- 从
_ready列表中依次取出所有handle,调用_run方法运行
通过这种机制,事件循环就能持续不断地运行任务。
由上述_run_once的定义也可知,在select事件列表一步会出现IOCP的身影,这是因为BaseProactorEventLoop的selector就是proactor,实际传入的就是IOCP实例,因此最终就是调用了IOCP实例的select方法。也是只有在这一步,才会去处理一些IO操作。
所以问题来了,针对IO操作,asyncio是如何进行调度的呢?我们首先看IocpProactor.select的实现:
1 | # windows_events.py |
在IocpProactor._poll中,会调用GetQueuedCompletionStatus去查询完成端口的结果。直到有结果出现,才会根据结果中缓存的address数据pop出缓存的回调并且执行。
我们通过剖析一个IO操作的例子,来观察其中具体的奥秘:
1 | from multiprocessing import Process |
这是一个很简单的echo server的实现,client发送给server信息,server返回信息的reverse。我们以client的写操作writer.write为例,看下IO事件是如何在事件循环里被处理的。
首先,open_connection函数创建了对特定host、port的连接,并返回连接流的reader跟writer。
1 | async def open_connection(host=None, port=None, *, |
针对reader,首先会初始化一个StreamReader实例,再用StreamReaderProtocol对reader做进一步的封装。
针对writer,首先会通过loop的create_connection方法,针对本次连接创建transport实例,相当于一个通信管道的封装。transport实例会与先前创建的StreamReaderProtocol实例进行绑定。然后,再将创建的transport实例和writer绑定。
在ProactorEventLoop中,会这样创建transport实例:
1 | # proactor_events.py |
_ProactorSocketTransport实例会同时对_ProactorReadPipeTransport以及_ProactorBaseWritePipeTransport的初始化,因此会提供对管道读写的功能。其继承链如下:
1 | (<class 'asyncio.proactor_events._ProactorSocketTransport'>, |
之后,当客户端开始写操作,调用writer.write时,实质是进行了以下操作:
1 | # proactor_events.py |
第一次write时,write_future以及buffer为空,因此触发了_loop_writing逻辑。在_loop_writing中,调用了self._loop._proactor.send(self._sock, data)生成了一个写操作的future。而_proactor,也就是在ProactorEventLoop里的IocpProactor实例了。
1 | # windows_events.py |
在send方法中,做了如下操作:
- 通过
CreateIoCompletionPort注册socket到完成端口 - 创建一个
Overlapped实例,通过WSASend发送数据到socket - 创建一个关联
Overlapped实例的future,并且判断Overlapped实例如果不在pending状态就直接执行回调。之后,缓存这个future实例到_cache中。
在先前已经提到,事件循环执行时,IocpProactor实例会调用_poll方法,其中会采用GetQueuedCompletionStatus查询IO操作完成结果。如果发现有IO操作完成,就会从操作中提取ov.address并在_cache中pop出回调然后执行。这样通过IOCP模型加上事件循环(事件循环实质就是IOCP里头的worker),就把writer.write的操作从开始到完成的步骤给串起来了。
之后就是await writer.drain,实质是做了如下操作:
1 | # streams.py |
writer.drain实质上是await了StreamReaderProtocol实例的_drain_helper协程,其中做了一些前置检查,然后依据当前事件循环设置了一个_drain_waiter的future实例,并await。为什么要这么做呢?
首先,我们可以观察得知,在_run_once的逻辑中,如果_ready队列有任务,或者是有_scheduled里头的定时任务,那么之后IocpProactor._poll里头的GetQueuedCompletionStatus就会有timeout,否则GetQueuedCompletionStatus对应的timeout就是INFINITE,会一直阻塞直到有IO事件完成。有兴趣的同学可以创建一个协程任务,里头create_future之后await下,一试便知。
然后,回到_ProactorBaseWritePipeTransport的_loop_writing方法。_write_fut被创建后,会直接添加_loop_writing为自己的完成回调。当IocpProactor实例由GetQueuedCompletionStatus获得一个完成事件之后,会取出来执行ov.getresult()(在send方法的finish_send里头)来获取结果,这个结果就会被放到_write_fut作为其最终的返回结果。此时_write_fut由于完成了,因此会调用自己的回调_loop_writing,但这个时候因为buffer里没有数据了,所以就会走到_maybe_resume_protocol
1 | # transports.py |
在writer.drain中,我们实际上是一直在await这个_drain_waiter。在调用_maybe_resume_protocol之后,实际是走到了StreamReaderProtocol实例的resume_writing方法,在FlowControlMixin类被定义。这个方法执行了两个操作:
- 将
_paused状态置为False_loop_writing中,如果数据没发完,就会另外走到_maybe_pause_protocol,会把这个状态置为true。此时调用await writer.drain,就正好会走到了await _drain_waiter
- 将
_drain_waiter设置完成。这样,await writer.drain就能完成了
客户端的写操作,在事件循环里就是通过如上复杂的方式调度的。总的来说,是如下的步骤:
- 用户调用
writer.write将数据传进transport的写buffer transport的_loop_writing发现buffer有数据,创建一个写future- 通过
CreateIoCompletionPort绑定socket跟完成端口 - 通过
WSASend发送数据 - 设置一个回调用来取发送数据的结果
- 取出来的结果给到写
future - 写
future预先设置_loop_writing为完成回调,得到结果后执行下一轮_loop_writing
- 通过
- 用户调用
await writer.drain- 写
future在创建后,发现写future没有到完成状态,先调用_maybe_pause_protocol设置protocol的_paused为True - 在
writer.drain里判断protocol为_paused,重置_drain_waiter为新的实例并await - 写操作完成,触发写
future的回调_loop_writing。下一轮_loop_writing发现没有数据发送,调用_maybe_resume_protocol,设置protocol的_paused为False,并设置_drain_waiter为完成 _drain_waiter完成,退出await writer.drain
- 写
针对读操作,以及其它的IO操作,有兴趣的小伙伴可以深入研究^_^