zoukankan      html  css  js  c++  java
  • 【优化框架】提供参数动态化替换,正则表达式提取数据优化

    • 提供参数动态化替换,正则表达式提取数据优化

    代码地址:
    https://gitee.com/todayisgoodday/P9P10_API_FRAME/blob/master/common/request_utils.py
    • 期望的效果

    • 优化数据demo
    代码优化:
    
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/24 13:46
    # @Author : Limusen
    # @File : regular_26
    
    import re
    import jsonpath
    
    #  示例代码
    num_list = [1, 2, 3, 4]
    name_list = ["one", "two", "three", "four"]
    # print({name_list[i]: num_list[i] for i in range(len(num_list))})
    
    print('#######单个值替换#######')
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求方式': 'get',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值',
                    '取值代码': '"userId":"(.+?)"', '取值变量': 'userid',
                    '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'}
    value = '"data":"{"测试用例编号":"api_cese_01","请求参数(get)": "","userId":"测试奥什","phone":"17717870595","取值变量": "userid,nick_name","nickName":"恭喜发财"}"'
    
    # 取单个值
    temp_variables = {}
    if request_info["取值方式"] == "正则取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for c in range(0, len(value_list)):  # 取出总共需要替换的次数,每次循环都一次替换两个值
            temp_variables[key_list[c]] = re.findall(value_list[c], value)[0]
    print(temp_variables)
    print('#######单个值替换#######')
    
    print("\n")
    
    print('#######多个值替换#######')
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求方式': 'get',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '', '取值方式': '正则取值',
                    '取值代码': '"userId":"(.+?)","测试用例编号":"(.+?)"', '取值变量': 'userid,case_id',
                    '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'}
    value = '"data":"{"测试用例编号":"api_cese_01","请求参数(get)": "","userId":"测试奥什","phone":"17717870595","取值变量": "userid,nick_name","nickName":"恭喜发财"}"'
    
    #  取多个值
    temp_variables = {}
    if request_info["取值方式"] == "正则取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for c in range(0, len(value_list)):
            temp_variables[key_list[c]] = re.findall(value_list[c], value)[0]
    print(temp_variables)
    print('#######多个值替换#######')
    
    print('\n')
    
    print('#######优化替换正则模块#######')
    
    result_list = '{"code": 100001,"desc": "成功","success": "true","data": "{"token": "I0k",  "activation":"true"}"}'
    result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}}
    
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '',
                    '取值方式': '正则取值',
                    '取值代码': '"code": (\d+),"success": "(.+?)"', '取值变量': 'code,success',
                    '断言类型': '正则取值', '期望结果': '{"code":"100001"}'}
    temp_variables = {}
    if request_info["取值方式"] == "正则取值":
        # 注意:取值代码与取值变量需要对应,否则会报错,用逗号分割数据
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            # 如果这个地方报错了,请查看正则表达式或者jsonpath取值模板是否有问题
            # 这里需要注意的是 取出字符串数据,中间是没有空格的
            # 例如"code": "100001"  正则模板应该为 "code":"(.+?)"
            temp_variables[key_list[index]] = re.findall(value_list[index], result_list)[0]
    print(temp_variables)
    
    print('#######优化替换正则模块#######')
    
    print('#######单个值替换#######')
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求方式': 'get',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '',
                    '取值方式': 'jsonpath取值',
                    '取值代码': '$..userId', '取值变量': 'userId',
                    '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'}
    json_path_list = {
        "access_token": "52_obqRF34JqB59FMSh",
        "expires_in": 7200,
        "sads": 545,
        "data": {"测试用例编号": "api_cese_01", "userId": "张二狗", "phone": "17717870595"}
    }
    # 取单个值
    temp_variables = {}
    if request_info["取值方式"] == "jsonpath取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            temp_variables[key_list[index]] = jsonpath.jsonpath(json_path_list, value_list[index])[0]
    print(temp_variables)
    print('#######单个值替换#######')
    
    print('#######多个值替换#######')
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求方式': 'get',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '',
                    '取值方式': 'jsonpath取值',
                    '取值代码': '$..userId,$.sads', '取值变量': 'userId,sads',
                    '断言类型': 'json_key_value', '期望结果': '{"code":"100001"}'}
    json_path_list = {
        "access_token": "52_obqRF34JqB59FMSh",
        "expires_in": 7200,
        "sads": 545,
        "data": {"测试用例编号": "api_cese_01", "userId": "张二狗", "phone": "17717870595"}
    }
    # 取单个值
    temp_variables = {}
    if request_info["取值方式"] == "jsonpath取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            temp_variables[key_list[index]] = jsonpath.jsonpath(json_path_list, value_list[index])[0]
    print(temp_variables)
    print('#######多个值替换#######')
    
    print('#######优化替换jsonpath模块#######')
    
    result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}}
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '',
                    '取值方式': 'jsonpath取值',
                    '取值代码': '$.code,$..token', '取值变量': 'code,token',
                    '断言类型': 'jsonpath取值', '期望结果': '{"code":"100001"}'}
    temp_variables = {}
    if request_info["取值方式"] == "jsonpath取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            temp_variables[key_list[index]] = jsonpath.jsonpath(result_json, value_list[index])[0]
    print(temp_variables)
    
    print('#######优化替换jsonpath模块#######')
    
    print('####### 整合替换request_utils.py模块 #######')
    
    result_list = '{"code": 100001,"desc": "成功","success": "true","data": "{"token": "I0k",  "activation":"true"}"}'
    result_json = {"code": "100001", "desc": "成功", "success": "true", "data": {"token": "I0k", "activation": "true"}}
    
    request_info = {'测试用例编号': 'api_cese_01', '测试用例名称': '获取用户信息', '用例执行': '', '用例步骤': 'step_02', '接口名称': '获取用户信息',
                    '请求地址': '/gateway/system/user/userInfo/oneself', '请求参数(get)': '', '请求参数(post)': '',
                    '取值方式': '正则取值',
                    '取值代码': '"code": (\d+)', '取值变量': 'code',
                    '断言类型': '正则取值', '期望结果': '{"code":"100001"}'}
    temp_variables = {}
    if request_info["取值方式"] == "正则取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            # 如果这个地方报错了,请查看正则表达式或者jsonpath取值模板是否有问题
            # 这里需要注意的是 取出字符串数据,中间是没有空格的
            # 例如"code": "100001"  正则模板应该为 "code":"(.+?)"
            temp_variables[key_list[index]] = re.findall(value_list[index], result_list)[0]
    elif request_info["取值方式"] == "jsonpath取值":
        value_list = request_info['取值代码'].split(',')
        key_list = request_info['取值变量'].split(',')
        for index in range(0, len(value_list)):
            temp_variables[key_list[index]] = jsonpath.jsonpath(result_json, value_list[index])[0]
    print(temp_variables)
    
    print('####### 整合替换request_utils.py模块 #######')

    • 替换request_utils.py中的内容
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 14:37
    # @Author : Limusen
    # @File : request_utils
    
    import re
    import json
    
    import allure
    import jsonpath
    import requests
    from faker import Faker
    from requests.exceptions import ConnectionError
    from requests.exceptions import ProxyError
    from requests.exceptions import RequestException
    from common.config_utils import local_config
    from common.check_utils import CheckUtils
    from nb_log import LogManager
    
    logger = LogManager('Api_Test').get_logger_and_add_handlers(
        is_add_stream_handler=True,
        log_filename=local_config.LOG_NAME,
    )
    
    fake = Faker('zh_CN')
    
    
    class RequestsUtils:
    
        def __init__(self):
            # 封装好的配置文件直接拿来用
            self.hosts = local_config.HOSTS
            # 全局session调用
            self.session = requests.session()
            self.temp_variables = {}
    
        def __get(self, request_info):
            """
            get请求封装
            :param request_info:
            :return:
            """
            try:
                #  request_info 是我们封装好的数据,可以直接拿来用
                url = "https://%s%s" % (self.hosts, request_info["请求地址"])
                variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
                for variable in variables_list:
                    request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable,
                                                                                  self.temp_variables[variable[2:-1]])
                # 新增头部获取正确值的判断
                variables_list = re.findall('\\${.+?}', request_info["请求头部信息"])
                for variable in variables_list:
                    request_info["请求头部信息"] = request_info["请求头部信息"].replace(variable,
                                                                            self.temp_variables[variable[2:-1]])
                response = self.session.get(url=url,
                                            params=json.loads(request_info["请求参数(get)"]) if request_info[
                                                "请求参数(get)"] else None,
                                            headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                            )
                if request_info["取值方式"] == "正则取值":
                    value_list = request_info['取值代码'].split(',')
                    key_list = request_info['取值变量'].split(',')
                    for index in range(0, len(value_list)):
                        # 如果总是抛错超出索引范围就需要查看,请查看正则表达式或者jsonpath取值模板是否有问题
                        # 这里需要注意的是 取出字符串数据,中间是没有空格的
                        # 例如"code":"100001"  正则模板应该为 "code":"(\d+)" 遇到数字请使用(\d+) 可以学习一下正则表达式提取
                        self.temp_variables[key_list[index]] = re.findall(value_list[index], response.text)[0]
                elif request_info["取值方式"] == "jsonpath取值":
                    value_list = request_info['取值代码'].split(',')
                    key_list = request_info['取值变量'].split(',')
                    for index in range(0, len(value_list)):
                        self.temp_variables[key_list[index]] = jsonpath.jsonpath(response.json(), value_list[index])[0]
    
                result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"])
    
            except Exception as e:
                result = {"code": 3,
                          "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()),
                          "check_result": False}
                logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__()))
                raise
            logger.info(response.text)
            return result
    
        def __post(self, request_info):
            """
            post请求封装
            :param request_info:
            :return:
            """
            try:
                url = "https://%s%s" % (self.hosts, request_info["请求地址"])
                logger.info(self.temp_variables)
                variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
                for variable in variables_list:
                    request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable,
                                                                                  self.temp_variables[variable[2:-1]])
                variables_list = re.findall('\\${.+?}', request_info["请求头部信息"])
                for variable in variables_list:
                    request_info["请求头部信息"] = request_info["请求头部信息"].replace(variable,
                                                                            self.temp_variables[variable[2:-1]])
                variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"])
                for variable in variables_list:
                    request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable,
                                                                                    self.temp_variables[variable[2:-1]])
                # 新增的如果请求参数之中需要用到动态数据,则进行替换
                variables_list = re.findall('{{.+?}}', request_info['请求参数(post)'])
                for variable in variables_list:
                    if variable == "{{rand_name}}":
                        request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, fake.name())
                    elif variable == "{{rand_phone}}":
                        request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, fake.phone_number())
                    elif variable == "{{rand_code}}":
                        # fake.random_number(digits=4) 生成长度为4的code
                        request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable,
                                                                                        str(fake.random_number(
                                                                                            digits=4)))
                response = self.session.post(url=url,
                                             params=json.loads(request_info["请求参数(get)"]) if request_info[
                                                 "请求参数(get)"] else None,
                                             headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                             json=json.loads(request_info["请求参数(post)"])
                                             )
                if request_info["取值方式"] == "正则取值":
                    value_list = request_info['取值代码'].split(',')
                    key_list = request_info['取值变量'].split(',')
                    for index in range(0, len(value_list)):
                        # 如果总是抛错超出索引范围就需要查看,请查看正则表达式或者jsonpath取值模板是否有问题
                        # 这里需要注意的是 取出字符串数据,中间是没有空格的
                        # 例如"code":"100001"  正则模板应该为 "code":"(\d+)" 遇到数字请使用(\d+) 可以学习一下正则表达式提取
                        self.temp_variables[key_list[index]] = re.findall(value_list[index], response.text)[0]
                elif request_info["取值方式"] == "jsonpath取值":
                    value_list = request_info['取值代码'].split(',')
                    key_list = request_info['取值变量'].split(',')
                    for index in range(0, len(value_list)):
                        self.temp_variables[key_list[index]] = jsonpath.jsonpath(response.json(), value_list[index])[0]
                        logger.info(self.temp_variables)
                result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"])
    
            except Exception as e:
                result = {"code": 3,
                          "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()),
                          "check_result": False}
                logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__()))
                raise
            logger.info(response.text)
    
            return result
    
        def request(self, request_info):
            """
            封装方法自动执行post或者get方法
            :param request_info:
            :return:
            """
            with allure.step("==步骤{}:接口{}开始调用==".format(request_info["用例步骤"], request_info["接口名称"])):
                logger.info("==步骤{}:接口{}开始调用==".format(request_info["用例步骤"], request_info["接口名称"]))
            request_type = request_info['请求方式']
            if request_type == "get":
                # 私有化方法,其他类均不可调用
                result = self.__get(request_info)
            elif request_type == "post":
                result = self.__post(request_info)
            else:
                result = {"code": 1, "error_message": "当前请求方式暂不支持!", "check_result": False}
            with allure.step("==步骤{}:接口{}调用结束==".format(request_info["用例步骤"], request_info["接口名称"])):
                logger.info("==步骤{}:接口{}调用结束==".format(request_info["用例步骤"], request_info["接口名称"]))
            return result
    
        def request_steps(self, request_steps):
            """
            按照列表测试用例顺序执行测试用例
            :param request_steps:
            :return:
            """
            for request in request_steps:
                result = self.request(request)
                if result['code'] != 0:
                    break
            return result
    
    
    if __name__ == '__main__':
        requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                         '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                         '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}',
                         '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                         '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
        requests_info_post = {'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '', '用例步骤': 'step_02',
                              '接口名称': '创建标签接口', '请求方式': 'post',
                              '请求头部信息': '{"Authorization":"546545465"}',
                              '请求地址': '/cgi-bin/tags/create',
                              '请求参数(get)': '{"access_token":"52_fNw3P3XinmEjtFJsznb6NZli2hSezPyFj16ZmDMi5uSZ6mkObGQQxSdnc5D8GmZPyT_2zACf6w9zHQOun'
                                           '7IU-FIoIh-tcIjki25tzgUL812cKD6OPL3zkeOlzS7gS9q3CdrltHJoWqd91WS7KRAdAEAWDM"}',
                              '请求参数(post)': '{   "tag" : {     "name" : "{{rand_code}}" } } ', '取值方式': '', '取值代码': '',
                              '取值变量': '', '断言类型': 'json_key', '期望结果': 'tag'}
    
        res = RequestsUtils()
        print(res.request(requests_info_post))
  • 相关阅读:
    JAVA基础补漏--文件读取
    JAVA-Lambda表达式
    JAVA基础补漏--可变参数
    JAVA基础补漏--SET
    Apache ab 测试结果的分析
    同源策略和跨域问题
    php curl 伪造IP来源的实例代码
    HTTP状态码详解
    PHP 根据IP地址获取所在城市
    MySQL MERGE存储引擎 简介及用法
  • 原文地址:https://www.cnblogs.com/yushengaqingzhijiao/p/15727180.html
Copyright © 2011-2022 走看看