第一话详细讲解了Process新进程是如何被创建的,接下来就来讲一下进程之间有什么通信的方法。
要在multiprocessing
中实现进程间通信,最直接的方法是采用Pipe
或者Queue
。其用法如下:
1 | from multiprocessing import Process, Pipe, Queue |
形式上非常简单。Pipe
创建了一对reader
跟writer
,将reader
传入子进程,主进程在writer
写入数据,子进程即能通过reader
读取到;Queue
则更为方便,其实例能够直接传入子进程。主进程调用put
即可写入数据,子进程调用get
即可获取数据。
首先我们先看Pipe
在windows下的实现:
1 | def Pipe(duplex=True): |
h1
与h2
互为服务端/客户端的关系。h1
通过CreateNamedPipe
创建,之后h2
通过CreateFile
连接到h1
的NamedPipe
上。之后用PipeConnection
封装h1
和h2
两端,返回reader
跟writer
。当然,两个管道入口是否双工也是可选项。
经过PipeConnection
的封装,管道即拥有了发送或接收python
对象的方法。python
对象的序列化/反序列化会用内置库pickle
来支持。被pickle
的对象会保留特定的信息,比如某个模块def
的函数,在pickle
时除了对函数本身进行序列化外,也会封存函数所属模块的信息。在unpickle
时,如果找不到对应模块的信息,就会报错。因此多进程之间通信python对象时,需要留意序列化/反序列化后对应对象的取值/模块环境等情况。pickle的官方文档给到了我们足够的信息去了解这些机制。
接下来我们转向Queue
的实现。相对于Pipe
,Queue
是对其的封装,并提供了更多的功能。这里我们完整列举一下Queue
的关键代码::
1 | class Queue(object): |
在Queue
的__init__
函数中,构造了这么些对象:
self._maxsize
:Queue
队列的最大长度self._reader
,self._writer
:pipe
的两端self._rlock
:self._reader
的读锁self._wlock
:self._writer
的写锁self._sem
:以self._maxsize
为基准的信号量,用来记录队列填满情况self._notempty
:队列非空的条件变量。如果队列为空,需要wait
self._buffer
:存储python
对象的队列deque
self._thread
:消费线程,用于消费self._buffer
中的内容并发送到pipe
中self._jointhread
:关闭消费线程的finalizer
self._close
:关闭pipe
的finalizer
其中还要留意一点是,Queue
实例本身是要传给Process
实例,并在另一个进程被反序列化一次。因此为了保证序列化/反序列化之后部分状态得到保留(比如pipe
),Queue
的类定义中采用了__getstate__
和__setstate__
两个钩子去实现实例内部状态的存储与读取。这个特性在pickle
的官方文档内有详细的说明。
大致了解了这些对象的含义后,接下来,就详细把Queue
的工作流程列一下:
- 用户调用
put
,信号量self._sem
增加一个占位,之后发现消费线程未启动,通过self._start_thread
启动消费线程Queue._feed
- 消费线程进入循环,发现队列
self._buffer
为空,条件变量self._notempty
进入wait
self._start_thread
之后,将python
对象推入self._buffer
,并notify
条件变量self._notempty
- 这一步很有概率发生在上一步之前,不过无所谓了
_feed
退出wait
状态,pop
出python
对象,然后将其pickle
,最后调用self._writer._send_bytes
发送序列化之后的数据到pipe
内- 这里注意,如果
python
对象是object()
,会触发self._writer.close
。因此实际业务代码中最好不要出现发送object()
的情况
- 这里注意,如果
- 用户调用
get
,通过self._reader
读取pipe
中的数据,并且让信号量self._sem
释放一个占位。之后对数据进行反序列化,得到发送过来的对象 - 用户调用
close
,首先关闭self._reader
,然后在self._writer
中发送一个object()
。_feed
会一直消费队列,直到检测到最后的object()
,终于触发self._writer
的关闭。这样pipe
的两端就都关闭,并且buffer
里也没有任何其它数据了。 - 用户调用
join_thread
,触发self._thread.join()
,关闭消费线程
在multiprocessing
当中,Queue
还有两种变体,分别为SimpleQueue
和JoinableQueue
。SimpleQueue
没有提供blocking
或timeout
的功能,只是简单创建一对pipe
交换序列化的数据。JoinableQueue
则是在Queue
的基础上增加了join
的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks
以及一个条件变量_cond
。JoinableQueue
在调用join
时,如果_unfinished_tasks
信号量不为0会进入_cond.wait
,这是因为每次put
的时候_unfinished_tasks
信号量会release
一次,只有用户每次get
之后显式调用JoinableQueue.task_done
才能acquire
一次信号量,最终使得_unfinished_tasks
信号量归零并notify_all
所有join
的调用。
最后,进程间通信的方法说到底,除了Pipe
跟Queue
外,采用Manager
共享内存或者直接用socket
网络通信都是ok的方式。当然,如果是在单节点上面,并且是一个内聚的python
项目的话,Queue
是不二选择。