zoukankan      html  css  js  c++  java
  • Python接口测试框架搭建

    思路

    1.首先选取数据存储的方式(本篇主要以excel为主),然后二次封装相应xlrd模块读取excel内容(excel_utils),使用testdata_utils调用excel_utils来读取成我们想要的数据格式

    2.使用config配置并读取使用

    3.封装相应requests模块方法,get/post方法,然后使用相关excel读取数据的方法,来对requests进行驱动

    4.接下来封装异常及断言使用;

    5.将用例整合至一起运行

    6.生成报告和发送邮件功能

    框架

    实现

    excel封装(common\excel_utils.py)

    1.首先得自己搭建一个excel的文件,里面存放测试用例编号,测试用例名称,用例执行等,如下图

     

    2.这里面用到了合并单元格,所以使用了merged_cells方法(具体相关使用可查看https://www.cnblogs.com/ClownAlin/p/13224919.html)

    3.具体封装

     1 #!/usr/bin/env python
     2 # encoding: utf-8
     3 # @author: Alin
     4 # @file: .py
     5 # @time: 2020/7/5 8:58
     6 import os
     7 import xlrd
     8 
     9 
    class ExcelUtils:
        """Read a worksheet through xlrd, resolving merged cells to their anchor value."""

        def __init__(self, file_path, sheet_name):
            """
            :param file_path: path of the workbook to open
            :param sheet_name: name of the worksheet to read
            """
            self.file_path = file_path
            self.sheet_name = sheet_name
            self.sheet = self.get_sheet()

        def get_sheet(self):
            """
            :return: the worksheet called ``sheet_name`` in the workbook at ``file_path``
            """
            # NOTE(review): for legacy .xls files xlrd documents that merged_cells is
            # only populated when the workbook is opened with formatting_info=True;
            # confirm which file format the test data uses before changing this call.
            workbook = xlrd.open_workbook(self.file_path)
            return workbook.sheet_by_name(self.sheet_name)

        def get_row_count(self):
            """
            :return: number of rows in the worksheet
            """
            return self.sheet.nrows

        def get_col_count(self):
            """
            :return: number of columns in the worksheet
            """
            return self.sheet.ncols

        def __get_cell_value(self, row_index, col_index):
            """
            Read the raw value of a single cell.
            :param row_index: zero-based row index
            :param col_index: zero-based column index
            :return: the value stored in that cell
            """
            return self.sheet.cell_value(row_index, col_index)

        def get_merged_info(self):
            """
            :return: the sheet's merged ranges; each entry is
                     (start row, end row, start column, end column) with the end
                     indexes treated as exclusive by get_merged_cell_value
            """
            return self.sheet.merged_cells

        def get_merged_cell_value(self, row_index, col_index):
            """
            Read a cell, whether it is a plain cell or part of a merged range.
            :param row_index: zero-based row index
            :param col_index: zero-based column index
            :return: the value of the merged range's top-left cell when the cell
                     lies inside a merged range, otherwise the cell's own value
            """
            for row_start, row_end, col_start, col_end in self.get_merged_info():
                if row_start <= row_index < row_end and col_start <= col_index < col_end:
                    return self.__get_cell_value(row_start, col_start)
            return self.__get_cell_value(row_index, col_index)

        def get_sheet_data_by_dict(self):
            """
            :return: one dict per data row (every row after the first), keyed by
                     the values of the first row; an empty list for an empty sheet
            """
            row_count = self.get_row_count()
            if row_count == 0:
                # No header row to read; the original crashed here with IndexError.
                return []
            col_count = self.get_col_count()
            headers = [self.__get_cell_value(0, col) for col in range(col_count)]
            return [
                {headers[col]: self.get_merged_cell_value(row, col) for col in range(col_count)}
                for row in range(1, row_count)
            ]
    View Code

    数据读取(common\testdata_utils.py)

     1 import os
     2 import pandas as pd
     3 from common.excel_utils import ExcelUtils
     4 from common.local_config_utils import local_config
     5 from common.sql_utils import SqlUtils
     6 
     7 current_path = os.path.dirname(__file__)
     8 test_data_path = os.path.join(current_path, '..', local_config.CASE_DATA_PATH)
     9 
    10 
    class TestDataUtils():
        """Load test-case rows and group them by test-case id."""

        def __init__(self, data_path=test_data_path, sheet_name='Sheet1'):
            """
            :param data_path: path of the Excel file holding the test cases
            :param sheet_name: worksheet to read; defaults to 'Sheet1', the
                               sheet that used to be hard-coded
            """
            self.data_path = data_path
            self.test_data = ExcelUtils(data_path, sheet_name).get_sheet_data_by_dict()
            # NOTE(review): this is queried on every construction, even when only
            # the Excel data is used - confirm whether it should be loaded lazily.
            self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()

        def __get_test_case_data_dict(self):
            """
            :return: dict mapping each '测试用例编号' value to the list of rows
                     that carry it, in sheet order
            """
            use_case_dict = {}
            for row_data in self.test_data:
                use_case_dict.setdefault(row_data['测试用例编号'], []).append(row_data)
            return use_case_dict

        def get_test_case_data_list(self):
            """
            :return: tuple of dicts, one per test case, each with 'case_id'
                     (the case number) and 'case_info' (the list of its rows)
            """
            return tuple(
                {"case_id": case_id, "case_info": case_rows}
                for case_id, case_rows in self.__get_test_case_data_dict().items()
            )
    View Code

    config读取使用

    存放(conf/config.ini)

    [default]
    # 配置接口测试的主机地址
    URL = https://api.weixin.qq.com
    
    [path]
    # 配置测试数据的存放路径
    CASE_DATA_PATH = test_data\test_case.xlsx
    # 运行日志存放路径
    LOG_PATH = .\logs
    # 运行报告存放路径
    REPORTS_PATH = .\test_reports
    # 测试用例存放路径
    CASE_PATH = .\api_testcase

    读取(common\local_config_utils.py)

     1 import os
     2 import configparser
     3 
     4 current_path = os.path.dirname(__file__)
     5 config_path = os.path.join(current_path, '..', r'conf/config.ini')
     6 
     7 
     class LocalConfigUtils():
         """Expose the values of an INI configuration file as read-only properties."""

         def __init__(self, config_path=config_path):
             """
             :param config_path: path of the INI file to read (decoded as UTF-8)
             """
             parser = configparser.ConfigParser()
             parser.read(config_path, encoding='utf-8')
             self.cfg = parser

         @property
         def URL(self):
             """Value of URL in the [default] section."""
             return self.cfg.get('default', 'URL')

         @property
         def CASE_DATA_PATH(self):
             """Value of CASE_DATA_PATH in the [path] section."""
             return self.cfg.get('path', 'CASE_DATA_PATH')

         @property
         def LOG_PATH(self):
             """Value of LOG_PATH in the [path] section."""
             return self.cfg.get('path', 'LOG_PATH')

         @property
         def REPORTS_PATH(self):
             """Value of REPORTS_PATH in the [path] section."""
             return self.cfg.get('path', 'REPORTS_PATH')

         @property
         def CASE_PATH(self):
             """Value of CASE_PATH in the [path] section."""
             return self.cfg.get('path', 'CASE_PATH')

         @property
         def LOG_LEVEL(self):
             """Value of LOG_LEVEL in the [log] section, converted to int."""
             raw_level = self.cfg.get('log', 'LOG_LEVEL')
             return int(raw_level)
    View Code

    封装requests相应方法(common\requests_utils.py)

      1 import ast
      2 import re
      3 import jsonpath
      4 import requests
      5 from requests.exceptions import RequestException
      6 from requests.exceptions import ProxyError
      7 from requests.exceptions import ConnectionError
      8 from common.local_config_utils import local_config
      9 from common.check_util import CheckUtil
     10 
     11 
     class RequestsUtils():
         """Send the HTTP requests described by test-step dictionaries.

         A step dictionary supplies the request path, method, parameters, an
         optional value to extract from the response, and the expected-result
         check. Values extracted by one step are kept in ``temp_variables`` and
         substituted into ``${name}`` placeholders of later steps.
         """

         # Matches ``${name}`` placeholders inside parameter strings.
         _PLACEHOLDER_PATTERN = re.compile(r'\$\{\w+\}')

         def __init__(self):
             self.hosts = local_config.URL
             self.headers = {"Content-Type": "application/json;charset=utf-8"}
             self.session = requests.session()
             self.temp_variables = {}

         @staticmethod
         def _error_result(message, code=4):
             """Build a failure result that also carries the keys a check result has."""
             return {'code': code, 'result': message, 'check_result': False, 'message': message}

         def _substitute_variables(self, text):
             """Replace every ``${name}`` in *text* with the quoted stored value of *name*."""
             return self._PLACEHOLDER_PATTERN.sub(
                 lambda match: '"%s"' % self.temp_variables.get(match.group()[2:-1]),
                 text,
             )

         def _extract_variable(self, step_info, response):
             """Store the value the step asks to extract from *response*, if any."""
             extract_mode = step_info["取值方式"]
             if extract_mode == "json取值":
                 value = jsonpath.jsonpath(response.json(), step_info["取值代码"])[0]
             elif extract_mode == "正则取值":
                 value = re.findall(step_info["取值代码"], response.text)[0]
             else:
                 return
             self.temp_variables[step_info["传值变量"]] = value

         def _send(self, step_info, sender):
             """Call ``sender(url)``, extract variables and run the expected-result check.

             Only requests' own exceptions are converted into failure results here;
             any other exception propagates to the caller.
             """
             try:
                 url = self.hosts + step_info['请求地址']
                 response = sender(url)
                 response.encoding = response.apparent_encoding
                 self._extract_variable(step_info, response)
                 return CheckUtil(response).run_check(step_info['期望结果类型'], step_info['期望结果'])
             except ProxyError as e:
                 return self._error_result('[%s]请求:代理错误异常,原因:%s' % (step_info["接口名称"], e))
             except ConnectionError as e:
                 return self._error_result('[%s]请求:连接超时异常,原因:%s' % (step_info["接口名称"], e))
             except RequestException as e:
                 return self._error_result('[%s]请求:Request异常,原因:%s' % (step_info["接口名称"], e))

         def __get(self, get_info):
             """Send a GET request; query parameters are parsed with ast.literal_eval."""
             return self._send(
                 get_info,
                 lambda url: self.session.get(
                     url=url,
                     params=ast.literal_eval(get_info['请求参数(get)']),
                 ),
             )

         def __post(self, post_info):
             """Send a POST request with a JSON body; any failure becomes a failure result."""
             try:
                 return self._send(
                     post_info,
                     lambda url: self.session.post(
                         url=url,
                         headers=self.headers,
                         params=ast.literal_eval(post_info['请求参数(get)']),
                         json=ast.literal_eval(post_info["提交数据(post)"]),
                     ),
                 )
             except Exception as e:
                 return self._error_result('[%s]请求:系统异常,原因:%s' % (post_info["接口名称"], e))

         def request(self, step_info):
             """Run one step and return its result dictionary.

             :param step_info: dictionary describing the step; it is not modified
             :return: the check result on success, otherwise a failure result whose
                      'code' is non-zero and whose 'check_result' is False
             """
             try:
                 # Work on a copy so placeholder substitution never alters the
                 # caller's test data, which may be reused by another run.
                 step_info = dict(step_info)
                 requests_type = step_info['请求方式']
                 step_info["请求参数(get)"] = self._substitute_variables(step_info["请求参数(get)"])
                 if requests_type == "get":
                     result = self.__get(step_info)
                 elif requests_type == "post":
                     step_info["提交数据(post)"] = self._substitute_variables(step_info["提交数据(post)"])
                     result = self.__post(step_info)
                 else:
                     result = self._error_result('请求方式不支持', code=1)
             except Exception as e:
                 result = self._error_result(
                     '用例编号[%s]的[%s]步骤出现系统异常,原因:%s'
                     % (step_info['测试用例编号'], step_info["测试用例步骤"], e)
                 )
             return result

         def request_by_step(self, step_infos):
             """Run the steps of one test case in order, stopping at the first failure.

             :param step_infos: iterable of step dictionaries
             :return: the result of the last executed step; a failure result when
                      there are no steps at all
             """
             self.temp_variables = {}
             temp_result = self._error_result('测试用例不包含任何步骤')
             for step_info in step_infos:
                 temp_result = self.request(step_info)
                 if temp_result['code'] != 0:
                     break
             return temp_result
    View Code

    断言异常使用(common\check_utils.py)

     1 import ast
     2 import re
     3 
     4 
     class CheckUtil():
         """Evaluate an HTTP response against an expected-result rule."""

         def __init__(self, check_response=None):
             """
             :param check_response: response object exposing reason, status_code,
                                    headers, text and json(); it is read immediately
                                    to pre-build the pass and fail result dictionaries
             """
             self.check_response = check_response
             # Maps the expected-result type of a test step to its check method.
             self.check_rules = {
                 '': self.no_check,
                 'json键是否存在': self.check_key,
                 'json键值对': self.check_keyValue,
                 '正则匹配': self.check_regexp
             }
             self.pass_result = self._build_result(0, True)
             self.fail_result = self._build_result(2, False)

         def _build_result(self, code, check_result):
             """Build a result dictionary describing the response with the given verdict."""
             response = self.check_response
             return {
                 'code': code,
                 'response_reason': response.reason,
                 'response_code': response.status_code,
                 'response_headers': response.headers,
                 'response_body': response.text,
                 'check_result': check_result,
                 'message': ''  # filled in with the failure reason when a check fails
             }

         def no_check(self, check_data=None):
             """Pass unconditionally; *check_data* is accepted because run_check
             calls every rule with it, and is ignored."""
             return self.pass_result

         def check_key(self, check_data=None):
             """Pass when every comma-separated key in *check_data* is a top-level
             key of the JSON body; otherwise fail and list the missing keys."""
             body_keys = self.check_response.json().keys()
             expected_keys = [key.strip() for key in check_data.split(',')]
             wrong_key = [key for key in expected_keys if key not in body_keys]
             if wrong_key:
                 self.fail_result['message'] = '缺少json键:%s' % ','.join(wrong_key)
                 return self.fail_result
             return self.pass_result

         def check_keyValue(self, check_data=None):
             """Pass when every key/value pair of the dict literal in *check_data*
             is a top-level pair of the JSON body; otherwise fail and list the
             pairs that did not match."""
             body_items = self.check_response.json().items()
             wrong_items = [
                 item for item in ast.literal_eval(check_data).items()
                 if item not in body_items
             ]
             if wrong_items:
                 self.fail_result['message'] = '不匹配的json键值对:%s' % wrong_items
                 return self.fail_result
             return self.pass_result

         def check_regexp(self, check_data=None):
             """Pass when the regular expression *check_data* matches anywhere in
             the response text."""
             if re.search(check_data, self.check_response.text):
                 return self.pass_result
             return self.fail_result

         def run_check(self, check_type=None, check_data=None):
             """Run the rule named *check_type* against the response.

             :param check_type: one of the keys of ``check_rules``
             :param check_data: rule-specific expected data
             :return: the pass result, or the fail result with 'message' set when
                      the status code is not 200 or the rule is unknown
             """
             code = self.check_response.status_code
             if code != 200:
                 self.fail_result['message'] = '请求状态码%s,非200' % str(code)
                 return self.fail_result
             rule = self.check_rules.get(check_type)
             if rule is None:
                 self.fail_result['message'] = '不支持%s判断方法' % check_type
                 return self.fail_result
             return rule(check_data)
    View Code

    将用例整合运行(api_testcase\api_test.py)

     1 import warnings
     2 import unittest
     3 import paramunittest
     4 from common.request_utils import RequestsUtils
     5 from common.testdata_utils import TestDataUtils
     6 
     7 case_infos_by_mysql = TestDataUtils().get_test_case_data_list_by_mysql()
     8 case_infos_by_excel = TestDataUtils().get_test_case_data_list()
     9 
    @paramunittest.parametrized(
        *case_infos_by_excel
    )
    class APITest(paramunittest.ParametrizedTestCase):
        """One parametrized test per test case loaded from the Excel data."""

        def setUp(self) -> None:
            warnings.simplefilter('ignore', ResourceWarning)

        def setParameters(self, case_id, case_info):
            """Receive the 'case_id' and 'case_info' values of one parametrized case."""
            self.case_id = case_id
            self.case_info = case_info

        def test_api_common_function(self):
            """Run every step of the case and assert that the final check passed."""
            first_step = self.case_info[0]
            # Rename the test so the report shows the case number and case name.
            self._testMethodName = first_step.get('测试用例编号')
            self._testMethodDoc = first_step.get('测试用例名称')
            actual_result = RequestsUtils().request_by_step(self.case_info)
            # Failed checks describe themselves under 'message'; request errors
            # are reported under 'result', so fall back to it for the assertion text.
            failure_reason = actual_result.get('message') or actual_result.get('result')
            self.assertTrue(actual_result.get('check_result'), failure_reason)
    View Code

    生成测试报告(使用的是HTMLTestReportCN.py,封装至test_runner\run_case.py)

     1 import os
     2 import unittest
     3 from common.local_config_utils import local_config
     4 from common import HTMLTestReportCN
     5 from common.email_utils import EmailUtils
     6 
     7 current_path = os.path.dirname(__file__)
     8 test_case_path = os.path.join(current_path, '..', local_config.CASE_PATH)
     9 test_report_path = os.path.join(current_path, '..', local_config.REPORTS_PATH)
    10 
    11 
    class RunCase:
        """Discover the API test cases, run them and write an HTML report."""

        def __init__(self):
            self.case_path = test_case_path
            self.report_path = test_report_path
            self.title = 'VXAPI接口自动化报告'
            self.description = '自动化接口框架学习专用'
            self.tester = '我们'

        def load_test_suit(self):
            """
            :return: a TestSuite holding everything discovered in files named
                     'api_test.py' under ``case_path``
            """
            discovered = unittest.defaultTestLoader.discover(start_dir=self.case_path,
                                                             pattern='api_test.py',
                                                             top_level_dir=self.case_path)
            all_suite = unittest.TestSuite()
            all_suite.addTest(discovered)
            return all_suite

        def run(self):
            """
            Run the suite and write the report.
            :return: path of the generated report file
            """
            report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
            report_dir.create_dir(self.title)
            report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
            # The context manager closes the report even if the run raises.
            with open(report_file_path, 'wb') as fp:
                runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                                                         title=self.title,
                                                         description=self.description,
                                                         tester=self.tester)
                runner.run(self.load_test_suit())
            return report_file_path
    40 
    41 
    if __name__ == '__main__':
        # Run the suite, then mail the report both as the body and as an attachment.
        report_path = RunCase().run()
        # Read through a context manager so the report file handle is released.
        with open(report_path, 'rb') as report_file:
            report_content = report_file.read()
        EmailUtils(report_content, report_path).send_mail()
    View Code

    邮件发送封装(common\email_utils.py)

     1 import os
     2 import smtplib
     3 from email.mime.text import MIMEText
     4 from email.mime.multipart import MIMEMultipart
     5 from common.local_config_utils import local_config
     6 
     7 
     class EmailUtils:
         """Build and send the report e-mail using the SMTP settings from the config."""

         def __init__(self, smtp_body, smtp_attch_path=None):
             """
             :param smtp_body: HTML content used as the message body
             :param smtp_attch_path: optional path of a file to attach
             """
             self.smtp_server = local_config.SMTP_SERVER
             self.smtp_sender = local_config.SMTP_SENDER
             self.smtp_password = local_config.SMTP_PASSWORD
             self.smtp_receiver = local_config.SMTP_RECEIVER
             self.smtp_cc = local_config.SMTP_CC
             self.smtp_subject = local_config.SMTP_SUBJECT
             self.smtp_body = smtp_body
             self.smtp_attch = smtp_attch_path

         @staticmethod
         def _split_addresses(raw_addresses):
             """Split a comma-separated address string, dropping blank entries."""
             parts = (raw_addresses or '').split(',')
             return [part.strip() for part in parts if part.strip()]

         def mail_message_body(self):
             """
             :return: multipart message with the HTML body and, when an attachment
                      path was given, that file attached as application/octet-stream
             """
             from email.mime.application import MIMEApplication

             message = MIMEMultipart()
             message['from'] = self.smtp_sender
             message['to'] = self.smtp_receiver
             if self.smtp_cc:
                 message['Cc'] = self.smtp_cc
             message['subject'] = self.smtp_subject
             message.attach(MIMEText(self.smtp_body, 'html', 'utf-8'))
             if self.smtp_attch:
                 with open(self.smtp_attch, 'rb') as attachment:
                     # MIMEApplication defaults to application/octet-stream with
                     # base64 transfer encoding, so no extra Content-Type is added.
                     attach_file = MIMEApplication(attachment.read())
                 attach_file.add_header('Content-Disposition', 'attachment',
                                        filename=('gbk', '', os.path.basename(self.smtp_attch)))
                 message.attach(attach_file)
             return message

         def send_mail(self):
             """Log in to the SMTP server and send the message to receivers and Cc."""
             recipients = (self._split_addresses(self.smtp_receiver)
                           + self._split_addresses(self.smtp_cc))
             # Leaving the with-block quits the SMTP session, even on failure.
             with smtplib.SMTP() as smtp:
                 smtp.connect(self.smtp_server)
                 smtp.login(user=self.smtp_sender, password=self.smtp_password)
                 smtp.sendmail(self.smtp_sender, recipients,
                               self.mail_message_body().as_string())
    View Code

     日志封装(common\log_utils.py)

     1 import os
     2 import logging
     3 import time
     4 from common.local_config_utils import local_config
     5 
     6 current_path = os.path.dirname(__file__)
     7 log_out_path = os.path.join(current_path, '..', local_config.LOG_PATH)
     8 
     9 
    class LogUtils():
        """Configure the shared "APITest" logger with console and file output."""

        def __init__(self, log_path=log_out_path):
            """
            :param log_path: directory that receives the daily log file; it is
                             created when missing
            """
            os.makedirs(log_path, exist_ok=True)
            self.log_name = os.path.join(log_path, 'APITest_%s.log' % time.strftime("%Y_%m_%d"))
            self.logger = logging.getLogger("APITest")
            self.logger.setLevel(local_config.LOG_LEVEL)

            # getLogger returns the same object for the same name, so handlers are
            # attached only once; otherwise every new LogUtils would duplicate each
            # log line. A later instance with another log_path keeps the handlers
            # of the first one.
            if not self.logger.handlers:
                formatter = logging.Formatter(
                    '[%(asctime)s] %(filename)s->%(funcName)s line:%(lineno)d [%(levelname)s] : %(message)s'
                )
                handlers = (
                    logging.StreamHandler(),
                    logging.FileHandler(self.log_name, 'a', encoding='utf-8'),
                )
                for handler in handlers:
                    # Handlers stay open for the logger's lifetime; closing them
                    # here would close the log file before anything is written.
                    handler.setFormatter(formatter)
                    self.logger.addHandler(handler)

        def get_logger(self):
            """
            :return: the configured "APITest" logger
            """
            return self.logger
    32 
    33 
    34 logger = LogUtils().get_logger()
    View Code
  • 相关阅读:
    Ant
    责任链模式
    日志logback
    知识点
    三个实例演示 Java Thread Dump 日志分析
    IDEA运行编译后配置文件无法找到,或配置文件修改后无效的问题
    IDEA创建MAVEN WEB工程
    多线程源码分析ThreadPoolExecutor
    解决
    微博关系服务与Redis的故事
  • 原文地址:https://www.cnblogs.com/ClownAlin/p/13378601.html
Copyright © 2011-2022 走看看