【Python随笔】掌握子进程subprocess模块的使用方法

python开发期间,很多时候我们会需要执行一段cmd终端命令,或者是执行其他程序返回stdout或者文件输出结果。这种时候,我们就需要用到subprocess模块。虽然我们用os.system也可以达到执行命令的需求,但用os.system只是干发一段命令,对于执行命令的程序,我们没有办法跟踪它的内部状态以及执行结果,因此从稳定性的角度来讲不是一个好的选择。因此,本篇文章讲解下subprocess子进程模块的的基础应用,让没用过这个模块或是经常踩坑的同学都避避坑。

subprocess模块的官方文档在这里,最核心的单位是subprocess.Popen类,它描述了一个正在运行中的进程。subprocess最基础的用法是subprocess.run,我们入参一段cmd终端命令,run方法内部就会启动一个Popen对象执行这个命令,等待命令执行结束后,返回这个命令执行的退出码retcode,标准输出流内容stdout以及标准错误流内容stderr。我们可以从源码中详细捋一下subprocess.run的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def run(*popenargs,
input=None, capture_output=False, timeout=None, check=False, **kwargs):
# 忽略上面参数处理部分
with Popen(*popenargs, **kwargs) as process: # 新起一个Popen的context
try:
# 通过communicate方法拉取最终stdout、stderr的所有数据
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired as exc:
# 超时处理,向os申请杀进程,等待进程结束
process.kill()
if _mswindows:
exc.stdout, exc.stderr = process.communicate() # communicate也可用来模拟键盘输入
else:
process.wait()
raise
except: # Including KeyboardInterrupt, communicate handled that.
# 向os申请杀进程
process.kill()
# We don't call process.wait() as .__exit__ does that for us.
raise
retcode = process.poll() # 获取exitcode
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)

subprocess.run是一个阻塞方法,执行了这个接口后,需要等待run入参的命令执行完才能返回。而有些时候,我们需要单独起一个(进程执行)cmd命令,然后周期性每几秒钟去检查命令执行的状态,检查完之后我们还可以在主进程干别的事情,也就是搞一个独立出来的进程。这种情况下,subprocess.run就无法满足,必须直接开subprocess.Popen

一个基本的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import subprocess
import platform
import os
import signal


def _decode_bytes(_bytes):
encoding = 'gbk'
return _bytes.decode(encoding)


def _decode_stream(stream):
"""windows下解码stdout/stderr的数据"""
if not stream:
return ''
return _decode_bytes(stream.read())


# set params
args = ['ping', '127.0.0.1'] # windows这里不用timeout,因为不支持stdin的重定向,用ping大概3~4s的时间总共
working_directory = '.' # 支持设置命令的工作目录
wait_timeout = 1 # 命令每周期等待时间
cnt, maxcnt = 0, 4 # 等待次数

# run process
print(f'platform system: {platform.system()}')
p = subprocess.Popen(args,
cwd=working_directory,
# 设置subprocess.PIPE,这样执行完后可以从p.stdout/p.stderr获取输出数据
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
print(f'process args: {args}')
print(f'process pid: {p.pid}')
while cnt < maxcnt:
try:
p.wait(timeout=wait_timeout)
except Exception as e:
print(f'attempt {cnt} -> wait err: {e}')
cnt += 1
finally:
if p.returncode is not None: # 看是否有退出码,来判断进程是否执行结束
break

# check retcode
if p.returncode is None:
print('[ERROR] retcode is None, maybe timeout, try kill process...')
if platform.system() == 'Windows': # windows下,强杀进程用taskkill,因为没有SIGKILL
kill_proc_ret = subprocess.run(['taskkill', '/f', '/pid', str(p.pid)], capture_output=True)
print(f'[KILLPROC] {_decode_bytes(kill_proc_ret.stdout)}')
else: # 其他情况下可以发送SIGKILL
os.kill(p.pid, signal.SIGKILL)
else: # 打印返回数据
retcode, stdout, stderr = p.returncode, _decode_stream(p.stdout), _decode_stream(p.stderr)
print(f'[OK] retcode: {retcode}\n\tstdout: {stdout}\n\tstderr: {stderr}')

这段程序模拟了周期性等待子进程执行完成的场景,执行完成后拉取stdoutstderr打印,执行超时就强杀进程。基本上关键的地方都有注释,如果有其他类似的场景,可以直接照搬代码。

最后我们也能看到,subprocess本质也是多进程,但和multiprocessing有所不同,multiprocessing是多个python进程,着重于管理多个python进程的运行时环境以及之间的通信;而subprocess则是侧重于去跟踪python程序启动的任意类型进程的状态。两者也有共同点,就是主进程都会持有子进程的handle,只要没调用类似subprocess.run这种阻塞获取子进程状态/结果的接口,在起了新进程后,主进程内都能够随时随地去获取子进程的状态信息。

版权声明
本文为博客HiKariのTechLab原创文章,转载请标明出处,谢谢~~~