【Hard Python】【第一章-多进程】2、Pipe和Queue,进程间通信

第一话详细讲解了Process新进程是如何被创建的,接下来就来讲一下进程之间有什么通信的方法。

要在multiprocessing中实现进程间通信,最直接的方法是采用Pipe或者Queue。其用法如下:

from multiprocessing import Process, Pipe, Queue
import time
from mp_module import log, seg


def _test_queue(q):
    """Child-process body: consume messages from *q* until 'quit' arrives.

    Each non-sentinel message is logged; the 'quit' sentinel terminates
    the loop, after which a final end-of-process message is logged.
    """
    # iter(callable, sentinel) stops when q.get() returns a value equal
    # to 'quit' -- the same == comparison as the original while/break loop.
    for msg in iter(q.get, 'quit'):
        log(f'recv: {msg}')
    log('child process end~')


def test_queue():
    """Demonstrate parent->child communication via multiprocessing.Queue.

    Spawns a child running ``_test_queue``, sends two payload messages
    followed by the 'quit' sentinel, then verifies the child exited.
    """
    seg('test queue start')
    q = Queue()
    p = Process(target=_test_queue, args=(q,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        q.put(cmd)
    # join() waits deterministically for the child to exit.  The original
    # fixed time.sleep(1) could race with a slow child and make the
    # liveness assertion fail spuriously.
    p.join(timeout=5)
    assert not p.is_alive()
    seg('test queue end')


def _test_pipe(r):
    """Child-process body: read from pipe end *r* until 'quit' arrives.

    Logs every received payload message; the 'quit' sentinel ends the
    loop, after which a final end-of-process message is logged.
    """
    # Equivalent to the original while/recv/break loop: iterate over
    # r.recv() results until one compares equal to 'quit'.
    for msg in iter(r.recv, 'quit'):
        log(f'recv: {msg}')
    log('child process end~')


def test_pipe():
    """Demonstrate parent->child communication via multiprocessing.Pipe.

    Creates a (default duplex) pipe, hands one end to a child running
    ``_test_pipe``, sends two payload messages plus the 'quit' sentinel
    through the other end, then verifies the child exited.
    """
    seg('test pipe start')
    r, w = Pipe()
    p = Process(target=_test_pipe, args=(r,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        w.send(cmd)
    # Wait for the child deterministically instead of the original
    # time.sleep(1), which could race with a slow child and fail the
    # assertion below.
    p.join(timeout=5)
    assert not p.is_alive()
    seg('test pipe end')

形式上非常简单。Pipe创建了一对reader和writer,将reader传入子进程,主进程在writer写入数据,子进程即能通过reader读取到;Queue则更为方便,其实例能够直接传入子进程。主进程调用put即可写入数据,子进程调用get即可获取数据。

首先我们先看Pipe在windows下的实现:

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    # Windows implementation, built on a named pipe: h1 is the server end
    # created with CreateNamedPipe; h2 is the client end opened on the
    # same address with CreateFile.
    address = arbitrary_address('AF_PIPE')
    if duplex:
        # Both ends readable and writable; both buffers allocated.
        openmode = _winapi.PIPE_ACCESS_DUPLEX
        access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        # Simplex: h1 only receives (inbound), h2 only sends.
        openmode = _winapi.PIPE_ACCESS_INBOUND
        access = _winapi.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    h1 = _winapi.CreateNamedPipe(
        address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
        _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
        _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
        _winapi.PIPE_WAIT,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
        # default security descriptor: the handle cannot be inherited
        _winapi.NULL
    )
    h2 = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
    )
    # Switch the client end into message-read mode to match the
    # PIPE_READMODE_MESSAGE setting of the server end.
    _winapi.SetNamedPipeHandleState(
        h2, _winapi.PIPE_READMODE_MESSAGE, None, None
    )

    # Complete the server side of the connection via overlapped I/O and
    # block until the client connection (h2) is accepted.
    overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
    _, err = overlapped.GetOverlappedResult(True)
    assert err == 0

    # Wrap the raw handles; in the simplex case h1 is read-only and h2
    # write-only, in the duplex case both ends do both.
    c1 = PipeConnection(h1, writable=duplex)
    c2 = PipeConnection(h2, readable=duplex)

    return c1, c2

h1和h2互为服务端/客户端的关系。h1通过CreateNamedPipe创建,之后h2通过CreateFile连接到h1的NamedPipe上。之后用PipeConnection封装h1和h2两端,返回reader和writer。当然,两个管道入口是否双工也是可选项。

经过PipeConnection的封装,管道即拥有了发送或接收python对象的方法。python对象的序列化/反序列化会用内置库pickle来支持。被pickle的对象会保留特定的信息,比如某个模块def的函数,在pickle时除了对函数本身进行序列化外,也会封存函数所属模块的信息。在unpickle时,如果找不到对应模块的信息,就会报错。因此多进程之间通信python对象时,需要留意序列化/反序列化后对应对象的取值/模块环境等情况。pickle的官方文档给到了我们足够的信息去了解这些机制。

接下来我们转向Queue的实现。相对于Pipe,Queue是对其的封装,并提供了更多的功能。这里我们完整列举一下Queue的关键代码:

class Queue(object):
    # Multi-process FIFO queue built on a simplex Pipe: put() appends to an
    # in-process buffer which a lazily-started background "feeder" thread
    # pickles and writes into the pipe; get() reads bytes back from the
    # reader end and unpickles them.  All synchronization primitives come
    # from the context `ctx` so they match the chosen start method.

    def __init__(self, maxsize=0, *, ctx):
        # maxsize <= 0 means "effectively unbounded": fall back to the
        # largest value the platform semaphore supports.
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        # Simplex pipe: _reader receives, _writer sends.
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()  # serializes concurrent readers of the pipe
        self._opid = os.getpid()  # pid of the process that created the queue
        if sys.platform == 'win32':
            # No write lock on Windows -- presumably pipe writes need no
            # extra serialization there (see _feed); not verifiable from
            # this excerpt alone.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()  # serializes concurrent pipe writers
        # Counts free slots: put() acquires one, get() releases one.
        self._sem = ctx.BoundedSemaphore(maxsize)
        # When True, the feeder thread swallows EPIPE errors (see _feed).
        self._ignore_epipe = False
        self._reset()
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        # Only picklable while being passed to a child being spawned;
        # assert_spawning raises otherwise.
        context.assert_spawning(self)
        # Thread-related attributes (_notempty, _buffer, _thread, ...) are
        # deliberately excluded: they are rebuilt by _reset() on unpickle.
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        # Recreate the per-process state that was not pickled.
        self._reset()

    def _reset(self, after_fork=False):
        # (Re)initialize per-process, unpicklable state.
        if after_fork:
            # The condition's internal lock may have been held by a thread
            # that no longer exists after fork; reinitialize it in place.
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()  # items waiting for the feeder
        self._thread = None                 # feeder thread, started lazily
        self._jointhread = None             # finalizer joining the feeder
        self._joincancelled = False
        self._closed = False
        self._close = None                  # finalizer shutting the writer
        # Bind pipe methods once; used by put/get and the feeder thread.
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        # Enqueue obj.  Raises Full when no slot can be acquired within
        # the block/timeout constraints.
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            # The feeder thread is started on the first put().
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        # Dequeue and unpickle one item.  Raises Empty on timeout or when
        # non-blocking and no data is available.
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            # Fast path: block indefinitely on the read lock and the pipe.
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()  # one slot freed
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Spend only the remaining time waiting for data.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # Deserialize outside of the read lock.
        return _ForkingPickler.loads(res)

    def close(self):
        # Mark closed and shut the reader end; then, if the feeder thread
        # was ever started, run the _finalize_close finalizer, which asks
        # the feeder to flush remaining items and close the writer end.
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def _start_thread(self):
        # Launch the daemon feeder thread that drains _buffer into the pipe.
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True
        self._thread.start()
        if not self._joincancelled:
            # On interpreter exit, join the feeder thread (weakref keeps
            # the finalizer from pinning the thread object alive).
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )
        # Finalizer that pushes the sentinel into the buffer so the feeder
        # closes the writer and exits (runs from close() or at exit).
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        # Feeder-thread main loop: wait until the buffer is non-empty,
        # then drain it, pickling each item and writing it to the pipe.
        # Exits when the module-level sentinel is seen (identity check).
        debug('starting thread to feed data to pipe')
        # Bind bound-methods to locals -- cheaper lookups in the hot loop.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None  # no write lock on Windows (see __init__)

        while 1:
            try:
                # Sleep until put() signals that the buffer has data.
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; IndexError from popleft() means
                    # "empty", which sends us back to waiting above.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()  # close the writer end of the pipe
                            return
                        # Serialize before (optionally) taking the lock.
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                # Optionally ignore broken-pipe errors during shutdown.
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # The item never reached the pipe: give its slot back,
                    # then report the failure via the error callback.
                    queue_sem.release()
                    onerror(e, obj)

在Queue的__init__函数中,构造了这么些对象:

  • self._maxsize:Queue队列的最大长度
  • self._reader, self._writer:pipe的两端
  • self._rlock:self._reader的读锁
  • self._wlock:self._writer的写锁
  • self._sem:以self._maxsize为基准的信号量,用来记录队列填满情况
  • self._notempty:队列非空的条件变量。如果队列为空,需要wait
  • self._buffer:存储python对象的队列deque
  • self._thread:消费线程,用于消费self._buffer中的内容并发送到pipe
  • self._jointhread:关闭消费线程的finalizer
  • self._close:关闭pipefinalizer

其中还要留意一点是,Queue实例本身是要传给Process实例,并在另一个进程被反序列化一次。因此为了保证序列化/反序列化之后部分状态得到保留(比如pipe),Queue的类定义中采用了__getstate____setstate__两个钩子去实现实例内部状态的存储与读取。这个特性在pickle的官方文档内有详细的说明。

大致了解了这些对象的含义后,接下来,就详细把Queue的工作流程列一下:

  • 用户调用put,信号量self._sem增加一个占位,之后发现消费线程未启动,通过self._start_thread启动消费线程Queue._feed
  • 消费线程进入循环,发现队列self._buffer为空,条件变量self._notempty进入wait
  • self._start_thread之后,将python对象推入self._buffer,并notify条件变量self._notempty
    • 这一步很有概率发生在上一步之前,不过无所谓了
  • _feed退出wait状态,poppython对象,然后将其pickle,最后调用self._writer._send_bytes发送序列化之后的数据到pipe
    • 这里注意,_feed退出的条件是检测到模块级的哨兵对象_sentinel,且用的是is身份比较而不是相等比较。因此用户自己发送的object()实例并不会误触发self._writer.close,只有Queue内部close流程推入的那个_sentinel才会
  • 用户调用get,通过self._reader读取pipe中的数据,并且让信号量self._sem释放一个占位。之后对数据进行反序列化,得到发送过来的对象
  • 用户调用close,首先关闭self._reader,然后通过finalizer向buffer推入哨兵对象_sentinel。_feed会一直消费队列,直到检测到最后的_sentinel,终于触发self._writer的关闭。这样pipe的两端就都关闭,并且buffer里也没有任何其它数据了。
  • 用户调用join_thread,触发self._thread.join(),关闭消费线程

multiprocessing当中,Queue还有两种变体,分别为SimpleQueue和JoinableQueue。SimpleQueue没有提供blocking和timeout的功能,只是简单创建一对pipe交换序列化的数据。JoinableQueue则是在Queue的基础上增加了join的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks以及一个条件变量_cond。JoinableQueue在调用join时,如果_unfinished_tasks信号量不为0会进入_cond.wait,这是因为每次put的时候_unfinished_tasks信号量会release一次,只有用户每次get之后显式调用JoinableQueue.task_done才能acquire一次信号量,最终使得_unfinished_tasks信号量归零并notify_all所有join的调用。

最后,进程间通信的方法说到底,除了PipeQueue外,采用Manager共享内存或者直接用socket网络通信都是ok的方式。当然,如果是在单节点上面,并且是一个内聚的python项目的话,Queue是不二选择。

版权声明
本文为博客HiKariのTechLab原创文章,转载请标明出处,谢谢~~~