Python-subprocess执行命令并将输出劫持实现实时记录到日志
前言
在写我自己的练手项目的时候,需要写一系列Python脚本来帮助我进行运维/环境配置,我希望这些脚本能够有比较好的日志记录。
这一篇博客中,我实现了日志同时向控制台和日志中进行输出,并且二者的日志等级、日志格式不相同。
这一篇博客中,我通过自定义日志的格式描述符,实现了定长的日志等级。
解决完日志问题后,我需要解决这些脚本的另一个问题:执行命令。
我写这一系列Python脚本就是因为我不擅长写shell脚本,而且Python脚本在我的开发环境(Windows)和运行环境(Linux)都可以运行。其需要担当的很重要的一个功能就是执行一些命令,如kubectl apply
、mvn test
等。
我希望执行这些命令的时候,能够实时地将命令的输出,记录到日志中。
本博客,我实现了执行命令,并将命令执行的输出进行劫持,实时记录到日志中。
参考
subprocess
关于多进程协同,Python主要有三个手段:
- os.system()函数;
- multiprocessing模块;
- subprocess模块;
其中multiprocessing
主要用于解决计算密集型计算中,Python由于GIL(全局解释器锁)的存在,多线程无法提升性能,而需要创建多个进程来加快计算的场景。
而os.system()
则是阻塞式的,而且似乎无法将其创建的子进程的输出进行劫持。
subprocess
模块几乎是唯一的选择。
Popen
subprocess
虽然提供了简易使用的subprocess.run()
方法,但是这个方法无法做到实时输出,也就是命令执行完成之后才能一次性获取命令的输出,即使传入了stdout=subprocess.PIPE
要求其创建一个管道,仍旧是阻塞式的读取。stdout
也可以指定为一个流,因此我们可以将其直接重定向到本程序的标准输出,但是logger并不是一个流。
所以我们只能使用更为底层的subprocess.Popen对象来进行操作。
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
通过这样的参数,创建子进程运行cmd
指定的命令,同时创建与子进程关联的process
对象,此时process.stdout
和process.stderr
都是可以调用readline
方法来一行行读取字符串的管道。
实现
通过subprocess.Popen()
创建Popen
对象后,子进程开始运行,我们可以通过管道来读取子进程的输出。因为有两个管道都需要读取,并且它们没有提供检查管道是否读取的方法,只能阻塞地尝试进行读取,所以我们需要创建读取线程来分别读取这两个管道。
实现如下:
import logging
import subprocess
import shlex
import threading
class CommandExecutionException(Exception):
def __init__(self, command: str, exit_code: int) -> None:
super().__init__(f"command executed fail with exit-code={exit_code}: {command}")
_logger = logging.getLogger(__name__)
class TextReadLineThread(threading.Thread):
def __init__(self, readline, callback, *args, **kargs) -> None:
super().__init__(*args, **kargs)
self.readline = readline
self.callback = callback
def run(self):
for line in iter(self.readline, ""):
if len(line) == 0:
break
self.callback(line)
def cmd_exec(command: str, ensure_success: bool=True) -> int:
_logger.info("executing command: {}".format(command))
cmd = shlex.split(command)
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_logger.debug("started command")
def log_warp(func):
def _wrapper(line: str):
return func(" " + line.strip())
return _wrapper
read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
read_stdout.start()
read_stderr.start()
read_stdout.join()
_logger.debug("stdout reading finish")
read_stderr.join()
_logger.debug("stderr reading finish")
ret = process.wait()
_logger.debug("process finish")
_logger.info("executed command with exit-code={}".format(ret))
if ensure_success and ret != 0:
raise CommandExecutionException(command=command, exit_code=ret)
return ret
if __name__ == '__main__':
_logger_trans = {
"DEBUG": "DBG",
"INFO": "INF",
"WARNING": "WAR",
"CRITICAL": "ERR"
}
_old_factory = logging.getLogRecordFactory()
def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
record.shortlevelname = _logger_trans[record.levelname]
return record
logging.setLogRecordFactory(factory)
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s %(shortlevelname)s] %(message)s',
datefmt="%Y/%m/%d %H:%M:%S"
)
cmd_exec("ping localhost", ensure_success=False)
在TextReadLineThread
的run方法中,我们使用了内置函数iter()
的两个参数的形式,其第一个参数为一个可调用对象,第二个参数为该可调用对象可能的输出,通过iter(func, sentinel)
函数产生的迭代器,每次迭代时都无参数地调用func
,直到func()
返回sentinel
时结束迭代。
关于main
中日志配置,参考这一篇博客和这一篇博客,对日志格式进行增强,使得我们能够明确看到子进程的输出被劫持了。
这里我创建了两个线程,一个用于读取stdout
,一个用于读取stderr
,然后阻塞掉当前线程。其实可以只创建一个新的线程用于读取stderr
,用当前线程读取stdout
。
我使用了ping
命令来进行测试,因为ping
明显是隔一段时间输出一段,可以明显看出是不是实时进行日志记录。但是linux上ping
默认是不会停止的,需要注意一下。