zoukankan      html  css  js  c++  java
  • Python3+Requests+Excel完整接口自动化测试框架的实现

    框架整体使用Python3+Requests+Excel:包含对实时token的获取

    1、------base

    -------runmethond.py

    runmethond:对不同的请求方式进行封装

    import json
    import requests
     
    requests.packages.urllib3.disable_warnings()
     
    class RunMethod:
      def post_main(self, url, data, header=None):
        res = None
        if header != None:
          res = requests.post(url=url, data=data, headers=header,verify=False)
        else:
          res = requests.post(url=url, data=data,verify=False)
        return res.json()
     
      def get_main(self, url, data=None, header=None):
        res = None
        if header != None:
          res = requests.get(url=url, params=data, headers=header, verify=False)
        else:
          res = requests.get(url=url, params=data, verify=False)
        return res.json()
     
      def run_main(self, method, url, data=None, header=None):
        res = None
        if method == 'Post':
          res = self.post_main(url, data, header)
        else:
          res = self.get_main(url, data, header)
        return json.dumps(res, indent=2, sort_keys=True, ensure_ascii=False)
     
     
    if __name__ == '__main__':
      url = 'http://httpbin.org/post'
      data = {
        'cart': '11'
      }
      run = RunMethod()
      run_test = run.run_main(method="Post", url=url, data=data)
      print(run_test)
    

      2、------data

    ------data_config.py

    data_config:获取excel模块中数据

    class global_val:
      Id = '0'
      request_name = '1'
      url = '2'
      run = '3'
      request_way = '4'
      header = '5'
      case_depend = '6'
      data_depend = '7'
      field_depend = '8'
      data = '9'
      expect = '10'
      result = '11'
     
     
    def get_id():
      """获取case_id"""
      return global_val.Id
     
     
    def get_request_name():
      """获取请求模块名称"""
      return global_val.request_name
     
     
    def get_url():
      """获取请求url"""
      return global_val.url
     
     
    def get_run():
      """获取是否运行"""
      return global_val.run
     
     
    def get_run_way():
      """获取请求方式"""
      return global_val.request_way
     
     
    def get_header():
      """获取是否携带header"""
      return global_val.header
     
     
    def get_case_depend():
      """case依赖"""
      return global_val.case_depend
     
     
    def get_data_depend():
      """依赖的返回数据"""
      return global_val.data_depend
     
     
    def get_field_depend():
      """数据依赖字段"""
      return global_val.field_depend
     
     
    def get_data():
      """获取请求数据"""
      return global_val.data
     
     
    def get_expect():
      """获取预期结果"""
      return global_val.expect
     
     
    def get_result():
      """获取返回结果"""
      return global_val.result
    

      3、-----data

    -----dependent_data.py

    dependent_data:解决数据依赖问题

    from util.operation_excel import OperationExcel
    from base.runmethod import RunMethod
    from data.get_data import GetData
    from jsonpath_rw import jsonpath, parse
    import json
     
     
    class DependentData:
      """解决数据依赖问题"""
     
      def __init__(self, case_id):
        self.case_id = case_id
        self.opera_excel = OperationExcel()
        self.data = GetData()
     
      def get_case_line_data(self):
        """
        通过case_id去获取该case_id的整行数据
        :param case_id: 用例ID
        :return:
        """
        rows_data = self.opera_excel.get_row_data(self.case_id)
        return rows_data
     
      def run_dependent(self):
        """
        执行依赖测试,获取结果
        :return:
        """
        run_method = RunMethod()
        row_num = self.opera_excel.get_row_num(self.case_id)
        request_data = self.data.get_data_for_json(row_num)
        # header = self.data.is_header(row_num)
        method = self.data.get_request_method(row_num)
        url = self.data.get_request_url(row_num)
        res = run_method.run_main(method, url, request_data)
        return json.loads(res)
     
      def get_data_for_key(self, row):
        """
        根据依赖的key去获取执行依赖case的响应然后返回
        :return:
        """
        depend_data = self.data.get_depend_key(row)
        response_data = self.run_dependent()
        return [match.value for match in parse(depend_data).find(response_data)][0]
    

      4、-----data

    -----get_data.py

    get_data:获取excel数据

    from util.operation_excel import OperationExcel
    from data import data_config
    from util.operation_json import OperationJson
     
     
    class GetData:
      """获取excel数据"""
     
      def __init__(self):
        self.opera_excel = OperationExcel()
     
      def get_case_lines(self):
        """获取excel行数,即case的个数"""
        return self.opera_excel.get_lines()
     
      def get_is_run(self, row):
        """获取是否执行"""
        flag = None
        col = int(data_config.get_run())
        run_model = self.opera_excel.get_cell_value(row, col)
        if run_model == 'yes':
          flag = True
        else:
          flag = False
        return flag
     
      def is_header(self, row):
        """
        是否携带header
        :param row: 行号
        :return:
        """
        col = int(data_config.get_header())
        header = self.opera_excel.get_cell_value(row, col)
        if header != '':
          return header
        else:
          return None
     
      def get_request_method(self, row):
        """
        获取请求方式
        :param row: 行号
        :return:
        """
        # col 列
        col = int(data_config.get_run_way())
        request_method = self.opera_excel.get_cell_value(row, col)
        return request_method
     
      def get_request_url(self, row):
        """
        获取url
        :param row: 行号
        :return:
        """
        col = int(data_config.get_url())
        url = self.opera_excel.get_cell_value(row, col)
        return url
     
      def get_request_data(self, row):
        """
        获取请求数据
        :param row:行号
        :return:
        """
        col = int(data_config.get_data())
        data = self.opera_excel.get_cell_value(row, col)
        if data == '':
          return None
        return data
     
      def get_data_for_json(self, row):
        """
        通过关键字拿到data数据
        :param row:
        :return:
        """
        opera_json = OperationJson()
        request_data = opera_json.get_data(self.get_request_data(row))
        return request_data
     
      def get_expcet_data(self, row):
        """
        获取预期结果
        :param row:
        :return:
        """
        col = int(data_config.get_expect())
        expect = self.opera_excel.get_cell_value(row, col)
        if expect == "":
          return None
        else:
          return expect
     
      def write_result(self, row, value):
        """
        写入结果数据
        :param row:
        :param col:
        :return:
        """
        col = int(data_config.get_result())
        self.opera_excel.write_value(row, col, value)
     
      def get_depend_key(self, row):
        """
        获取依赖数据的key
        :param row:行号
        :return:
        """
        col = int(data_config.get_data_depend())
        depend_key = self.opera_excel.get_cell_value(row, col)
        if depend_key == "":
          return None
        else:
          return depend_key
     
      def is_depend(self, row):
        """
        判断是否有case依赖
        :param row:行号
        :return:
        """
        col = int(data_config.get_case_depend()) # 获取是否存在数据依赖列
        depend_case_id = self.opera_excel.get_cell_value(row, col)
        if depend_case_id == "":
          return None
        else:
          return depend_case_id
     
      def get_depend_field(self, row):
        """
        获取依赖字段
        :param row:
        :return:
        """
        col = int(data_config.get_field_depend())
        data = self.opera_excel.get_cell_value(row, col)
        if data == "":
          return None
        else:
          return data
    

      5、-----dataconfig

    -----case.xls

    case.xls:用例数据

     6、-----dataconfig

    -----data.json

    data.json:请求数据,根据自己实际业务,且与case层的请求数据列是关联的

    {
     "user": {
      "username": "1111111",
      "password": "123456"
     },
     "filtrate": {
      "type_id": "2",
      "brand_id": "1",
      "model_id": "111"
     },
     "search": {
      "page": "1",
      "keyword": "oppo",
      "type": "12"
     },
     "token": {
      "token": ""
     }
    

      7、-----dataconfig

    -----token.json

    token.json:实时自动将获取的token写入到该文件

    {"data": {"token": "db6f0abee4e5040f5337f5c47a82879"}}
    

      8、-----main

    -----run_test.py

    run_test:主运行程序

    from base.runmethod import RunMethod
    from data.get_data import GetData
    from util.common_util import CommonUtil
    from data.dependent_data import DependentData
    # from util.send_email import SendEmail
    from util.operation_header import OperationHeader
    from util.operation_json import OperationJson
     
     
    class RunTest:
     
      def __init__(self):
        self.run_method = RunMethod()
        self.data = GetData()
        self.com_util = CommonUtil()
        # self.send_email = SendEmail()
     
      def go_on_run(self):
        """程序执行"""
        pass_count = []
        fail_count = []
        res = None
        # 获取用例数
        rows_count = self.data.get_case_lines()
        # 第一行索引为0
        for i in range(1, rows_count):
          is_run = self.data.get_is_run(i)
          if is_run:
            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            expect = self.data.get_expcet_data(i)
            header = self.data.is_header(i)
            depend_case = self.data.is_depend(i)
     
            if depend_case != None:
              self.depend_data = DependentData(depend_case)
              # 获取依赖的响应数据
              depend_response_data = self.depend_data.get_data_for_key(i)
              # 获取依赖的key
              depend_key = self.data.get_depend_field(i)
              # 更新请求字段
              request_data[depend_key] = depend_response_data
            # 如果header字段值为write则将该接口的返回的token写入到token.json文件,如果为yes则读取token.json文件
            if header == "write":
              res = self.run_method.run_main(method, url, request_data)
              op_header = OperationHeader(res)
              op_header.write_token()
            elif header == 'yes':
              op_json = OperationJson("../dataconfig/token.json")
              token = op_json.get_data('data')
              request_data = dict(request_data, **token) # 把请求数据与登录token合并,并作为请求数据
     
              res = self.run_method.run_main(method, url, request_data)
            else:
              res = self.run_method.run_main(method, url, request_data)
     
            if expect != None:
              if self.com_util.is_contain(expect, res):
                self.data.write_result(i, "Pass")
                pass_count.append(i)
              else:
                self.data.write_result(i, res)
                fail_count.append(i)
            else:
              print(f"用例ID:case-{i},预期结果不能为空")
     
        # 发送邮件
        # self.send_email.send_main(pass_count, fail_count)
     
        print(f"通过用例数:{len(pass_count)}")
        print(f"失败用例数:{len(fail_count)}")
     
     
    if __name__ == '__main__':
      run = RunTest()
      run.go_on_run()
    

      9、-----util

    -----common_util.py

    common_util:用于断言

    class CommonUtil:
      def is_contain(self, str_one, str_two):
        """
        判断一个字符串是否在另一个字符串中
        :param str_one:
        :param str_two:
        :return:
        """
        flag = None
        if str_one in str_two:
          flag = True
        else:
          flag = False
        return flag
    

      10、-----util

    -----operation_excel.py

    operation_excel:操作excel

    import xlrd
    from xlutils.copy import copy
     
     
    class OperationExcel:
      """操作excel"""
     
      def __init__(self, file_name=None, sheet_id=None):
        if file_name:
          self.file_name = file_name
          self.sheet_id = sheet_id
        else:
          self.file_name ='../dataconfig/case1.xls'
          self.sheet_id = 0
        self.data = self.get_data()
     
      def get_data(self):
        """
        获取sheets的内容
        :return:
        """
        data = xlrd.open_workbook(self.file_name)
        tables = data.sheets()[self.sheet_id]
        return tables
     
      def get_lines(self):
        """
        获取单元格行数
        :return:
        """
        tables = self.data
        return tables.nrows
     
      def get_cell_value(self, row, col):
        """
        获取单元格数据
        :param row: 行
        :param col: 列
        :return:
        """
        tables = self.data
        cell = tables.cell_value(row, col)
        return cell
     
      def write_value(self, row, col, value):
        """
        回写数据到excel
        :param row:行
        :param col:列
        :param value:值
        :return:
        """
        read_data = xlrd.open_workbook(self.file_name)
        write_data = copy(read_data)
        sheet_data = write_data.get_sheet(0)
        sheet_data.write(row, col, value)
        write_data.save(self.file_name)
     
      def get_row_data(self, case_id):
        """
        根据对应的case_id获取对应行的内容
        :param case_id: 用例id
        :return:
        """
        row_num = self.get_row_num(case_id)
        row_data = self.get_row_value(row_num)
        return row_data
     
      def get_row_num(self, case_id):
        """
        根据case_id获取对应行号
        :param case_id:
        :return:
        """
        num = 0
        cols_data = self.get_cols_data()
        for col_data in cols_data:
          if case_id in col_data:
            return num
          num = num + 1
     
      def get_row_value(self, row):
        """
         根据行号,找到该行的内容
        :param row:行号
        :return:
     
        """
        tables = self.data
        row_data = tables.row_values(row)
        return row_data
     
      def get_cols_data(self, col_id=None):
        """
        获取某一列的内容
        :param col_id:列号
        :return:
        """
        if col_id != None:
          cols = self.data.col_values(col_id)
        else:
          cols = self.data.col_values(0)
        return cols
     
     
    if __name__ == '__main__':
      opera = OperationExcel()
      opera.get_data()
      print(opera.get_data().nrows)
      print(opera.get_lines())
      print(opera.get_cell_value(1, 2))
    

      11、-----util

    -----operation_header.py

    operation_header:实时获取登录token及将token写入到token.json文件

    import json
    from util.operation_json import OperationJson
    from base.runmethod import RunMethod
    class OperationHeader:
     
      def __init__(self, response):
        self.response = json.loads(response)
     
      def get_response_token(self):
        '''
        获取登录返回的token
        '''
        token = {"data":{"token":self.response['data']['token']}}
        return token
     
      def write_token(self):
        op_json = OperationJson()
        op_json.write_data(self.get_response_token())
     
      
     
    if __name__ == '__main__':
     
      url = "http://xxxxx"
     
      data = {
        "username": "1111",
        "password": "123456"
      }
      run_method=RunMethod()
      # res = json.dumps(requests.post(url, data).json())
      res=run_method.run_main('Post', url, data)
      op = OperationHeader(res)
      op.write_token()
    

      12、-----util

    -----operation_json.py

    operation_json:操作json文件

    import json
     
     
    class OperationJson:
      """操作json文件"""
     
      def __init__(self,file_path=None):
        if file_path==None:
          self.file_path="../dataconfig/data.json"
        else:
          self.file_path=file_path
        self.data = self.read_data()
     
      def read_data(self):
        """
        读取json文件
        :param file_name:文件路径
        :return:
        """
        with open(self.file_path) as fp:
          data = json.load(fp)
          return data
     
      def get_data(self, id):
        """根据关键字获取对应数据"""
        return self.data[id]
     
      # 写入json
      def write_data(self, data):
        with open("../dataconfig/token.json", 'w') as fp:
          fp.write(json.dumps(data))
     
     
    if __name__ == '__main__':
      # file_path = "../dataconfig/data.json"
      opejson = OperationJson()
      print(opejson.read_data())
      print(opejson.get_data('filtrate'))
    

      

     

     

     

     

    import json
    import requests
     
    requests.packages.urllib3.disable_warnings()
     
    class RunMethod:
      def post_main(self, url, data, header=None):
        res = None
        if header != None:
          res = requests.post(url=url, data=data, headers=header,verify=False)
        else:
          res = requests.post(url=url, data=data,verify=False)
        return res.json()
     
      def get_main(self, url, data=None, header=None):
        res = None
        if header != None:
          res = requests.get(url=url, params=data, headers=header, verify=False)
        else:
          res = requests.get(url=url, params=data, verify=False)
        return res.json()
     
      def run_main(self, method, url, data=None, header=None):
        res = None
        if method == 'Post':
          res = self.post_main(url, data, header)
        else:
          res = self.get_main(url, data, header)
        return json.dumps(res, indent=2, sort_keys=True, ensure_ascii=False)
     
     
    if __name__ == '__main__':
      data = {
        'cart': '11'
      }
      run = RunMethod()
      run_test = run.run_main(method="Post", url=url, data=data)
      print(run_test)
  • 相关阅读:
    线程实现的两种方式
    webhook功能概述
    Linux 常用高频命令
    Mac安装Homebrew的正确姿势
    Mac 环境变量配置
    详解k8s零停机滚动发布微服务
    详解k8s原生的集群监控方案(Heapster+InfluxDB+Grafana)
    白话kubernetes的十万个为什么(持续更新中...)
    一个典型的kubernetes工作流程
    k8s实战之从私有仓库拉取镜像
  • 原文地址:https://www.cnblogs.com/chenlimei/p/13721586.html
Copyright © 2011-2022 走看看