-
结合断言库优化request_uitls底层数据
- 先在check_utils新增通过结果跟失败结果
def __init__(self, response_data): self.response_data = response_data self.function = { "none": self.__none_check, "json_key": self.__key_check, "json_key_value": self.__key_value_check, "body_regexp": self.__body_regexp_check, "header_key_check": self.__header_key_check, "header_key_value_check": self.__header_key_value_check, "response_code_check": self.__response_code_check } self.pass_result = { # 通过结果 "code": 0, "response_code": self.response_data.status_code, "response_reason": self.response_data.reason, "response_headers": self.response_data.headers, "response_body": self.response_data.text, "message": "测试用例执行通过", "check_result": True } self.fail_result = { # 失败结果 2 表示断言失败 "code": 2, "response_code": self.response_data.status_code, "response_reason": self.response_data.reason, "response_headers": self.response_data.headers, "response_body": self.response_data.text, "message": "测试用例断言失败,测试用例执行不通过", "check_result": False }
- 将原来的result替换成check_utils的断言方法
def __get(self, request_info): """ get请求封装 :param request_info: :return: """ # request_info 是我们封装好的数据,可以直接拿来用 url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.get(url=url, params=json.loads(request_info["请求参数(get)"]), # params=json.loads( # requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None), headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None ) if request_info["取值方式"] == "正则取值": value = re.findall(request_info["取值代码"], response.text)[0] self.temp_variables[request_info["取值变量"]] = value elif request_info["取值方式"] == "jsonpath取值": value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0] self.temp_variables[request_info["取值变量"]] = value # 将原来的替换成断言库 result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) return result
- request_utils.py 文件代码
# -*- coding: utf-8 -*- # @Time : 2021/12/9 14:37 # @Author : Limusen # @File : request_utils import re import json import jsonpath import requests from common.config_utils import local_config from common.check_utils import CheckUtils class RequestsUtils: def __init__(self): # 封装好的配置文件直接拿来用 self.hosts = local_config.get_hosts # 全局session调用 self.session = requests.session() self.temp_variables = {} def __get(self, request_info): """ get请求封装 :param request_info: :return: """ # request_info 是我们封装好的数据,可以直接拿来用 url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.get(url=url, params=json.loads(request_info["请求参数(get)"]), # params=json.loads( # requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None), headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None ) if request_info["取值方式"] == "正则取值": value = re.findall(request_info["取值代码"], response.text)[0] self.temp_variables[request_info["取值变量"]] = value elif request_info["取值方式"] == "jsonpath取值": value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0] self.temp_variables[request_info["取值变量"]] = value result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) return result def __post(self, request_info): """ post请求封装 :param request_info: :return: """ url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"]) for variable in variables_list: request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.post(url=url, params=json.loads(request_info['请求参数(get)']), # params=json.loads( # requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None), headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None, json=json.loads(request_info["请求参数(post)"]) ) if request_info["取值方式"] == "正则取值": value = re.findall(request_info["取值代码"], response.text)[0] self.temp_variables[request_info["取值变量"]] = value elif request_info["取值方式"] == "jsonpath取值": value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0] self.temp_variables[request_info["取值变量"]] = value result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) return result def request(self, request_info): """ 封装方法自动执行post或者get方法 :param request_info: :return: """ request_type = request_info['请求方式'] if request_type == "get": # 私有化方法,其他类均不可调用 result = self.__get(request_info) elif request_type == "post": result = self.__post(request_info) else: result = {"code": 1, "error_message": "当前请求方式暂不支持!" , "check_result": False} return result def request_steps(self, request_steps): """ 按照列表测试用例顺序执行测试用例 :param request_steps: :return: """ for request in request_steps: result = self.request(request) if result['code'] != 0: break return result if __name__ == '__main__': requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token', '断言类型': 'json_key', '期望结果': 'access_token,expires_in'} requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"51_136b9lRBH4SdLbSYI9C_1Sf1OogELivPJPNZ5z1mTzekmp3Yg4XQn8mx-sb3WxxV99NRWAX5CQhVIF6-uY12H_nRDjmEJ7H7oEbz9-qNHWV1g04V2t-29pslCsiuaSxIrkUChv4a2rPwdhnEEMHeADAMUP"}', '请求参数(post)': '{ "tag" : { "name" : "snsnssn" } } ', '取值方式': '无', '取值代码': '"id":(.+?),', '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''} requests_info_list = [ {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token_value', '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}, {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"${token_value}"}', '请求参数(post)': '{ "tag" : { "name" : "newtest" } } ', '取值方式': '无', '取值代码': '"id":(.+?),', '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''} ] res = RequestsUtils() # res.get(requests_info) # res.post(requests_info_post) print(res.request(requests_info_post)) # print(res.request_steps(requests_info_list))
- check_utils.py 文件代码
# -*- coding: utf-8 -*- # @Time : 2021/12/13 14:07 # @Author : Limusen # @File : check_utils import json import re class CheckUtils: def __init__(self, response_data): """ :param response_data: 响应结果 """ self.response_data = response_data self.function = { "none": self.none_check, "json_key": self.key_check, "json_key_value": self.key_value_check, "body_regexp": self.body_regexp_check, "header_key_check": self.header_key_check, "header_key_value_check": self.header_key_value_check, "response_code_check": self.response_code_check } self.pass_result = { # 通过结果 "code": 0, "response_code": self.response_data.status_code, "response_reason": self.response_data.reason, "response_headers": self.response_data.headers, "response_body": self.response_data.text, "message": "测试用例执行通过", "check_result": True } self.fail_result = { # 失败结果 2 表示断言失败 "code": 2, "response_code": self.response_data.status_code, "response_reason": self.response_data.reason, "response_headers": self.response_data.headers, "response_body": self.response_data.text, "message": "测试用例断言失败,测试用例执行不通过", "check_result": False } def none_check(self): """ 断言类型为空的情况 :return: """ return self.pass_result def key_check(self, check_data): """ 检查键是否相同 :param check_data: 需要检查的字段,注意得是字符串才行,因为要分割 :return: True说明断言成功,False说明断言失败 """ # 字符串逗号分割 key_list = check_data.split(",") tmp_result = [] # 取出需要断言的字段 for check_key in key_list: # 如果 check_key 在json串的键当中,则添加True,不是则添加False if check_key in self.response_data.json().keys(): tmp_result.append(True) else: tmp_result.append(False) if False in tmp_result: # 只要有一个不符合 用例全部失败 return self.fail_result else: return self.pass_result def key_value_check(self, check_data): """ 检查键值对是否一致 :param check_data: :return: """ key_dict = json.loads(check_data) tmp_result = [] for check_key in key_dict.items(): if check_key in self.response_data.json().items(): tmp_result.append(True) else: tmp_result.append(False) if False in tmp_result: return self.fail_result else: return self.pass_result def body_regexp_check(self, check_data): """ 根据正则表达式断言 :param check_data: :return: """ if re.findall(check_data, self.response_data.text): # 能找到check_data的值则算通过 return self.pass_result else: return self.fail_result def header_key_check(self, check_data): """ 检查头部信息是否包含某个值 可以参照key_check() :param check_data: :return: """ # 字符串逗号分割 key_list = check_data.split(",") tmp_result = [] # 取出需要断言的字段 for check_key in key_list: # 如果 check_key 在json串的键当中,则添加True,不是则添加False if check_key in self.response_data.headers.keys(): tmp_result.append(True) else: tmp_result.append(False) if False in tmp_result: # 只要有一个不符合 用例全部失败 return self.fail_result else: return self.pass_result def header_key_value_check(self, check_data): """ 检查头部键值对是否一致 参照key_value_check() :param check_data: :return: """ key_dict = json.loads(check_data) tmp_result = [] for check_key in key_dict.items(): if check_key in self.response_data.headers.items(): tmp_result.append(True) else: tmp_result.append(False) if False in tmp_result: # 只要有一个不符合 用例全部失败 return self.fail_result else: return self.pass_result def response_code_check(self, check_data): """ 检查返回状态码 :param check_data: :return: """ if self.response_data.status_code == int(check_data): return self.pass_result else: return self.fail_result def run_check(self, check_type, except_result): """ :param check_type: 检查的类型 :param except_result: 检查的字段 :return: """ if check_type == "none" or except_result == "": return self.function["none"]() else: return self.function[check_type](except_result) if __name__ == '__main__': import requests url = "https://api.weixin.qq.com/cgi-bin/token" get_params = {"grant_type": "client_credential", "appid": "wxb637f897f0bf1f0d", "secret": "501123d2d367b109a5cb9a9011d0f084"} response = requests.get(url=url, params=get_params) # print(response.headers) ck = CheckUtils(response) print(ck.none_check()) print(ck.run_check('json_key', "access_token,expires_in")) print(ck.run_check('json_key_value', '{"expires_in": 7200}')) print(ck.run_check("body_regexp", '"access_token":"(.+?)"')) print(ck.run_check("header_key_check", "Connection")) print(ck.run_check("header_key_value_check", '{"Connection": "keep-alive"}')) print(ck.run_check("response_code_check", "200"))
- 日志模块
- 配置文件新增log_path
- config_uitls.py 新增方法
- nb_log_config.py 引入logs路径
- 封装日志模块
# -*- coding: utf-8 -*- # @Time : 2021/12/17 15:57 # @Author : Limusen # @File : logs_utils from common.config_utils import local_config from nb_log import LogManager class LogsUtils: def logger(self): logger = LogManager('Api_Test').get_logger_and_add_handlers( is_add_stream_handler=True, log_filename=local_config.LOG_NAME ) return logger logger = LogsUtils().logger() logger.info("askd")
- 代码添加异常处理 在底层代码处添加 request_utils.py
# -*- coding: utf-8 -*- # @Time : 2021/12/9 14:37 # @Author : Limusen # @File : request_utils import re import json import jsonpath import requests from requests.exceptions import ConnectionError from requests.exceptions import ProxyError from requests.exceptions import RequestException from common.config_utils import local_config from common.check_utils import CheckUtils from common.logs_utils import logger class RequestsUtils: def __init__(self): # 封装好的配置文件直接拿来用 self.hosts = local_config.get_hosts # 全局session调用 self.session = requests.session() self.temp_variables = {} def __get(self, request_info): """ get请求封装 :param request_info: :return: """ try: # request_info 是我们封装好的数据,可以直接拿来用 url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.get(url=url, params=json.loads(request_info["请求参数(get)"]), # params=json.loads( # requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None), headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None ) if request_info["取值方式"] == "正则取值": value = re.findall(request_info["取值代码"], response.text)[0] self.temp_variables[request_info["取值变量"]] = value elif request_info["取值方式"] == "jsonpath取值": value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0] self.temp_variables[request_info["取值变量"]] = value result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) except ProxyError as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生连接异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except ConnectionError as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生连接异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except RequestException as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except Exception as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) return result def __post(self, request_info): """ post请求封装 :param request_info: :return: """ try: url = "https://%s%s" % (self.hosts, request_info["请求地址"]) variables_list = re.findall('\\${.+?}', request_info["请求参数(get)"]) for variable in variables_list: request_info["请求参数(get)"] = request_info["请求参数(get)"].replace(variable, self.temp_variables[variable[2:-1]]) variables_list = re.findall('\\${.+?}', request_info["请求参数(post)"]) for variable in variables_list: request_info["请求参数(post)"] = request_info["请求参数(post)"].replace(variable, self.temp_variables[variable[2:-1]]) response = self.session.post(url=url, params=json.loads(request_info['请求参数(get)']), # params=json.loads( # requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None), headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None, json=json.loads(request_info["请求参数(post)"]) ) if request_info["取值方式"] == "正则取值": value = re.findall(request_info["取值代码"], response.text)[0] self.temp_variables[request_info["取值变量"]] = value elif request_info["取值方式"] == "jsonpath取值": value = jsonpath.jsonpath(response.json(), request_info["取值代码"])[0] self.temp_variables[request_info["取值变量"]] = value result = CheckUtils(response).run_check(request_info["断言类型"], request_info["期望结果"]) except ProxyError as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生连接异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except ConnectionError as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生连接异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except RequestException as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) except Exception as e: result = {"code": 3, "error_message": "调用接口 [%s] 时发生请求异常,异常原因 [%s]" % (request_info["接口名称"], e.__str__()), "check_result": False} logger.error("调用接口 {} 时发生异常,异常原因是: {}".format(request_info["接口名称"], e.__str__())) return result def request(self, request_info): """ 封装方法自动执行post或者get方法 :param request_info: :return: """ request_type = request_info['请求方式'] if request_type == "get": # 私有化方法,其他类均不可调用 result = self.__get(request_info) elif request_type == "post": result = self.__post(request_info) else: result = {"code": 1, "error_message": "当前请求方式暂不支持!", "check_result": False} return result def request_steps(self, request_steps): """ 按照列表测试用例顺序执行测试用例 :param request_steps: :return: """ for request in request_steps: result = self.request(request) if result['code'] != 0: break return result if __name__ == '__main__': requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token', '断言类型': 'json_key', '期望结果': 'access_token,expires_in'} requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"51_136b9lRBH4SdLbSYI9C_1Sf1OogELivPJPNZ5z1mTzekmp3Yg4XQn8mx-sb3WxxV99NRWAX5CQhVIF6-uY12H_nRDjmEJ7H7oEbz9-qNHWV1g04V2t-29pslCsiuaSxIrkUChv4a2rPwdhnEEMHeADAMUP"}', '请求参数(post)': '{ "tag" : { "name" : "snsnssn" } } ', '取值方式': '无', '取值代码': '"id":(.+?),', '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''} requests_info_list = [ {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token_value', '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}, {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"${token_value}"}', '请求参数(post)': '{ "tag" : { "name" : "newtest" } } ', '取值方式': '无', '取值代码': '"id":(.+?),', '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''} ] res = RequestsUtils() # res.get(requests_info) # res.post(requests_info_post) print(res.request(requests_info_post)) # print(res.request_steps(requests_info_list))
这一章节主要说到的是将断言库跟请求库整合,还有日志模块的封装,下一章节将讲到的是请求参数,参数化