第一话详细讲解了Process新进程是如何被创建的,接下来就来讲一下进程之间有什么通信的方法。
要在multiprocessing中实现进程间通信,最直接的方法是采用Pipe或者Queue。其用法如下:
1 | from multiprocessing import Process, Pipe, Queue |
形式上非常简单。Pipe创建了一对reader跟writer,将reader传入子进程,主进程在writer写入数据,子进程即能通过reader读取到;Queue则更为方便,其实例能够直接传入子进程。主进程调用put即可写入数据,子进程调用get即可获取数据。
首先我们先看Pipe在windows下的实现:
1 | def Pipe(duplex=True): |
h1与h2互为服务端/客户端的关系。h1通过CreateNamedPipe创建,之后h2通过CreateFile连接到h1的NamedPipe上。之后用PipeConnection封装h1和h2两端,返回reader跟writer。当然,两个管道入口是否双工也是可选项。
经过PipeConnection的封装,管道即拥有了发送或接收python对象的方法。python对象的序列化/反序列化会用内置库pickle来支持。被pickle的对象会保留特定的信息,比如某个模块def的函数,在pickle时除了对函数本身进行序列化外,也会封存函数所属模块的信息。在unpickle时,如果找不到对应模块的信息,就会报错。因此多进程之间通信python对象时,需要留意序列化/反序列化后对应对象的取值/模块环境等情况。pickle的官方文档给到了我们足够的信息去了解这些机制。
接下来我们转向Queue的实现。相对于Pipe,Queue是对其的封装,并提供了更多的功能。这里我们完整列举一下Queue的关键代码::
1 | class Queue(object): |
在Queue的__init__函数中,构造了这么些对象:
self._maxsize:Queue队列的最大长度self._reader,self._writer:pipe的两端self._rlock:self._reader的读锁self._wlock:self._writer的写锁self._sem:以self._maxsize为基准的信号量,用来记录队列填满情况self._notempty:队列非空的条件变量。如果队列为空,需要waitself._buffer:存储python对象的队列dequeself._thread:消费线程,用于消费self._buffer中的内容并发送到pipe中self._jointhread:关闭消费线程的finalizerself._close:关闭pipe的finalizer
其中还要留意一点是,Queue实例本身是要传给Process实例,并在另一个进程被反序列化一次。因此为了保证序列化/反序列化之后部分状态得到保留(比如pipe),Queue的类定义中采用了__getstate__和__setstate__两个钩子去实现实例内部状态的存储与读取。这个特性在pickle的官方文档内有详细的说明。
大致了解了这些对象的含义后,接下来,就详细把Queue的工作流程列一下:
- 用户调用
put,信号量self._sem增加一个占位,之后发现消费线程未启动,通过self._start_thread启动消费线程Queue._feed - 消费线程进入循环,发现队列
self._buffer为空,条件变量self._notempty进入wait self._start_thread之后,将python对象推入self._buffer,并notify条件变量self._notempty- 这一步很有概率发生在上一步之前,不过无所谓了
_feed退出wait状态,pop出python对象,然后将其pickle,最后调用self._writer._send_bytes发送序列化之后的数据到pipe内- 这里注意,如果
python对象是object(),会触发self._writer.close。因此实际业务代码中最好不要出现发送object()的情况
- 这里注意,如果
- 用户调用
get,通过self._reader读取pipe中的数据,并且让信号量self._sem释放一个占位。之后对数据进行反序列化,得到发送过来的对象 - 用户调用
close,首先关闭self._reader,然后在self._writer中发送一个object()。_feed会一直消费队列,直到检测到最后的object(),终于触发self._writer的关闭。这样pipe的两端就都关闭,并且buffer里也没有任何其它数据了。 - 用户调用
join_thread,触发self._thread.join(),关闭消费线程
在multiprocessing当中,Queue还有两种变体,分别为SimpleQueue和JoinableQueue。SimpleQueue没有提供blocking或timeout的功能,只是简单创建一对pipe交换序列化的数据。JoinableQueue则是在Queue的基础上增加了join的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks以及一个条件变量_cond。JoinableQueue在调用join时,如果_unfinished_tasks信号量不为0会进入_cond.wait,这是因为每次put的时候_unfinished_tasks信号量会release一次,只有用户每次get之后显式调用JoinableQueue.task_done才能acquire一次信号量,最终使得_unfinished_tasks信号量归零并notify_all所有join的调用。
最后,进程间通信的方法说到底,除了Pipe跟Queue外,采用Manager共享内存或者直接用socket网络通信都是ok的方式。当然,如果是在单节点上面,并且是一个内聚的python项目的话,Queue是不二选择。