zoukankan      html  css  js  c++  java
  • Python 实时获取任务请求对应的Nginx日志

    需求描述

    项目需求测试过程中,需要向Nginx服务器发送一些用例请求,然后查看对应的Nginx日志,判断是否存在特征内容,来判断任务是否执行成功。为了提升效率,需要将这一过程实现自动化。

    实践环境

    Python 3.6.5

    代码设计与实现

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    
    '''
    @CreateTime: 2021/06/26 9:05
    @Author : shouke
    '''
    
    
    import time
    import threading
    import subprocess
    from collections import deque
    
    
    def collect_nginx_log():
        global nginx_log_queue
        global is_tasks_compete
        global task_status
    
    
        args = 'tail -0f /usr/local/openresty/nginx/logs/access.log'
        while task_status != 'req_log_got':
            with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:
                log_for_req = ''
                outs, errs = '', ''
    
                try:
                    outs, errs = proc.communicate(timeout=2)
                except subprocess.TimeoutExpired:
                    print('获取nginx日志超时,正在重试')
                    proc.kill()
                    try:
                        outs, errs = proc.communicate(timeout=5)
                    except subprocess.TimeoutExpired:
                        print('获取nginx日志超时,再次超时,停止重试')
                        break
                finally:
                    for line in outs.split('
    '):
                        flag = '"client_ip":"10.118.0.77"' # 特征
                        if flag in line: # 查找包含特征内容的日志
                            log_for_req += line
    
                    if task_status == 'req_finished':
                        nginx_log_queue.append(log_for_req)
                        task_status = 'req_log_got'
    
    
    
    def run_tasks(task_list):
        '''
        运行任务
        :param task_list 任务列表
        '''
    
        global nginx_log_queue
        global is_tasks_compete
        global task_status
    
    
        for task in task_list:
            thread = threading.Thread(target=collect_nginx_log,
                                        name="collect_nginx_log")
            thread.start()
            time.sleep(1) # 执行任务前,让收集日志线程先做好准备
    
            print('正在执行任务:%s' % task.get('name'))
    
            # 执行Nginx任务请求
            # ...
    
            task_status = 'req_finished'
            time_to_wait = 0.1
            while task_status != 'req_log_got': # 请求触发的nginx日志收集未完成
                time.sleep(time_to_wait)
                time_to_wait += 0.01
            else:# 获取到用例请求触发的nginx日志
                if nginx_log_queue:
                    nginx_log = nginx_log_queue.popleft()
                    task_status = 'req_ready'
                    # 解析日志
                    # do something here
                    # ...
                else:
                    print('存储请求日志的队列为空')
                    # do something here
                    # ...
    
    
    if __name__ == '__main__':
        nginx_log_queue = deque()
        is_tasks_compete = False # 所有任务是否执行完成
    
        task_status = 'req_ready' # req_ready,req_finished,req_log_got  # 存放执行次任务任务的一些状态
        print('###########################任务开始###########################')
    
        tast_list = [{'name':'test_task', 'other':'...'}]
        run_tasks(tast_list)
    
        is_tasks_compete = True
    
        current_active_thread_num = len(threading.enumerate())
        while current_active_thread_num != 1:
            time.sleep(2)
            current_active_thread_num = len(threading.enumerate())
        print('###########################任务完成###########################')
    

    注意:

    1、上述代码为啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"呢?这是因为这样做无法获取到Nginx的日志

    2、实践时发现,第一次执行proc.communicate(timeout=2)获取日志时,总是无法获取,会超时,需要二次获取,并且timeout设置太小时(实践时尝试过设置为1秒),也会导致第二次执行时无法获取Nginx日志。

    作者:授客
    公众号:授客的知识库
    QQ:1033553122
    全国软件测试QQ交流群:7156436

    Git地址:https://gitee.com/ishouke
    友情提示:限于时间仓促,文中可能存在错误,欢迎指正、评论!
    作者五行缺钱,如果觉得文章对您有帮助,请扫描下边的二维码打赏作者,金额随意,您的支持将是我继续创作的源动力,打赏后如有任何疑问,请联系我!!!
                微信打赏                       支付宝打赏                        授课的知识库               全国软件测试交流QQ群  
                          

  • 相关阅读:
    内置函数的补充
    python3 集合中的常用方法
    Salesforce: ISCHANGED在workflow中的使用
    Salesforce: setTreatTargetObjectAsRecipient的使用
    python实现用户登录次数太多账号"锁定"
    docker命令
    scrapy框架的安装
    分布式爬虫
    scrapy框架mongodb正规存储
    redis
  • 原文地址:https://www.cnblogs.com/shouke/p/14993393.html
Copyright © 2011-2022 走看看