zoukankan      html  css  js  c++  java
  • python_requests接口测试框架,Excel作为案例数据源

    一、框架菜单

    1.1 common模块

     1.2 其他

     二、Excel接口测试案例编写

     

    三、读取Excel测试封装(核心封装)

    excel_utils.py  读取Excel中的数据

    import os
    import xlrd   #内置模块、第三方模块pip install  自定义模块
    
    
    class ExcelUtils():
        """Read one worksheet of an Excel workbook through xlrd.

        The first row of the sheet is treated as the header row; every other
        row can be returned as a dict keyed by those header values. Cells that
        belong to a merged range report the value stored in the range's
        top-left cell.
        """

        def __init__(self,file_path,sheet_name):
            """Open ``file_path`` and keep the worksheet called ``sheet_name``."""
            self.file_path = file_path
            self.sheet_name = sheet_name
            self.sheet = self.get_sheet()  # the worksheet object

        def get_sheet(self):
            """Open the workbook and return the configured worksheet."""
            workbook = xlrd.open_workbook(self.file_path)
            return workbook.sheet_by_name(self.sheet_name)

        def get_row_count(self):
            """Return the number of rows in the sheet."""
            return self.sheet.nrows

        def get_col_count(self):
            """Return the number of columns in the sheet."""
            return self.sheet.ncols

        def __get_cell_value(self, row_index, col_index):
            """Return the raw value stored at (row_index, col_index)."""
            return self.sheet.cell_value(row_index, col_index)

        def get_merged_info(self):
            """Return the sheet's merged ranges as (rlow, rhigh, clow, chigh) tuples."""
            return self.sheet.merged_cells

        def get_merged_cell_value(self, row_index, col_index):
            """Return the value of a cell, resolving merged ranges.

            If the cell lies inside a merged range, the value of that range's
            top-left cell is returned; otherwise the cell's own value is
            returned. This also holds when the sheet has no merged ranges at
            all (previously that case returned None for every cell).
            """
            for (rlow, rhigh, clow, chigh) in self.get_merged_info():
                # Upper bounds are exclusive, matching the original comparison.
                if rlow <= row_index < rhigh and clow <= col_index < chigh:
                    return self.__get_cell_value(rlow, clow)
            return self.__get_cell_value(row_index, col_index)

        def get_sheet_data_by_dict(self):
            """Return every data row as a dict keyed by the header row.

            Returns an empty list when the sheet has no rows at all.
            """
            if self.get_row_count() == 0:
                return []
            headers = [cell.value for cell in self.sheet.row(0)]
            col_count = self.get_col_count()
            all_data_list = []
            for row in range(1, self.get_row_count()):
                all_data_list.append({
                    headers[col]: self.get_merged_cell_value(row, col)
                    for col in range(col_count)
                })
            return all_data_list
    
    if __name__=='__main__':
        # Manual check: dump every data row of the sample workbook's "Sheet1".
        sample_file = os.path.join(os.path.dirname(__file__), '..', 'samples/data/test_case.xlsx')
        reader = ExcelUtils(sample_file, "Sheet1")
        for record in reader.get_sheet_data_by_dict():
            print(record)
    View Code

    testdata_utils.py 读取Excel中的数据后处理成需要的数据

    import os
    from common.excel_utils import ExcelUtils
    from common.config_utils import config
    
    # Directory that contains this module.
    current_path = os.path.dirname(__file__)
    # Test-case data file: config.CASE_DATA_PATH resolved against this module's parent directory.
    test_data_path = os.path.join( current_path,'..', config.CASE_DATA_PATH )
    
    class TestdataUtils():
        """Load raw test-step rows and group them into test cases."""

        def __init__(self,test_data_path = test_data_path):
            """Read the "Sheet1" rows of ``test_data_path`` and the MySQL case data."""
            self.test_data_path = test_data_path
            excel_reader = ExcelUtils(test_data_path,"Sheet1")
            self.test_data = excel_reader.get_sheet_data_by_dict()
            # NOTE(review): SqlUtils is not imported in the visible part of this
            # module — confirm the import exists, otherwise this line raises NameError.
            self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()

        def __get_testcase_data_dict(self):
            """Group the step rows by their '测试用例编号' value, keeping row order."""
            grouped = {}
            for step in self.test_data:
                case_id = step['测试用例编号']
                if case_id not in grouped:
                    grouped[case_id] = []
                grouped[case_id].append(step)
            return grouped

        def def_testcase_data_list(self):
            """Return a list of {"case_id": ..., "case_info": [step rows]} dicts."""
            return [
                {"case_id": case_id, "case_info": steps}
                for case_id, steps in self.__get_testcase_data_dict().items()
            ]
    
    
    if __name__=="__main__":
        # Manual check: print every grouped test case built from the default data file.
        for case in TestdataUtils().def_testcase_data_list():
            print(case)
    View Code

    四、request封装(核心封装)

    requests_utils.py   包含post请求,get请求,异常,调用断言
    import ast
    import re
    import requests
    import jsonpath
    from requests.exceptions import RequestException
    from requests.exceptions import ProxyError
    from requests.exceptions import ConnectionError
    from common.config_utils import config
    from common.check_utils import CheckUtils
    
    class RequestsUtils():
        """Send the HTTP steps of a test case and check each response.

        A step is a dict keyed by the Excel column names ('请求方式', '请求地址',
        '请求参数(get)', '提交数据(post)', '取值方式', '取值代码', '传值变量',
        '期望结果类型', '期望结果', ...). Values extracted from one response are
        kept in ``temp_variables`` and substituted into ``${name}`` placeholders
        of later steps.

        Every public method returns a result dict. Failures raised while sending
        or checking are reported as ``{'code': 4, 'result': <reason>}``; an
        unsupported request method is reported with ``'code': 1``.
        """

        def __init__(self):
            self.hosts =  config.hosts
            # "Content-Type" is the actual HTTP header name; the previous
            # "ContentType" spelling was sent as an unrelated header.
            self.headers = {"Content-Type":"application/json;charset=utf-8"}
            self.session = requests.session()
            self.temp_variables = {}

        def __substitute_variables(self, text):
            """Replace each ``${name}`` in ``text`` with the quoted stored value."""
            for placeholder in re.findall(r'\$\{\w+\}', text):
                variable_name = placeholder[2:-1]  # strip the leading "${" and trailing "}"
                text = text.replace(placeholder, '"%s"' % self.temp_variables.get(variable_name))
            return text

        def __extract_variable(self, step_info, response):
            """Store a value taken from ``response`` under the step's '传值变量' name.

            Does nothing unless '取值方式' is 'json取值' or '正则取值'.
            """
            extract_mode = step_info["取值方式"]
            if extract_mode == "json取值":
                value = jsonpath.jsonpath( response.json(),step_info["取值代码"] )[0]
            elif extract_mode == "正则取值":
                value = re.findall(step_info["取值代码"],response.text)[0]
            else:
                return
            self.temp_variables[ step_info["传值变量"] ] = value

        def __execute(self, step_info, send):
            """Call ``send()``, extract variables, run the check, map errors to code 4."""
            # .get() so a step without an interface name cannot raise inside a handler.
            interface_name = step_info.get("接口名称")
            try:
                response = send()
                response.encoding = response.apparent_encoding
                self.__extract_variable(step_info, response)
                result = CheckUtils(response).run_check(step_info['期望结果类型'], step_info['期望结果'])
            except ProxyError:
                result = {'code': 4, 'result': '[%s]请求:代理错误异常' % (interface_name,)}
            except ConnectionError:
                result = {'code': 4, 'result': '[%s]请求:连接超时异常' % (interface_name,)}
            except RequestException as e:
                result = {'code': 4, 'result': '[%s]请求:Request异常,原因:%s' % (interface_name, e)}
            except Exception as e:
                result = {'code':4,'result':'[%s]请求:系统异常,原因:%s'%(interface_name, e)}
            return result

        def __get(self,get_info):
            """Send a GET step; query parameters come from '请求参数(get)'."""
            def send():
                return self.session.get( url = self.hosts + get_info["请求地址"],
                                         params = ast.literal_eval(get_info["请求参数(get)"])
                                         )
            return self.__execute(get_info, send)

        def __post(self,post_info):
            """Send a POST step; the JSON body comes from '提交数据(post)'."""
            def send():
                return self.session.post( url = self.hosts + post_info["请求地址"],
                                          headers = self.headers,
                                          params = ast.literal_eval(post_info["请求参数(get)"]),
                                          json = ast.literal_eval(post_info["提交数据(post)"])
                                          )
            return self.__execute(post_info, send)

        def request(self,step_info):
            """Run one step and return its result dict.

            ``step_info`` is updated in place: placeholders in '请求参数(get)'
            (and, for POST, '提交数据(post)') are replaced before sending.
            """
            try:
                request_type = step_info["请求方式"]
                step_info["请求参数(get)"] = self.__substitute_variables(step_info["请求参数(get)"])
                if request_type == "get":
                    result = self.__get( step_info )
                elif request_type == "post":
                    step_info["提交数据(post)"] = self.__substitute_variables(step_info["提交数据(post)"])
                    result = self.__post( step_info )
                else:
                    result = {'code':1,'result':'请求方式不支持'}
            except Exception as e:
                # .get() keeps this handler from raising when the step lacks these keys.
                result = {'code':4,'result':'用例编号[%s]的[%s]步骤出现系统异常,原因:%s'%(step_info.get('测试用例编号'),step_info.get("测试用例步骤"),e)}
            return result

        def request_by_step(self,step_infos):
            """Run the steps in order and stop at the first one whose code is not 0.

            Returns the result of the last executed step. An empty ``step_infos``
            yields a failing result instead of raising.
            """
            self.temp_variables = {}
            temp_result = {'code': 4, 'result': '用例步骤为空', 'check_result': False, 'message': '用例步骤为空'}
            for step_info in step_infos:
                temp_result = self.request( step_info )
                if temp_result['code']!=0:
                    break
            return temp_result
    
    
    if __name__=="__main__":
        # Manual check: fetch a token, then create a tag using that token.
        token_step = {
            '请求方式': 'get',
            '请求地址': '/cgi-bin/token',
            '请求参数(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}',
            '提交数据(post)': '',
            '取值方式': 'json取值',
            '传值变量': 'token',
            '取值代码': '$.access_token',
            '期望结果类型': '正则匹配',
            '期望结果': '{"access_token":"(.+?)","expires_in":(.+?)}',
        }
        create_tag_step = {
            '请求方式': 'post',
            '请求地址': '/cgi-bin/tags/create',
            '请求参数(get)': '{"access_token":${token}}',
            '提交数据(post)': '{"tag" : {"name" : "衡东"}}',
            '取值方式': '',
            '传值变量': '',
            '取值代码': '',
            '期望结果类型': '正则匹配',
            '期望结果': '{"tag":{"id":(.+?),"name":"衡东"}}',
        }
        RequestsUtils().request_by_step([token_step, create_tag_step])
    View Code

    五、断言封装(核心封装)

    check_utils.py  断言封装,与实际结果核对
    import re
    import ast
    
    class CheckUtils():
        """Compare an HTTP response with an expected result.

        ``check_response`` must expose ``reason``, ``status_code``, ``headers``,
        ``text`` and ``json()`` (the attributes read below). Each check returns
        either ``pass_result`` (code 0) or ``fail_result`` (code 2); on failure
        the ``message`` entry of ``fail_result`` describes what went wrong.
        """

        def __init__(self,check_response=None):
            self.ck_response=check_response
            # Dispatch table: expected-result type -> check method.
            self.ck_rules = {
                '': self.no_check,
                'json键是否存在': self.check_key,
                'json键值对': self.check_keyvalue,
                '正则匹配': self.check_regexp
            }
            self.pass_result = self.__build_result(0, True)
            self.fail_result = self.__build_result(2, False)

        def __build_result(self, code, passed):
            """Return a result dict describing the response with the given outcome."""
            return {
                'code': code,
                'response_reason': self.ck_response.reason,
                'response_code': self.ck_response.status_code,
                'response_headers': self.ck_response.headers,
                'response_body': self.ck_response.text,
                'check_result': passed,
                'message': ''  # free-form detail, e.g. for log output
            }

        def no_check(self,check_data=None):
            """Always pass. Accepts ``check_data`` because run_check passes it to every rule."""
            return self.pass_result

        def check_key(self,check_data=None):
            """Pass when every comma-separated key in ``check_data`` is a top-level JSON key."""
            actual_keys = self.ck_response.json().keys()  # parse the body once
            wrong_key = [key for key in check_data.split(',') if key not in actual_keys]
            if wrong_key:
                self.fail_result['message'] = 'json键不存在:%s' % ','.join(wrong_key)
                return self.fail_result
            return self.pass_result

        def check_keyvalue(self,check_data=None):
            """Pass when every key/value pair of the dict literal ``check_data`` is in the JSON body."""
            actual_items = self.ck_response.json().items()  # parse the body once
            # literal_eval turns the expected dict literal into a dict without evaluating code.
            wrong_items = [item for item in ast.literal_eval(check_data).items()
                           if item not in actual_items]
            if wrong_items:
                self.fail_result['message'] = 'json键值对不匹配:%s' % str(wrong_items)
                return self.fail_result
            return self.pass_result

        def check_regexp(self,check_data=None):
            """Pass when the regular expression ``check_data`` matches somewhere in the body text."""
            if re.search(check_data, self.ck_response.text):
                return self.pass_result
            return self.fail_result

        def run_check(self,check_type=None,check_data=None):
            """Run the rule registered for ``check_type`` against ``check_data``.

            Fails without running any rule when the status code is not 200 or
            when ``check_type`` is not a registered rule name.
            """
            code = self.ck_response.status_code
            if code != 200:
                self.fail_result['message'] = '请求的响应状态码非200,实际为%s' % str(code)
                return self.fail_result
            check = self.ck_rules.get(check_type)
            if check is None:
                self.fail_result['message'] = '不支持%s判断方法'%check_type
                return self.fail_result
            return check(check_data)
    
    
    
    
    if __name__=="__main__":
        import json

        class _DemoResponse:
            """Minimal stand-in exposing the response attributes CheckUtils reads.

            CheckUtils needs ``reason``, ``status_code``, ``headers``, ``text`` and
            ``json()``; passing a bare dict or str (as this demo used to) fails in
            ``CheckUtils.__init__`` with AttributeError.
            """
            reason = 'OK'
            status_code = 200
            headers = {}

            def __init__(self, text):
                self.text = text

            def json(self):
                return json.loads(self.text)

        # Key check: "expires_in" is absent from the body, so this fails.
        print(CheckUtils(_DemoResponse('{"access_token":"hello","expires_":7200}')).check_key("access_token,expires_in"))
        # Key/value check: the body has "expires_i", not "expires_in", so this fails.
        print(CheckUtils(_DemoResponse('{"access_token":"hello","expires_i":7200}')).check_keyvalue('{"expires_in": 7200}'))
        # Regex check that matches (passes).
        print(CheckUtils(_DemoResponse('{"access_token":"hello","expires_in":7200}')).check_regexp('"expires_in":(.+?)'))
        # Regex check that does not match (fails).
        print(CheckUtils(_DemoResponse('{"access_token":"hello","expires":7200}')).check_regexp('"expires_in":(.+?)'))
    View Code

    六、api_testcase下的api_test.py 封装

    import warnings
    import unittest
    import paramunittest
    from common.testdata_utils import TestdataUtils
    from common.requests_utils import RequestsUtils
    
    # Data source switch: call def_testcase_data_list_by_mysql() for the MySQL source,
    # or def_testcase_data_list() for the Excel source.
    # NOTE(review): confirm that TestdataUtils actually provides def_testcase_data_list_by_mysql().
    
    case_infos = TestdataUtils().def_testcase_data_list_by_mysql()
    
    # One parametrized test class instance is generated per entry of case_infos.
    # presumably each entry is a dict whose keys match setParameters' arguments — verify against TestdataUtils.
    @paramunittest.parametrized(
        *case_infos
    )
    
    class APITest(paramunittest.ParametrizedTestCase):
        """Data-driven API test: runs all steps of one test case and asserts the final check."""

        def setUp(self) -> None:
            # Silence ResourceWarning so it does not clutter the test output.
            warnings.simplefilter('ignore', ResourceWarning)
    
        def setParameters(self, case_id, case_info):
            """Receive the parameters of this case: its id and its list of step dicts."""
            self.case_id = case_id
            self.case_info = case_info
    
        def test_api_common_function(self):
            '''Run every step of the case and assert that the last executed step passed.'''
            # Overwrite unittest's private name/doc attributes with the case id and case name
            # taken from the first step.
            # NOTE(review): presumably done so reports show the case id/name — confirm.
            self._testMethodName = self.case_info[0].get("测试用例编号")
            self._testMethodDoc = self.case_info[0].get("测试用例名称")
            actual_result = RequestsUtils().request_by_step(self.case_info)
            # 'message' from the result dict is used as the assertion failure message.
            self.assertTrue( actual_result.get('check_result'),actual_result.get('message') )
    
    # Run the tests of this module when it is executed directly.
    if __name__ == '__main__':
        unittest.main()
    View Code

    七、common下的log_utils.py 封装

    import os
    import logging
    import time
    from common.config_utils import config
    
    # Directory that contains this module.
    current_path = os.path.dirname(__file__)
    # Log directory: config.LOG_PATH resolved against this module's parent directory.
    log_output_path = os.path.join( current_path,'..', config.LOG_PATH  )
    
    class LogUtils():
        """Configure the shared "ApiTestLog" logger with console and daily-file output."""

        def __init__(self,log_path=log_output_path):
            """Set up the logger.

            ``log_path`` is the directory for the log file (created if missing).
            Previously this argument was ignored and the module default was
            always used.

            Handlers are attached only the first time: ``logging.getLogger``
            returns the same logger object for the same name, so attaching
            handlers on every instantiation would emit each record repeatedly.
            As a consequence, a later instance with a different ``log_path``
            keeps writing to the file chosen by the first instance.
            """
            os.makedirs(log_path, exist_ok=True)
            self.log_name = os.path.join( log_path ,'ApiTest_%s.log'%time.strftime('%Y_%m_%d') )
            self.logger = logging.getLogger("ApiTestLog")
            self.logger.setLevel( config.LOG_LEVEL )

            if not self.logger.handlers:
                formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
                console_handler = logging.StreamHandler()  # console output
                file_handler = logging.FileHandler(self.log_name,'a',encoding='utf-8')  # file output
                for handler in (console_handler, file_handler):
                    handler.setFormatter(formatter)
                    self.logger.addHandler(handler)
                # The handlers are intentionally left open: they are still attached
                # to the logger, and closing them does not prevent duplicated output.

        def get_logger(self):
            """Return the configured logger."""
            return self.logger
    
    # Shared module-level logger; the original author's intent is to avoid
    # duplicated log output by creating LogUtils once here.
    logger = LogUtils().get_logger()
    
    # Manual check: emit one INFO record.
    if __name__ == '__main__':
        logger.info('hello')
    View Code

    八、common下的config_utils.py的封装

    配置文件的编写:

     对配置文件的读取封装:

    import  os
    import configparser
    
    # Directory that contains this module.
    current_path = os.path.dirname(__file__)
    # Default INI file location, relative to this module.
    cfgpath = os.path.join(current_path, "../conf/local_config.ini")
    # NOTE(review): this runs on every import; looks like leftover debug output — confirm before removing.
    print(cfgpath)
    
    
    class ConfigUtils:
        """Typed accessors over the framework's INI configuration file."""

        def __init__(self,config_path=cfgpath):
            """Parse ``config_path`` (UTF-8) into a private ConfigParser."""
            parser = configparser.ConfigParser()
            parser.read(config_path, encoding="utf-8")
            self.__conf = parser

        def read_ini(self,sec,option):
            """Return the raw string stored under ``option`` in section ``sec``."""
            return self.__conf.get(sec,option)

        # --- [default] section ---

        @property
        def hosts(self):
            return self.read_ini('default','hosts')

        # --- [path] section ---

        @property
        def LOG_PATH(self):
            return self.read_ini('path', 'LOG_PATH')

        @property
        def CASE_DATA_PATH(self):
            return self.read_ini('path', 'CASE_DATA_PATH')

        @property
        def REPORT_PATH(self):
            return self.read_ini('path', 'REPORT_PATH')

        # --- [log] section ---

        @property
        def LOG_LEVEL(self):
            # The level is stored as text in the INI file and returned as an int.
            return int(self.read_ini('log', 'LOG_LEVEL'))

        # --- [email] section ---

        @property
        def smtp_server(self):
            return self.read_ini('email', 'smtp_server')

        @property
        def smtp_sender(self):
            return self.read_ini('email', 'smtp_sender')

        @property
        def smtp_password(self):
            return self.read_ini('email', 'smtp_password')

        @property
        def smtp_receiver(self):
            return self.read_ini('email', 'smtp_receiver')

        @property
        def smtp_cc(self):
            return self.read_ini('email', 'smtp_cc')

        @property
        def smtp_subject(self):
            return self.read_ini('email', 'smtp_subject')
    
    
    # Shared configuration object imported by the other modules.
    config = ConfigUtils()


    if __name__=='__main__':
        # Manual check: print two settings read from the default INI file.
        current_path = os.path.dirname(__file__)
        cfgpath = os.path.join(current_path, "../conf/local_config.ini")
        demo_config = ConfigUtils()
        print(demo_config.hosts)
        print(demo_config.LOG_LEVEL)
    View Code

    九、test_runner下的run_case.py 封装

    class RunCase():
        """Discover the API tests, run them and write an HTML report.

        NOTE(review): ``test_case_path``, ``test_report_path``, ``unittest`` and
        ``HTMLTestReportCN`` are not defined or imported in the visible part of
        this module — confirm they are provided at the top of the file.
        """

        def __init__(self):
            self.test_case_path = test_case_path
            self.report_path = test_report_path
            self.title = 'P1P2接口自动化测试报告'
            self.description = '自动化接口测试框架'
            self.tester = '测试开发组'

        def load_test_suite(self):
            """Return a suite holding every test discovered in ``api_test.py`` files."""
            discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
                                                           pattern='api_test.py',
                                                           top_level_dir=self.test_case_path)
            all_suite = unittest.TestSuite()
            all_suite.addTest( discover )
            return all_suite

        def run(self):
            """Run the suite, write the HTML report and return the report file path."""
            report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
            report_dir.create_dir(self.title)
            report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
            # "with" closes the report file even if the runner raises.
            with open( report_file_path ,'wb' ) as fp:
                runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                                                         title=self.title,
                                                         description=self.description,
                                                         tester=self.tester)
                runner.run( self.load_test_suite() )
            return report_file_path
    
    
    if __name__=='__main__':
        # Run all cases, then mail the generated report both as the message body
        # and as an attachment.
        report_path = RunCase().run()
        # "with" closes the report file after reading it.
        with open(report_path, 'rb') as report_file:
            report_body = report_file.read()
        EmailUtils(report_body, report_path).send_mail()
    View Code

    十、common下的email_utils.py 封装

    import os
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    from common.config_utils import config
    
    class EmailUtils():
        """Build and send the test-report e-mail using the SMTP settings in ``config``."""

        def __init__(self,smtp_body,smtp_attch_path=None):
            """Store the HTML body and the optional path of a file to attach."""
            self.smtp_server = config.smtp_server
            self.smtp_sender = config.smtp_sender
            self.smtp_password = config.smtp_password
            self.smtp_receiver = config.smtp_receiver
            self.smtp_cc = config.smtp_cc
            self.smtp_subject = config.smtp_subject
            self.smtp_body = smtp_body
            self.smtp_attch = smtp_attch_path

        def mail_message_body(self):
            """Return the multipart message: HTML body plus the attachment, if any."""
            message = MIMEMultipart()
            message['from'] = self.smtp_sender
            message['to'] = self.smtp_receiver
            if self.smtp_cc:
                # An empty Cc setting would otherwise produce an empty Cc header.
                message['Cc'] = self.smtp_cc
            message['subject'] = self.smtp_subject
            message.attach( MIMEText(self.smtp_body,'html','utf-8') )
            if self.smtp_attch:
                # "with" closes the attachment file once its content has been read.
                with open(self.smtp_attch, 'rb') as attachment:
                    attach_file = MIMEText(attachment.read(), 'base64', 'utf-8')
                attach_file['Content-Type'] = 'application/octet-stream'
                # The (charset, language, value) tuple lets a non-ASCII file name be encoded.
                attach_file.add_header('Content-Disposition', 'attachment', filename=('gbk', '', os.path.basename(self.smtp_attch)))
                message.attach(attach_file)
            return message

        def send_mail(self):
            """Send the message to every non-empty receiver and Cc address."""
            candidates = self.smtp_receiver.split(",") + self.smtp_cc.split(",")
            # Drop blanks: splitting an empty setting yields [''], which is not an address.
            recipients = [address.strip() for address in candidates if address.strip()]
            # The context manager closes the connection even when sending fails.
            with smtplib.SMTP() as smtp:
                smtp.connect(self.smtp_server)
                smtp.login(user=self.smtp_sender, password=self.smtp_password)
                smtp.sendmail(self.smtp_sender, recipients, self.mail_message_body().as_string())
    
    if __name__=='__main__':
        # Manual check: mail a fixed HTML heading with a sample report attached.
        report_file = os.path.dirname(__file__) + '/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html'
        mailer = EmailUtils('<h3 align="center">自动化测试报告</h3>', report_file)
        mailer.send_mail()
    View Code
  • 相关阅读:
    mysql book网盘
    mysql数据库测试库下载
    TEMP
    测试
    吴炳锡
    ACMUG
    mysql select语句执行顺序
    mysql mha高可用架构的安装
    执行计划解读 简朝阳 (Sky Jian) and 那蓝蓝海
    mysql字符串比较
  • 原文地址:https://www.cnblogs.com/123anqier-blog/p/13376455.html
Copyright © 2011-2022 走看看