zoukankan      html  css  js  c++  java
  • 【第五章】接口关联 正则表达式jsonpath模块

      这一章节说到的是正则表达式模块跟jsonpthon模块  以及接口参数化的操作

       

      

    代码地址:https://gitee.com/todayisgoodday/PythonRequest

      简单介绍一下正则表达式跟jsonpath模块

      re模块是python独有的匹配字符串的模块,该模块中提供的很多功能是基于正则表达式实现的,而正则表达式是对字符串进行模糊匹配,提取自己需要的字符串部分,他对所有的语言都通用。注意:

    • re模块是python独有的
    • 正则表达式所有编程语言都可以使用
    • re模块、正则表达式是对字符串进行操作
    • json_path_demo_06.py
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 15:41
    # @Author : Limusen
    # @File : json_path_demo_06
    import re
    
    import jsonpath
    
    """
    
    jsonpath 第三方模块
    
    下载方式 
    pip install jsonpath
    
    """
    print("========================json_path=========================")
    json_data = {"name": "哈利波特", "age": 22, "books": [{"bn": "假如给我三天光明", "p": 32.3}, {"bn": "朝花夕拾", "p": 33.3}]}
    # jsonpath返回的是列表 需要用下标取值
    #  $ 表示根节点, .表示字典的key []表示列表的下标
    value1 = jsonpath.jsonpath(json_data, "$.age")[0]
    print(value1)
    value2 = jsonpath.jsonpath(json_data, "$.books[1].bn")[0]
    print(value2)
    value3 = jsonpath.jsonpath(json_data, "$.books[1].p")[0]
    print(value3)
    print("========================json_path=========================")
    
    print("========================re=========================")
    
    """
    
    re模块可以参考: https://www.cnblogs.com/shenjianping/p/11647473.html
    
    """
    
    # 正则表达式
    str_01 = "helloword669582"
    str_01 = re.findall("hell(.+?)69582", str_01)[0]
    print(str_01)
    
    str1 = "hello123kkkas567"
    value4 = re.findall("o(\d+)k", str1)[0]
    print(value4)
    
    print("========================re=========================")
    • 通过字符串的replace方法分割正则提取器的值
    • replace_demo_07.py
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 15:58
    # @Author : Limusen
    # @File : replace_demo_07
    
    
    """
    
    配合re模块进行参数替换
    
    """
    
    import re
    
    dict_01 = {"token": "88669952sss"}
    str1 = '{"access_token":"${token}"}'
    
    # 找出我们需要替换的值
    variables_list = re.findall("\\${.+?}", str1)
    print(variables_list)
    
    # 取variables_list列表的第一个值,然后通过 从${ 展位为2   }为-1  分割字段进行替换  \\${.+?}
    str1 = str1.replace(variables_list[0], dict_01[variables_list[0][2:-1]])
    print(str1)


    • 在excel里面我们已经定义好的取值方式 可以根据 正则 jsonpath 或者没有

    • 优化request_utils模块底层方法 get 跟post
    • 单独拿一个接口出来封装,以免导致之前的代码也不能运行
    • request_demo_08.py
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 16:17
    # @Author : Limusen
    # @File : request_demo_08
    
    import re
    import json
    import jsonpath
    import requests
    from common.config_utils import local_config
    
    
    class RequestsUtils:
    
        def __init__(self):
            # 封装好的配置文件直接拿来用
            self.hosts = local_config.get_hosts
            # 全局session调用
            self.session = requests.session()
            # 临时变量存储正则或者jsonpath取到的值
            self.temp_variables = {}
    
        def get(self, request_info):
            """
            get请求封装
            :param request_info:
            :return:
            """
            #  request_info 是我们封装好的数据,可以直接拿来用
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            response = self.session.get(url=url,
                                        params=json.loads(
                                            requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                        # 头部信息有时候可能为空,这里运用到三元运算符如果为空则设置为None
                                        headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                        )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info['取值代码'], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
            print(self.temp_variables)
            
            # result = {
            #     "code": 0,
            #     "response_code": response.status_code,
            #     "response_reason": response.reason,
            #     "response_headers": response.headers,
            #     "response_body": response.text
            # }
            # return result
    
    
    if __name__ == '__main__':
        requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                         '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                         '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f8f0d","secret":"501123d2dd0f084"}',
                         '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                         '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
    
        res = RequestsUtils()
        res.get(requests_info)
        # print(res.get(requests_info))

    • 咱们要雨露均沾 post方法也封装一下
     def __post(self, request_info):
            """
            post请求封装
            :param request_info:
            :return:
            """
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            response = self.session.post(url=url,
                                         params=json.loads(request_info['请求参数(get)']),
                                         # params=json.loads(
                                         #     requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                         headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                         json=json.loads(request_info["请求参数(post)"])
                                         )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info["取值代码"], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
    
            result = {
                "code": 0,
                "response_code": response.status_code,
                "response_reason": response.reason,
                "response_headers": response.headers,
                "response_body": response.text
            }
            return result
    • 接下来封装请求参数的参数化信息
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 16:17
    # @Author : Limusen
    # @File : request_demo_08
    
    import re
    import json
    import jsonpath
    import requests
    from common.config_utils import local_config
    
    
    class RequestsUtils:
    
        def __init__(self):
            # 封装好的配置文件直接拿来用
            self.hosts = local_config.get_hosts
            # 全局session调用
            self.session = requests.session()
            # 临时变量存储正则或者jsonpath取到的值
            self.temp_variables = {}
    
        def __get(self, request_info):
            """
            get请求封装
            :param request_info:
            :return:
            """
            #  request_info 是我们封装好的数据,可以直接拿来用
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
            for variable in variables_list:
                request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]])
    
            response = self.session.get(url=url,
                                        params=json.loads(request_info["请求参数(get)"]),
                                        # params=json.loads(
                                        #     requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                        headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                        )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info["取值代码"], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
    
            result = {
                "code": 0,
                "response_code": response.status_code,
                "response_reason": response.reason,
                "response_headers": response.headers,
                "response_body": response.text
            }
            return result
    
        def __post(self, request_info):
            """
            post请求封装
            :param request_info:
            :return:
            """
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
            for variable in variables_list:
                request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]])
    
            variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"])
            for variable in variables_list:
                request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable,
                                                                                self.temp_variables[variable[2:-1]])
            response = self.session.post(url=url,
                                         params=json.loads(request_info['请求参数(get)']),
                                         # params=json.loads(
                                         #     requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                         headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                         json=json.loads(request_info["请求参数(post)"])
                                         )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info["取值代码"], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
    
            result = {
                "code": 0,
                "response_code": response.status_code,
                "response_reason": response.reason,
                "response_headers": response.headers,
                "response_body": response.text
            }
            return result
    
        def request(self, request_info):
            """
            封装方法自动执行post或者get方法
            :param request_info:
            :return:
            """
            request_type = request_info['请求方式']
            if request_type == "get":
                # 私有化方法,其他类均不可调用
                result = self.__get(request_info)
            elif request_type == "post":
                result = self.__post(request_info)
            else:
                result = {"code": 1, "error_message": "当前请求方式暂不支持!"}
            return result
    
        def request_steps(self, request_steps):
            """
            按照列表测试用例顺序执行测试用例
            :param request_steps:
            :return:
            """
            for request in request_steps:
                result = self.request(request)
                if result['code'] != 0:
                    break
            return result
    
    
    if __name__ == '__main__':
        requests_info = [
            {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01', '接口名称': '获取access_token接口',
             '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
             '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}',
             '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token_value', '断言类型': 'json_key',
             '期望结果': 'access_token,expires_in'},
            {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02', '接口名称': '创建标签接口',
             '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"${token_value}"}',
             '请求参数(post)': '{   "tag" : {     "name" : "asssqwe" } } ', '取值方式': '', '取值代码': '"id":(.+?),',
             '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}
        ]
    
        res = RequestsUtils()
        print(res.request_steps(requests_info))
    • 运行通过后放入 request_utils.py
    # -*- coding: utf-8 -*-
    # @Time : 2021/12/9 14:37
    # @Author : Limusen
    # @File : request_utils
    
    import re
    import json
    import jsonpath
    import requests
    from common.config_utils import local_config
    
    
    class RequestsUtils:
    
        def __init__(self):
            # 封装好的配置文件直接拿来用
            self.hosts = local_config.get_hosts
            # 全局session调用
            self.session = requests.session()
            self.temp_variables = {}
    
        def __get(self, request_info):
            """
            get请求封装
            :param request_info:
            :return:
            """
            #  request_info 是我们封装好的数据,可以直接拿来用
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
            for variable in variables_list:
                request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]])
    
            response = self.session.get(url=url,
                                        params=json.loads(request_info["请求参数(get)"]),
                                        # params=json.loads(
                                        #     requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                        headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                        )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info["取值代码"], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
    
            result = {
                "code": 0,
                "response_code": response.status_code,
                "response_reason": response.reason,
                "response_headers": response.headers,
                "response_body": response.text
            }
            return result
    
        def __post(self, request_info):
            """
            post请求封装
            :param request_info:
            :return:
            """
            url = "https://%s%s" % (self.hosts, request_info["请求地址"])
            variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"])
            for variable in variables_list:
                request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]])
    
            variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"])
            for variable in variables_list:
                request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable,
                                                                                self.temp_variables[variable[2:-1]])
            response = self.session.post(url=url,
                                         params=json.loads(request_info['请求参数(get)']),
                                         # params=json.loads(
                                         #     requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                         headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                         json=json.loads(request_info["请求参数(post)"])
                                         )
            if request_info["取值方式"] == "正则取值":
                value = re.findall(request_info["取值代码"], response.text)[0]
                self.temp_variables[request_info["取值变量"]] = value
            elif request_info["取值方式"] == "jsonpath取值":
                value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0]
                self.temp_variables[request_info["取值变量"]] = value
    
            result = {
                "code": 0,
                "response_code": response.status_code,
                "response_reason": response.reason,
                "response_headers": response.headers,
                "response_body": response.text
            }
            return result
    
        def request(self, request_info):
            """
            封装方法自动执行post或者get方法
            :param request_info:
            :return:
            """
            request_type = request_info['请求方式']
            if request_type == "get":
                # 私有化方法,其他类均不可调用
                result = self.__get(request_info)
            elif request_type == "post":
                result = self.__post(request_info)
            else:
                result = {"code": 1, "error_message": "当前请求方式暂不支持!"}
            return result
    
        def request_steps(self, request_steps):
            """
            按照列表测试用例顺序执行测试用例
            :param request_steps:
            :return:
            """
            for request in request_steps:
                result = self.request(request)
                if result['code'] != 0:
                    break
            return result
    
    
    if __name__ == '__main__':
        requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                         '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                         '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}',
                         '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                         '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
        requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02',
                              '接口名称': '创建标签接口', '请求方式': 'post',
                              '请求头部信息': '', '请求地址': '/cgi-bin/tags/create',
                              '请求参数(get)': '{"access_token":"51_136b9lRBH4SdLbSYI9C_1Sf1OogELivPJPNZ5z1mTzekmp3Yg4XQn8mx-sb3WxxV99NRWAX5CQhVIF6-uY12H_nRDjmEJ7H7oEbz9-qNHWV1g04V2t-29pslCsiuaSxIrkUChv4a2rPwdhnEEMHeADAMUP"}',
                              '请求参数(post)': '{   "tag" : {     "name" : "snsnssn" } } ', '取值方式': '正则取值',
                              '取值代码': '"id":(.+?),',
                              '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}
        requests_info_list = [
            {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01', '接口名称': '获取access_token接口',
             '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
             '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb63f0bf1f0d","secret":"501123d29a9011d0f084"}',
             '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token_value', '断言类型': 'json_key',
             '期望结果': 'access_token,expires_in'},
            {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02', '接口名称': '创建标签接口',
             '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"${token_value}"}',
             '请求参数(post)': '{   "tag" : {     "name" : "newtest" } } ', '取值方式': '', '取值代码': '"id":(.+?),',
             '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}
        ]
    
        res = RequestsUtils()
        # res.get(requests_info)
        # res.post(requests_info_post)
        # print(res.request(requests_info_post))
        print(res.request_steps(requests_info_list))


    • 现在我们request模块的正则也封装好了,下一章节我们将讲述一下封装断言库
    欢迎转载,请注明出处: https://www.cnblogs.com/yushengaqingzhijiao/p/15668351.html
  • 相关阅读:
    EDA cheat sheet
    numpy.bincount()
    非负矩阵分解的两种方法简析
    Python列表解析和字典解析
    Pandas使用groupby()时是否会保留顺序?
    Reduce pandas memory size
    Understanding the Transform Function in Pandas
    What’s up with the Graph Laplacian
    如何在github上下载单个文件夹
    TCP回射服务器修订版(ubuntu 18.04)
  • 原文地址:https://www.cnblogs.com/yushengaqingzhijiao/p/15668351.html
Copyright © 2011-2022 走看看