前面讲了进程创建与进程通信的内容,接下来讲一下多进程编程最能发挥的地方。对于同时运行多个同质任务来讲,采用multiprocessing.Pool进程池去管理是最方便的。Pool的用法如下:
1 | from multiprocessing import Pool, process |
打印出来的结果,可能是这样子的:
1 | [<SpawnProcess name='SpawnPoolWorker-4' pid=7648 parent=2468 started daemon>, |
我们可以在结果中看到这样的景象:
- 当进入
with Pool代码段时,进程池中的进程已经被预先创建了 - 总共16个任务,最后却只在进程池里单独1个进程中运行(小概率在2个进程中运行)
具体缘由,我们一起来看Pool的代码实现。
1 | class Pool(object): |
一个Pool实例总共包含以下的内容:
self._processes:所有worker子进程实例self._worker_handler:管理worker子进程的线程self._task_handler:任务调度线程self._result_handler:结果收集线程
上述所有的worker子进程跟管理线程在初始化的时候,都会被启动。首先我们来看worker进程的情况:
1 | def _repopulate_pool(self): |
_repopulate_pool是启动所有worker进程的出发点,顺流而下,所有worker进程最终会执行worker函数。worker函数有如下的步骤:
- 执行
initializer(*initargs)。在多进程的场景下,有很多模块到了子进程可能是未初始化的状态,而initializer就提供了一个在子进程中初始化某些模块或者变量的途径。 - 从
inqueue中get一个task的实例,将其unpack - 执行
task中的func,得到结果 - 将结果
put到outqueue
然后我们再看下_task_handler,对应的函数是Pool._handle_tasks
1 |
|
task_handler负责从taskqueue中得到task实例,并put到inqueue中。当所有task实例推送完毕后,像result_handler和所有worker process推送了None实例。从先前worker的代码中可以知晓,当inqueue收到了None,就代表任务已经推送完毕,可以break退出了。而至于为何也给到result_handler一个None实例,我们就看下result_handler的代码,来分析其中具体的机制。
1 |
|
在handle_results的主循环中,会不断地从outqueue里get结果,然后放到cache中。参考worker进程的实现中我们可以知晓,从outqueue里获取的结果即是worker任务执行后的返回值。
当所有task在handle_tasks线程被消费完之后,handle_tasks线程会在outqueue里put一个None值。handle_results线程接收到None值后,直到cache为空或者Pool被终止(TERMINATE)为止,都会继续接收子进程任务执行结果并存到cache里。cache为空或者Pool被终止之后,handle_results线程会清空outqueue,然后退出。
从worker、task、result线程的作用可以看到,inqueue、outqueue、cache是连接用户业务线程和子进程之间的桥梁。inqueue和outqueue的作用在前面的叙述中已经非常清楚,一个用来上传任务,一个用来下发执行结果。因此最终,我们还是要深入研究一下cache的运行机制。
首先我们还是回顾一开始给的例子,test_pool函数:
1 | def test_pool(): |
test_pool函数调用了starmap,starmap方法需要给定一个任务函数以及一组参数,和map不同的是,starmap指代的参数是带星的(*args),因此在调用任务函数时会unpack对应的参数。
starmap之后,涉及到的方法定义如下:
1 | def starmapstar(args): |
各类map/map_async方法,最终都会落实到_map_async方法。在_map_async方法中会做以下几件事情:
- 计算
chunksize,即每batch子任务的数量 - 通过
_get_tasks函数,获取传递子任务batch的generator - 生成
MapResult实例result - 在
taskqueue中放入_guarded_task_generation的任务generator实例
每个子进程最终会调用mapper(task_batch),相当于是list(itertools.starmap(func, task_batch)),也就是单个子进程会执行一个batch的任务,然后返回一组这个batch的执行结果。
从这个角度推论,假设每个任务函数执行要1s,总共16个子任务,chunksize是14,pool的size是2,那么一执行起来,前2秒的话2个子进程都会打印执行结果,然后接下来12秒就只有第1个子进程打印结果了。这是因为,第1个子进程一批被分了14个,第2个子进程一批就被分了剩下2个。如果其它变量不变,pool的size是3,那么打印的效果也和size为2的时候一样,这是因为chunksize太大,前2个子进程已经瓜分了所有子任务(14、2),第3个子进程啥任务都分不到了。
所以chunksize的设定,也是一门学问。实际使用pool的时候要注意这个坑。
接下来注意力转到MapResult实例result上,也是在这个地方会对任务缓存cache做一些操作。首先我们看MapResult的定义:
1 | job_counter = itertools.count() |
针对starmap而言,在worker中,执行了一批子任务之后,会调用put((job, i, result))返回独立的job_id、任务组编号、子任务批次的结果集。我们需要注意到,在result初始化时侯,会通过[None] * length占位所有的结果(self._values),并且在缓存中设置本次job(ApplyResult中self._cache[self._job] = self),然后在handle_results线程中_set结果时,调用了MapResult._set,会根据任务组编号i把对应位置的结果替代。直到所有批次的结果集执行结果返回后,最终会清楚缓存中的这次starmap的job_id,然后调用self._event.set()
starmap函数会阻塞,直到所有结果返回。实现阻塞操作的方式,即是用了threading.Event()。starmap返回的是result.get(),在get的实现里,会调用self._event.wait,也就是阻塞,直到self._event.set。这样,只有所有结果返回,starmap才会返回。如果大家日常开发中,有这种等待直到执行成功的业务需求,不妨尝试用threading.Event(),比sleep轮询的方式优雅的多。
说到底,Pool为多进程编程提供了灵活的任务调度模型。日常如果需要用到进程池做并行操作,用原生的multiprocessing.Pool就是不二选择。
当然,并行任务还有一种选择方案实用ProcessPoolExecutor,比原生的Pool稍微轻量级一点。ProcessPoolExecutor的机理实现,也可以参考这篇文档。