zoukankan      html  css  js  c++  java
  • API 自动化框架

    API 自动化框架

      个人认为接口自动化测试使用python语言编写更加简单,但所有接口自动化项目代码的思维都是一样的

    1.项目包结构

    1.case:存放用例数据的包,将所有用例数据以配置文件形式传入

    2.core:核心包

      1)config.py:封装ConfigParser解析获取配置文件数据的方法

        python的内置模块ConfigParser:不太了解的可以百度

      2)log.py:封装log的模块

      3)request.py:封装接口测试的方法

      4)mysql.py:封装连接数据库的方法

    3.function:功能包,封装执行用例的方法,以及生成测试报告的方法

      生成测试报告的原理:使用file生成.md文件,使用pip install mkdocs安装mkdocs,本地服务器的话端口默认为8000

    4.report:存放所有生成的测试报告

    5.constant.py:存放所有全局变量的模块

    6.run.py:运行测试用例,以及生成测试报告的模块

    2.核心代码

     config.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 基础包:配置服务
    
    import ConfigParser
    import core.log as log
    
    config = ConfigParser.ConfigParser()
    logging = log.get_logger()
    
    def get_config(filename):
        """
        获取文件配置
        :param filename: 配置文件名
        :return: None
        """
        global config
        try:
            config.read(filename)
            return True
        except Exception, e:
            logging.error("读取配置失败 %s" % e)
    
    
    def get_data(title, key):
        """
        参数配置
        :param title: 配置文件的头信息
        :param key: 配置文件的key值
        :return: 配置文件的value
        """
        try:
            value = config.get(title, key)
            type(value)
            return value
        except Exception, e:
            logging.error("获取配置文件参数失败 %s" % e)
    
    
    def get_title_list():
        """
        获取所有title
        :return: title list
        """
        try:
            title = config.sections()
            return str(title).decode("string_escape")
            # return '
    '.join(title)
        except Exception, e:
            logging.error("获取title信息失败 %s", e)

      

    mysql.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # author: zhizhi
    # 基础包: MySQL
    
    import pymysql.cursors
    import core.log as log
    
    
    logging = log.get_logger()
    conn = None
    
    def connect(host, user, password, db, charset='utf8'):
        """
        链接Mysql
        :param host: 地址
        :param user: 用户
        :param password: 密码
        :param db: 数据库名
        :param charset: 数据类型
        :return: 链接
        """
        global conn
        if conn == None:
            conn = pymysql.connect(host=host,
                                   user=user,
                                   password=password,
                                   db=db,
                                   charset=charset,
                                   cursorclass=pymysql.cursors.DictCursor)
        return conn
    
    
    def execute(sql):
        """
        执行SQL
        :param sql: 执行的SQL
        :return: 影响行数
        """
        global conn
        try:
            with conn.cursor() as cursor:
                res = cursor.execute(sql)
            conn.commit()
            # 这里一定要写commit 不然提交的sql 都会被事务回滚
            return res
        except Exception, e:
            logging.error("sql is empty or error %s" % e)
    
    
    def close():
        """
        关闭MySQL连接
        :return: None
        """
        global conn
        conn.close()

    request.py

    #!/usr/bin/python
    #-*- coding: UTF-8 -*-
    # 基础包:接口测试的封装
    
    import requests
    import core.log as log
    import json
    
    
    
    logging = log.get_logger()
    
    def change_type(value):
        """
        对dict类型进行中文识别
        :param value: 传的数据值
        :return: 转码后的值
        """
        try:
            if isinstance(eval(value), str):
                return value
            if isinstance(eval(value), dict):
                result = eval(json.dumps(value))
                return result
        except Exception, e:
            logging.error("类型问题 %s", e)
    
    
    def api(method, url, data ,headers):
        """
        自定义一个接口测试的方法
        :param method: 请求类型
        :param url: 地址
        :param data: 数据
        :param headers: 请求头
        :return: success(str)
        """
        global results
        try:
            if method == ("post" or "POST"):
                results = requests.post(url, data, headers=headers,cookies=cookie)
            if method == ("get" or "GET"):
                results = requests.get(url, data, headers=headers,cookies=cookie)
    
            response = results.json()
            success = response.get("success")
            return success
        except Exception, e:
            logging.error("service is error", e)
    
    def set_cookieApi(method, url, data ,headers):
        """
        自定义一个登录接口测试的方法
        :param method: 请求类型
        :param url: 地址
        :param data: 数据
        :param headers: 请求头
        :return: success(bool)
        """
        global cookie
        try:
            if method == ("post" or "POST"):
                results = requests.post(url, data, headers=headers)
            if method == ("get" or "GET"):
                results = requests.get(url, data, headers=headers)
            response = results.json()
            success = str(response.get("success"))
            cookie = requests.utils.dict_from_cookiejar(results.cookies)
            return success
        except Exception, e:
            logging.info("LoginApi请求失败", e)

    func.py: 可以通过接口返回数据与自己写的sql查询的数据进行对比,判断用例是否通过,所以提供了mysql通用模块

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 业务包:通用函数
    
    
    import core.mysql as mysql
    import core.log as log
    import core.request as request
    import core.config as conf
    import constants as cs
    import os
    
    
    logging = log.get_logger()
    
    
    class ApiTest:
        """接口测试业务类"""
    
        def __init__(self):
            pass
    
        def prepare_data(self, host, user, password, db, sql):
            """
            数据准备,添加测试数据
            :param host: 服务地址
            :param user: 用户
            :param password: 密码
            :param db: 数据库名
            :param sql: 执行的SQL
            :return:
            """
            mysql.connect(host, user, password, db)
            res = mysql.execute(sql)
            mysql.close()
            logging.info("Run sql: the row number affected is %s" % res)
            return res
    
        def get_prepare_sql(self, filename, key):
            """
            获取预备执行的SQL
            :param title: 配置文件头信息
            :param key: 配置文件值
            :return: Value
            """
            try:
                conf.get_config(filename)
                value = conf.get_data(title=cs.TITLE, key=key)
                return value
            except Exception, e:
                logging.error("获取用例参数值失败 %s" % e)
    
        def new_report_menu(self, filename):
            """
            这个方法主要是通过写入文件的方法,先打开cs.YML_REPORT也就是
            mkdocs.yml文件,判断文件中是否存在当前写入的内容。
            :param filename: 测试用例文件
            :return: 测试报告内容
            """
            try:
                result = os.path.exists(cs.REPORT_PATH)
                if result == True:
                    conf.get_config(filename)
                    reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
                    report_name = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.R_NAME))
                    file = open(cs.YML_REPORT, 'r')
                    list_con = file.readlines()
                    content = str(list_con).decode("string_escape")
                    fileContent = "- %s"
                    row = "
    "
                    _content = fileContent % (reportName + cs.NOW)
                    con = row + _content
    
                    if _content not in content:
                        f = open(cs.YML_REPORT, 'a+')
                        f.write(con)
                    else:
                        logging.info("内容已经存在 %s" % _content)
            except Exception, e:
                logging.error("文件路径不存在 %s", e)
    
        def write_report(self, content):
            """
            这个方法用于书写测试报告从而解决之前的通过
            logging方式写入导致其他的日志无法实现写入
            :param content: 传入文件的内容
            :return: None
            """
            reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
            _reportName = reportName + cs.NOW
            filename = cs.REPORT_PATH + _reportName
            try:
                file = open(filename, 'a+')
                file.writelines(content)
            except Exception, e:
                logging.error("文件路径不存在 %s", e)
    
    
    
        def execute_case(self, filename):
            """
            执行接口测试用例的方法
            :param filename: 用例文件名称
            :return: 测试结果
            """
            conf.get_config(filename)
            list = eval(conf.get_title_list())
    
            for i in range(1, len(list)):
                title = list[i]
    
                number = conf.get_data(title, key=cs.NUMBER)
    
                name = conf.get_data(title, key=cs.NAME)
                method = conf.get_data(title, key=cs.METHOD)
                url = conf.get_data(title, key=cs.URL)
                data = conf.get_data(title, key=cs.DATA)
                _data = request.json.dumps(data,ensure_ascii=False,indent=4)
                headers = eval(conf.get_data(title, key=cs.HEADERS))
                # headers['Cookie']=cookie
                _headers = request.json.dumps(headers,ensure_ascii=False,indent=4)
    
                testUrl = cs.DOMAIN + url
                login=cs.LOGIN
                if(title==login):
                    actualCode = request.set_cookieApi(method, testUrl, data, headers)
                else:
                    actualCode = str(request.api(method, testUrl, data, headers))
    
                expectCode = str(conf.get_data(title, key=cs.CODE))
    
    
                if actualCode != expectCode:
                    logging.info("新增一条接口失败报告")
                    self.write_report(cs.API_TEST_FAIL % (name, number, method, testUrl, headers,data, expectCode, actualCode))
                else:
                    logging.info("新增一条接口成功报告")
                    self.write_report(cs.API_TEST_SUCCESS % (name, number, method, testUrl, headers,data, expectCode, actualCode))
    
        def run_test(self, filename):
            """
            普通接口测试类方法
            :param filename: 接口的用例name
            :return: 测试报告
            """
            reportName =eval( conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
    
            _filename = cs.REPORT_PATH + reportName + cs.NOW
            try:
                if os.path.exists(_filename):
                    os.remove(_filename)
                    self.execute_case(filename)
                else:
                    self.execute_case(filename)
            except Exception, e:
                logging.error("执行接口测试失败 %s", e)
    
        def write_report_result(self):
            """
            这个方法用于书写测试报告结果
            :return: None
            """
            reportName = eval(conf.get_data(title=cs.REPORT_NAME, key=cs.REPORT))
            _filename = cs.REPORT_PATH + reportName + cs.NOW
            try:
                f = file(_filename)
                content = f.read()
                if content != None:
                    _count = content.count("Number")
                    _fail = content.count("Case Fail")
                    _pass = content.count("Case Pass")
                    space = content.split('
    ')
                    space.insert(0,cs.RESULT_CONTENT % (_count, _pass, _fail))
                    _content_ = '
    '.join(space)
                    fp = file(_filename,'r+')
                    fp.write(_content_)
            except Exception, e:
                logging.error("文件路径不存在 %s", e)

    run.py

    # !/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 执行包:runscript
    
    import function.func as func
    
    ApiTest = func.ApiTest()
    
    FILENAME = 'login.ini'
    
    """1.新建测试报告目录"""
    ApiTest.new_report_menu(filename=func.cs.CASE_PATH+FILENAME)
    
    """2.执行测试用例"""
    ApiTest.run_test(filename=func.cs.CASE_PATH+FILENAME)
    
    """3.统计测试报告结果"""
    ApiTest.write_report_result()

    配置文件:login.ini

    [Test Report]
    report = 'Enterprise Version'
    reportName = 'Regression Testing Report'
    
    [login]
    number = 1
    name = login
    method = post
    url = /s1/web/login
    data = {data:{'userCode':'admin','password':'admin'}}
    headers = {'Content-Type': 'application/json;charset=UTF-8;'}
    code = True
    
    [brand]
    number = 2
    name = get_brand
    method = post
    url = /s1/brand/getList
    data = {"pageSize":10,"pageNum":1}
    headers = {'Content-Type': 'application/json; charset=UTF-8;'}
    code = True

     constant.py

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # 脚本功能:全部变量
    
    import sys
    import time
    import os
    reload(sys)
    sys.setdefaultencoding('utf8')
    
    DOMAIN = 'http://****'
    
    REPORT_NAME = 'Test Report'
    TITLE = 'All Data prepare the SQL'
    
    METHOD = 'method'
    URL = 'url'
    DATA = 'data'
    NAME = 'name'
    NUMBER = 'number'
    CODE = 'code'
    HEADERS = 'headers'
    REPORT = 'report'
    R_NAME = 'reportName'
    LOGIN='login'
    
    REPORT_PATH = "../api4code/report/docs/"
    YML_REPORT = "../api4code/report/mkdocs.yml"
    
    CASE_PATH = "../api4code/case/"
    
    
    #测试报告内容
    API_TEST_FAIL = """
    ```
    %s: Case Fail
     Number: %s
     Method: %s
     Url: %s
     Headers:
     %s
     Data : 
     %s
     Expect : %s
     Actual : %s
    ```
    """
    
    API_TEST_SUCCESS = """
    ```
    %s: Case Pass
     Number: %s
     Method: %s
     Url: %s
     Headers:
     %s
     Data : 
     %s
     Expect : %s
     Actual : %s
    ```
    """
    
    #报告结果统计
    RESULT_CONTENT = """
    <p>Result:</p>
    <table border="3" width="500px">
      <tr>
        <th style="color: #787878">All</th>
        <th style="color: #3cc8b4">Pass</th>
        <th style="color: #FFB5C5">Fail</th>
      </tr>
      <tr>
        <th style="color: #787878">%s</th>
        <th style="color: #3cc8b4">%s</th>
        <th style="color: #FFB5C5">%s</th>
      </tr>
    </table>
    """
    
    NOW = '_' + time.strftime('%Y%m%d', time.localtime(time.time())) + '.md'
    PROJECT_TIME = time.strftime('%Y%m%d', time.localtime(time.time()))

     

     

  • 相关阅读:
    jquery 插件扩展2
    jquery 插件扩展
    call apply bind
    bom object
    js oop 封装
    js oop 继承
    js页面之间传参2
    js弹出新窗口的6中方法
    display Tag
    js中extends方法
  • 原文地址:https://www.cnblogs.com/shadow-yin/p/10678680.html
Copyright © 2011-2022 走看看