zoukankan      html  css  js  c++  java
  • 断言封装整合到requests封装中应用(纠错False,Result循环,tag测试)

    检查json_key_value:

    检查:

    requests.py

    # -*- coding: utf-8 -*-
    #@File :demo_04.py
    #@Auth : wwd
    #@Time : 2020/12/10 9:09 下午
    import json
    import jsonpath
    import requests
    import re
    from utils.config_utils import local_config
    from utils.check_utils import CheckUtils

    class RequestsUtils:
    def __init__(self):
    self.hosts = local_config.HOSTS
    self.session = requests.session()
    self.tmp_variables={}
    def __get(self,requests_info):
    url = self.hosts + requests_info['请求地址']
    variable_list = re.findall('\${w+}',requests_info['请求参数(get)'])
    for variable in variable_list:
    requests_info['请求参数(get)'] = requests_info['请求参数(get)'].replace(variable,
    '"%s"'%self.tmp_variables[variable[2:-1]])
    response = self.session.get( url = url,
    params = json.loads(requests_info['请求参数(get)']),
    headers = requests_info['请求头部信息'])
    response.encoding = response.apparent_encoding #保证不乱码
    if requests_info['取值方式'] == 'jsonpath取值':
    value = jsonpath.jsonpath(response.json(),requests_info['取值代码'])[0]
    self.tmp_variables[requests_info['取值变量']] = value
    elif requests_info['取值方式'] == '正则取值':
    value = re.findall(requests_info['取值代码'],response.text)[0]
    self.tmp_variables[requests_info['取值变量']] = value
    result = CheckUtils(response).run_check(requests_info['断言类型'],requests_info['期望结果'])
    return result

    def __post(self,requests_info):
    url = self.hosts + requests_info['请求地址']
    get_variable_list = re.findall('\${w+}', requests_info['请求参数(get)'])
    for variable in get_variable_list:
    requests_info['请求参数(get)'] = requests_info['请求参数(get)'].replace(variable,
    '"%s"'%self.tmp_variables[variable[2:-1]])
    post_variable_list = re.findall('\${w+}', requests_info['请求参数(post)'])
    for variable in post_variable_list:
    requests_info['请求参数(post)'] = requests_info['请求参数(post)'].replace(variable,
    '"%s"'%self.tmp_variables[variable[2:-1]])
    response = self.session.post(url=url,
    headers=requests_info['请求头部信息'],
    params=json.loads(requests_info['请求参数(get)']),
    json = json.loads(requests_info['请求参数(post)'])
    )
    response.encoding = response.apparent_encoding
    if requests_info['取值方式'] == 'jsonpath取值':
    value = jsonpath.jsonpath(response.json(),requests_info['取值代码'])[0]
    self.tmp_variables[requests_info['取值变量']] = value
    elif requests_info['取值方式'] == '正则取值':
    value = re.findall(requests_info['取值代码'], response.text)[0]
    self.tmp_variables[requests_info['取值变量']] = value
    result = CheckUtils(response).run_check(requests_info['断言类型'], requests_info['期望结果'])
    return result

    def request(self,step_info):
    request_type = step_info['请求方式']
    if request_type == "get":
    result = self.__get(step_info)
    elif request_type == "post":
    result = self.__post(step_info)
    else:
    result = {'code':2,'result':'请求方式不支持'}
    print(self.tmp_variables)
    return result

    def request_by_step(self,test_steps):
    for test_step in test_steps:
    result = self.request(test_step)
    print(result)
    if result['code'] != 0:
    break
    #print(result)
    return result


    if __name__=='__main__':
    # req_post_dict = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_03', '接口名称': '删除标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/delete', '请求参数(get)': '{"access_token":"39_ZlzNDPma7qLWpLJ4K0ir_cSahJ_fg9aevBpGvqRp9VNjqRE6hSkBOSUFla-mFjSGKyF-YFx28sM4Ch1rJISPGVSTahZ8l_xQ9M7CnAFoqUfibusAdeOI4lHEIzB6zhXJQHN5b9as9zhcGtSbBYKeAGAEBN"}', '请求参数(post)': '{ "tag":{ "id" : 456 } }'}
    # req_dict = {'测试用例编号': 'api_case_01', '测试用例名称': '获取access_token接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wx55614004f367f8ca","secret":"65515b46dd758dfdb09420bb7db2c67f"}', '请求参数(post)': ''}

    # step_list = [{'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wx55614004f367f8ca","secret":"65515b46dd758dfdb09420bb7db2c67f"}', '请求参数(post)': ''}, {'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":"39_Bm5UI-zvWkokwnl6d3zCW30hk3sVHSv6sh6cHN3dbgnwUdfmhM-EFZ3OIrTechkzaRt9Iae3yX_MF7_h7bobNybvkoAC1CM2pAfGfNqSegXsPbjyJzkgSHtBV1OezPwEvFn60jS3__w5BdzVMRHcAHAYDT"}', '请求参数(post)': '{ "tag" : { "name" : "广东" } } '}]
    # requestsUtils = RequestsUtils()
    # requestsUtils.request_by_step( step_list )

    #step_list = [{'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wx55614004f367f8ca","secret":"65515b46dd758dfdb09420bb7db2c67f"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token'}, {'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":${token}}', '请求参数(post)': '{ "tag" : { "name" : "p3p4hehehe1" } } ', '取值方式': '无', '取值代码': '', '取值变量': ''}]
    #step_list = [{'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token', '请求参数(get)': '{"grant_type":"client_credential","appid":"wx55614004f367f8ca","secret":"65515b46dd758dfdb09420bb7db2c67f"}', '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token'}, {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":${token}}', '请求参数(post)': '{ "tag" : { "name" : "p3p4pppp2" } } ', '取值方式': 'jsonpath取值', '取值代码': '$.tag.id', '取值变量': 'tag_id'}, {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '是', '用例步骤': 'step_03', '接口名称': '删除标签接口', '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/delete', '请求参数(get)': '{"access_token":${token}}', '请求参数(post)': '{"tag":{"id":${tag_id}}}', '取值方式': '无', '取值代码': '', '取值变量': ''}]
    step_list = [{'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_01', '接口名称': '获取access_token接口',
    '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
    '请求参数(get)': '{"grant_type":"client_credential","appid":"wx55614004f367f8ca","secret":"65515b46dd758dfdb09420bb7db2c67f"}',
    '请求参数(post)': '', '取值方式': '正则取值', '取值代码': '"access_token":"(.+?)"', '取值变量': 'token', '断言类型': 'json_key_value',
    '期望结果': '{"expires_in":7200}'},
    {'测试用例编号': 'api_case_02', '测试用例名称': '创建标签接口测试', '用例执行': '是', '用例步骤': 'step_02', '接口名称': '创建标签接口',
    '请求方式': 'post', '请求头部信息': '', '请求地址': '/cgi-bin/tags/create', '请求参数(get)': '{"access_token":${token}}',
    '请求参数(post)': '{ "tag" : { "name" : "P3P4ssss" } } ', '取值方式': '无', '取值代码': '', '取值变量': '',
    '断言类型': 'json_key', '期望结果': 'tag'}]
    requestsUtils = RequestsUtils()
    r = requestsUtils.request_by_step(step_list)

    # print( r['response_body'] )

    check_utils.py
    # -*- coding: utf-8 -*-
    #@File :check_utils.py
    #@Auth : wwd
    #@Time : 2020/12/10 7:18 下午
    import requests
    import json
    import requests
    import json

    class CheckUtils:
    def __init__(self,response_data):
    self.response_data = response_data
    self.check_rules = { #判断规则和方法名去对应的一个过程
    '无': self.none_check,
    'json_key': self.key_check,
    'json_key_value': self.key_value_check,
    '正则比对': self.regexp_check
    }
    self.pass_result = {
    'code':0, # 状态吗,0表示断言成功
    'response_code':self.response_data.status_code,
    'response_reason':self.response_data.reason,
    'response_headers':self.response_data.headers,
    'response_body':self.response_data.text,
    'response_url':self.response_data.url,
    'check_result': True
    }
    self.fail_result = {
    'code':1,
    'response_code':self.response_data.status_code,
    'response_reason':self.response_data.reason,
    'response_headers':self.response_data.headers,
    'response_body':self.response_data.text,
    'response_url':self.response_data.url,
    'check_result': False
    }

    def none_check(self):
    return self.pass_result

    def key_check(self,check_data):
    print('111')
    key_list = check_data.split(',')
    tmp_result = []
    print(key_list)
    print(self.response_data.json().keys())
    for key in key_list:
    if key in self.response_data.json().keys():
    tmp_result.append( self.pass_result )
    else:
    tmp_result.append( self.fail_result )
    if self.fail_result in tmp_result:
    return self.fail_result
    else:
    return self.pass_result

    def key_value_check(self,check_data):
    key_value_dict = json.loads( check_data )
    tmp_result = []
    for key_value in key_value_dict.items():
    if key_value in self.response_data.json().items():
    tmp_result.append( self.pass_result )
    else:
    tmp_result.append( self.fail_result )
    if self.fail_result in tmp_result:
    return self.fail_result
    else:
    return self.pass_result

    def regexp_check(self,check_data):
    pass

    def run_check(self,check_type,check_data):
    if check_type=='无' or check_data == '':
    return self.check_rules[check_type]()
    else:
    return self.check_rules[check_type](check_data)

    if __name__=='__main__':
    session = requests.session()
    get_param_dict = {
    "grant_type": "client_credential",
    "appid": "wx55614004f367f8ca",
    "secret": "65515b46dd758dfdb09420bb7db2c67f"
    }
    response = session.get(url='https://api.weixin.qq.com/cgi-bin/token',
    params=get_param_dict)
    response.encoding = response.apparent_encoding
    checkUtils = CheckUtils(response)
    # print( checkUtils.key_check('access_token,expires_in') )
    # print( checkUtils.key_value_check('{"expires_in":7200}') )
    print( checkUtils.run_check('json_key','access_token,expires_in') )
    print( checkUtils.run_check('json_key_value','{"expires_in":7200}') )


    github源码:https://github.com/w550856163/JKZDH_KJ.git tag:V1.2
    回忆滋润坚持
  • 相关阅读:
    MonoDevelop/MonoTouch SDK开发iOS程序体验! 不及格的程序员
    iPad 开发陷井1 不及格的程序员
    以苹果平台下开发语言 "ObjectiveC" 谈语言开发效率之争 C#与Java你们都不要争,我才是最慢的! 不及格的程序员
    C# 扩展方法 借签于 ObjectiveC 扩展类. 不及格的程序员
    iOS 编程陷井. 不及格的程序员
    Multimedia Programming Guide & Audio 不及格的程序员
    GoogleStyleGuide 一个不错的开源项目. 不及格的程序员
    撒哟那拉 volatile !!! 不及格的程序员
    iPhone 游戏 Infinity Blade1.0 通关 不及格的程序员
    终于在中国大陆的互联网络中用Facebook and tWitter 's iPhone 客户端登录了。 不及格的程序员
  • 原文地址:https://www.cnblogs.com/james5d/p/14123619.html
Copyright © 2011-2022 走看看