zoukankan      html  css  js  c++  java
  • Python-subprocess执行命令并将输出劫持实现实时记录到日志

    Python-subprocess执行命令并将输出劫持实现实时记录到日志

    前言

    在写我自己的练手项目的时候,需要写一系列Python脚本来帮助我进行运维/环境配置,我希望这些脚本能够有比较好的日志记录。

    这一篇博客中,我实现了日志同时向控制台和日志中进行输出,并且二者的日志等级、日志格式不相同。

    这一篇博客中,我通过自定义日志的格式描述符,实现了定长的日志等级。

    解决完日志问题后,我需要解决这些脚本的另一个问题:执行命令。

    我写这一系列Python脚本就是因为我不擅长写shell脚本,而且Python脚本在我的开发环境(Windows)和运行环境(Linux)都可以运行。其需要担当的很重要的一个功能就是执行一些命令,如kubectl applymvn test等。

    我希望执行这些命令的时候,能够实时地将命令的输出,记录到日志中。

    本博客,我实现了执行命令,并将命令执行的输出进行劫持,实时记录到日志中。

    参考

    subprocess

    关于多进程协同,Python主要有三个手段:

    1. os.system()函数;
    2. multiprocessing模块;
    3. subprocess模块;

    其中multiprocessing主要用于解决计算密集型计算中,Python由于GIL(全局解释器锁)的存在,多线程无法提升性能,而需要创建多个进程来加快计算的场景。

    os.system()则是阻塞式的,而且似乎无法将其创建的子进程的输出进行劫持。

    subprocess模块几乎是唯一的选择。

    Popen

    subprocess虽然提供了简易使用的subprocess.run()方法,但是这个方法无法做到实时输出,也就是命令执行完成之后才能一次性获取命令的输出,即使传入了stdout=subprocess.PIPE要求其创建一个管道,仍旧是阻塞式的读取。stdout也可以指定为一个流,因此我们可以将其直接重定向到本程序的标准输出,但是logger并不是一个流。

    所以我们只能使用更为底层的subprocess.Popen对象来进行操作。

    process = subprocess.Popen(
        cmd,
        shell=True,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        )
    

    通过这样的参数,创建子进程运行cmd指定的命令,同时创建与子进程关联的process对象,此时process.stdoutprocess.stderr都是可以调用readline方法来一行行读取字符串的管道。

    实现

    通过subprocess.Popen()创建Popen对象后,子进程开始运行,我们可以通过管道来读取子进程的输出。因为有两个管道都需要读取,并且它们没有提供检查管道是否读取的方法,只能阻塞地尝试进行读取,所以我们需要创建读取线程来分别读取这两个管道。

    实现如下:

    import logging
    import subprocess
    import shlex
    import threading
    
    class CommandExecutionException(Exception):
        def __init__(self, command: str, exit_code: int) -> None:
            super().__init__(f"command executed fail with exit-code={exit_code}: {command}")
    
    _logger = logging.getLogger(__name__)
    
    class TextReadLineThread(threading.Thread):
        def __init__(self, readline, callback, *args, **kargs) -> None:
            super().__init__(*args, **kargs)
            self.readline = readline
            self.callback = callback
    
        def run(self):
            for line in iter(self.readline, ""):
                if len(line) == 0:
                    break
                self.callback(line)
    
    def cmd_exec(command: str, ensure_success: bool=True) -> int:
        _logger.info("executing command: {}".format(command))
    
        cmd = shlex.split(command)
    
        process = subprocess.Popen(
            cmd,
            shell=True,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            )
    
        _logger.debug("started command")
    
        def log_warp(func):
            def _wrapper(line: str):
                return func("	" + line.strip())
            return _wrapper
    
        read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
        read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
        read_stdout.start()
        read_stderr.start()
    
        read_stdout.join()
        _logger.debug("stdout reading finish")
        read_stderr.join()
        _logger.debug("stderr reading finish")
        ret = process.wait()
        _logger.debug("process finish")
    
        _logger.info("executed command with exit-code={}".format(ret))
        if ensure_success and ret != 0:
            raise CommandExecutionException(command=command, exit_code=ret)
        return ret
    
    if __name__ == '__main__':
        _logger_trans = {
                "DEBUG": "DBG",
                "INFO": "INF",
                "WARNING": "WAR",
                "CRITICAL": "ERR"
            }
        _old_factory = logging.getLogRecordFactory()
        def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
            record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
            record.shortlevelname = _logger_trans[record.levelname]
            return record
        logging.setLogRecordFactory(factory)
        logging.basicConfig(
            level=logging.DEBUG,
            format='[%(asctime)s %(shortlevelname)s] %(message)s',
            datefmt="%Y/%m/%d %H:%M:%S"
        )
        cmd_exec("ping localhost", ensure_success=False)
    

    TextReadLineThread的run方法中,我们使用了内置函数iter()的两个参数的形式,其第一个参数为一个可调用对象,第二个参数为该可调用对象可能的输出,通过iter(func, sentinel)函数产生的迭代器,每次迭代时都无参数地调用func,直到func()返回sentinel时结束迭代。

    关于main中日志配置,参考这一篇博客这一篇博客,对日志格式进行增强,使得我们能够明确看到子进程的输出被劫持了。

    这里我创建了两个线程,一个用于读取stdout,一个用于读取stderr,然后阻塞掉当前线程。其实可以只创建一个新的线程用于读取stderr,用当前线程读取stdout

    我使用了ping命令来进行测试,因为ping明显是隔一段时间输出一段,可以明显看出是不是实时进行日志记录。但是linux上ping默认是不会停止的,需要注意一下。

  • 相关阅读:
    推荐系统和业务系统对比:
    认识事物的过程是:
    思考:面向对象源码的解析和阅读需要注意和把握的点
    推荐系统中ES使用过程中遇到的问题:
    使用缓存功能要掌握住(心里有数)的点:
    思考:延迟决策是非常重要的
    压力测试中tps上不去的原因
    mysql 高级查询二
    mysql 高级查询
    fiddler设置https抓包
  • 原文地址:https://www.cnblogs.com/SnowPhoenix/p/15201426.html
Copyright © 2011-2022 走看看