zoukankan      html  css  js  c++  java
  • python使用subprocess及delegator调用第三方程序

    前言

      python里调用第三方程序一般用subprocess模块都可以满足了,但是同程序的交互方面使用subprocess没有到合适的方法,这里使用了一个第三方模块delegator.py。它实际上是对subprocess和pexpect.popen_spawn封装。但其subprocess封方式无法达我的要求(比如无法获取到实时的return_code), 因此主要使用了它里面的与程序交互的部分。其他部分直接使用subprocess

      安装delegator.py需要使用以下命令:

           pip install delegator.py 

      文件头:

    #/user/bin/env python3
    import delegator
    import time
    import subprocess
    from multiprocessing import Process
    import logging
    import shlex

    subprocess 等待调用

      执行命令并等待返回。此种场景推荐使用subprocess.run方法,很全面。

    if __name__ == '__main__':
    
        logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='main.log',
                    filemode='w')
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        console.setFormatter(formatter)
        logging.getLogger('').addHandler(console)
    
        #shell命令执行
        ls = subprocess.run('ls -lht', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True, cwd = None, timeout = 5)
        logging.info(ls.args)
        logging.info(ls.returncode)
        logging.info(ls.stdout)
       
        #非shell命令执行
        try:
            ffprobe = subprocess.run(shlex.split('ffprobe -show_streams -i rtmp://rtmp-push.source.test/live/i3jMugm01'), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=False, universal_newlines=True, cwd = '/usr/local/bin', timeout = 3)
        except Exception as e:
            logging.exception(e)
        else:
            logging.info(ffprobe.args)
            logging.info(ffprobe.returncode)
            logging.info(ffprobe.stdout)

    参数说明

    args

    待执行的命令,可以试字符串或者是命令列表。

    stdin,stdout,stderr

    默认均为None,如果需要与程序进行交互,则需要设置stdin,如需获取输出和错误,则需要设置stdout和stderr。stdout和stderr一般可设置为subprocess.PIPE,文件对象或subprocess.DEVNULL,  另外stderr可设置为subprocess.STDOUT, 表示将错误重定向到stderr制定的管道。  

     shell

     subprocess.run里默认为False。设置为True时,可以认为就和在linux控制执行命令一样,支持grep类似操作。此时args参数可以是复杂字符串。设置为False时,如果args命令中是包含参数的,则必须以命令列表方式传入。一般通过shlex.split(string)处理即可。

     universal_newlines

     设置为True时,stdin,stdout,stderr均已字符串方式读写,否则以字节流方式读写。

     cwd

     设置程序的执行路径,运行非shell命令时,可以用来制定程序的所在路径。

     timeout

     命令超时时间,当程序超过制定时间还没有返回时,会抛出超时异常

    subprocess 非等待调用

      执行命令后不等待程序返回。这里使用的是subprocess.Popen对象。这里给出的例子加上了守护程序,适合需要长时间保证程序正常执行的情况。

      可以设定检查间隔时间,根据程序的returncode(None表示正常运行,0为正常退出,其他均为非正常结束)判断是否正常执行。下面代码中是只有kill -9或者是程序自己正常退出时才会结束,其他异常退出的情况程序会重新启动(包括kill命令)

      同时为了防止无限重启,加入健康检查机制,超过一定次数连续运行失败则停止运行

      这里使用多进程方式进行调用,方便按照此方式启动多个程序

        #longtime task
        def runapp(command, logfile, check_interval=5, health_check=3):
            applog= open(logfile, 'w') #使用日志文件记录app输出
            app = subprocess.Popen(command, stdout=applog, stderr=subprocess.STDOUT,universal_newlines=True) #错误重新向到输出一并记录
            fail_count = 0
            while True:
                time.sleep(check_interval)  #检查间隔
                logging.debug("args {}".format(app.args))
                logging.debug("pid {}".format(app.pid))
                app.poll() #通过poll方法设置程序的returncode,即调用poll方法后获取到的returncode才是实时的
                if app.returncode != None:
                    if app.returncode != 0:
                        fail_count +=1  #非0表示程序异常退出
                    if fail_count >= health_check :
                        logging.debug("health check fail,stop app!")
                        break
                    if app.returncode != 0 and app.returncode != -9: #正常结束或是被kill -9方式杀掉后退出,否则重启程序
                        logging.debug("app restart!")
                        app.__init__(command, stdout=applog, stderr=subprocess.STDOUT,universal_newlines=True)
                    else:
                        logging.debug("app eixt, code {}".format(app.returncode))
                        break
                else:
                    fail_count = 0
        #命令参数处理
        command = shlex.split("ffmpeg -i rtmp://rtmp-push.source.test/live/i3jMugm0 -c:v copy -c:a copy -f flv rtmp://rtmp-push.source.test/live/pwtest12345")
        #子进程方式
        p = Process(target = runapp, args=(command, "ffmpeg.log"))
        p.start()
        #p.join()

    delegator 与程序进行交互(非阻塞)

      这里使用vi编辑器作为例子,演示了使用v编辑文件的方式。可应用在类似场景中,但在某些操作上可能需要加些延时才能保证执行的正确性。

      需要注意block参数必须设置为False,send方法带上参数withend=False时不发送回车(这里的send方法默认会加上一个回车,因此这里对delegator.py中的send方法进行了修改,兼容不需要发送回车的情况。修改的部分放在最后。)。

      另外下面倒数第二条发送的其实是一个ESC字符(chr(27),显示退格符号),但这里显示为空字符了。

        #send string
        command = ['vi','test.txt']
        editer = delegator.run(command, block=False) #注意block参数要设置为False
        time.sleep(1)                                #程序启动后,首次发送信息时,加入一点延迟
        editer.send('i', withend=False)
        editer.send('12345', withend=False)
        editer.send('', withend=False)
        time.sleep(1)                                #程序退出编辑模式进入命令模式时,加入一点延迟
        editer.send(':wq!')
        editer.kill()

    delegator 与程序进行交互(阻塞式)

      调用程序的另外一个场景,就是等待程序出现某提示时,再输入信息。下面通过一个延时设置密码的脚本执行进程进程演示。

      主要注意expect方法的使用,第一个参数为等待出现的信息,可以使用正则表达式,第二个参数为超时时间,超过时间但没有出现逾期的内容,则会抛出异常

      passwd.sh内容:

    #!/bin/sh
    sleep 3
    passwd

      设置密码脚本:

        command = ['./passwd.sh']
        input =  delegator.run(command, block=False) #注意block参数要设置为False
        try:
            input.expect('New password:',timeout=5)  #脚本延迟3秒运行passwd命令,这里等待5秒 出现'New password:'
        except Exception as e:
            logging.exception(e)
        else:
            input.send("123")
        try:
            input.expect('Retype password:',timeout=2)
        except Exception as e:
            logging.exception(e)        
        else:
            input.send("456")
        input.kill()
        logging.info(input.out)

    对delegator.py 的修改部分

        def send(self, s, end=os.linesep, signal=False, withend=True):
            """Sends the given string or signal to std_in."""
    
            if self.blocking:
                raise RuntimeError("send can only be used on non-blocking commands.")
    
            if not signal:
                if self._uses_subprocess:
                    return self.subprocess.communicate((s + end) if withend else s)
                else:
                    return self.subprocess.send((s + end) if withend else s)
            else:
                self.subprocess.send_signal(s)
  • 相关阅读:
    vue---mixins的用法
    vue---slot,slot-scoped,以及2.6版本之后插槽的用法
    Java实现DDD中UnitOfWork
    redis基础及redis特殊场景使用描述
    网易一千零一夜 读后初感
    产品经理与众不同的思维方式与“职业病”——《人人都是产品经理》
    【Ubuntu14】Nginx+PHP5+Mysql记录
    A标签/按钮防止重复提交&页面Loading制作
    PHPCMS v9 二次开发_验证码结合Session开发
    eclipse 编码设置【转】
  • 原文地址:https://www.cnblogs.com/wp2ypy/p/10350712.html
Copyright © 2011-2022 走看看