zoukankan      html  css  js  c++  java
  • 【python大牛分享】python——接口自动化测试框架环境的使用

    本文总结分享介绍接口测试框架开发,环境使用python3+selenium3+unittest+ddt+requests测试框架及ddt数据驱动,采用Excel管理测试用例等集成测试数据功能,以及使用HTMLTestRunner来生成测试报告,目前有开源的poman、Jmeter等接口测试工具,为什么还要开发接口测试框架呢?因接口测试工具也有存在几点不足。

    • 测试数据不可控制。比如接口返回数据不可控,就无法自动断言接口返回的数据,不能断定是接口程序引起,还是测试数据变化引起的错误,所以需要做一些初始化测试数据。接口工具没有具备初始化测试数据功能,无法做到真正的接口测试自动化。
    • 无法测试加密接口。实际项目中,多数接口不是可以随便调用,一般情况无法摸拟和生成加密算法。如时间戳和MDB加密算法,一般接口工具无法摸拟。
    • 扩展能力不足。开源的接口测试工具无法实现扩展功能。比如,我们想生成不同格式的测试报告,想将测试报告发送到指定邮箱,又想让接口测试集成到CI中,做持续集成定时任务。

    测试框架处理过程如下:

    1. 首先初始化清空数据库表的数据,向数据库插入测试数据;
    2. 调用被测试系统提供的接口,先数据驱动读取excel用例一行数据;
    3. 发送请求数据,根据传参数据,向数据库查询得到对应的数据;
    4. 将查询的结果组装成JSON格式的数据,同时根据返回的数据值与Excel的值对比判断,并写入结果至指定Excel测试用例表格;
    5. 通过单元测试框架断言接口返回的数据,并生成测试报告,最后把生成最新的测试报告HTML文件发送指定的邮箱。

    测试框架结构目录介绍:

    目录结构介绍如下:

    • config/:                    文件路径配置
    • database/:               测试用例模板文件及数据库和发送邮箱配置文件
    • db_fixture/:              初始化接口测试数据
    • lib/:                          程序核心模块。包含有excel解析读写、发送邮箱、发送请求、生成最新测试报告文件
    • package/:                存放第三方库包。如HTMLTestRunner,用于生成HTML格式测试报告
    • report/:                    生成接口自动化测试报告
    • testcase/:                用于编写接口自动化测试用例
    • run_demo.py:          执行所有接口测试用例的主程序
    • GitHub项目地址:    https://github.com/yingoja/DemoAPI如果对python软件测试、接口测试、自动化测试、面试经验交流。感兴趣可以加软件测试交流:1079636098,还会有同行一起技术交流。

    数据库封装:

    [tester]
    name = Jason

    [mysqlconf]
    host = 127.0.0.1
    port = 3306
    user = root
    password = 123456
    db_name = guest

    [user]
    # 发送邮箱服务器
    HOST_SERVER = smtp.163.com
    # 邮件发件人
    FROM = 111@163.com
    # 邮件收件人
    TO = 222@126.com
    # 发送邮箱用户名/密码
    user = aaa
    password = aaa
    # 邮件主题
    SUBJECT = 发布会系统接口自动化测试报告

    config.ini

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    from config import setting
    from pymysql import connect,cursors
    from pymysql.err import OperationalError
    import configparser as cparser
    
    # --------- 读取config.ini配置文件 ---------------
    cf = cparser.ConfigParser()
    cf.read(setting.TEST_CONFIG,encoding='UTF-8')
    host = cf.get("mysqlconf","host")
    port = cf.get("mysqlconf","port")
    user = cf.get("mysqlconf","user")
    password = cf.get("mysqlconf","password")
    db = cf.get("mysqlconf","db_name")
    
    class DB:
        """
        MySQL基本操作
        """
        def __init__(self):
            try:
                # 连接数据库
                self.conn = connect(host = host,
                                    user = user,
                                    password = password,
                                    db = db,
                                    charset = 'utf8mb4',
                                    cursorclass = cursors.DictCursor
                                    )
            except OperationalError as e:
                print("Mysql Error %d: %s" % (e.args[0],e.args[1]))
    
       # 清除表数据
        def clear(self,table_name):
            real_sql = "delete from " + table_name + ";"
            with self.conn.cursor() as cursor:
                 # 取消表的外键约束
                cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
                cursor.execute(real_sql)
            self.conn.commit()
    
        # 插入表数据
        def insert(self, table_name, table_data):
            for key in table_data:
                table_data[key] = "'"+str(table_data[key])+"'"
            key   = ','.join(table_data.keys())
            value = ','.join(table_data.values())
            real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")"
    
            with self.conn.cursor() as cursor:
                cursor.execute(real_sql)
            self.conn.commit()
    
        # 关闭数据库
        def close(self):
            self.conn.close()
    
        # 初始化数据
        def init_data(self, datas):
            for table, data in datas.items():
                self.clear(table)
                for d in data:
                    self.insert(table, d)
            self.close()
    
    mysql_db.py
    

      

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import sys, time, os
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    from db_fixture.mysql_db import DB
    
    # 定义过去时间
    past_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()-100000))
    # 定义将来时间
    future_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()+10000))
    
    # 创建测试数据
    datas = {
        # 发布会表数据
        'sign_event':[
            {'id':1,'name':'红米Pro发布会','`limit`':2000,'status':1,'address':'北京会展中心','start_time':future_time},
            {'id':2,'name':'苹果iphon6发布会','`limit`':1000,'status':1,'address':'宝安体育馆','start_time':future_time},
            {'id':3,'name':'华为荣耀8发布会','`limit`':2000,'status':0,'address':'深圳福田会展中心','start_time':future_time},
            {'id':4,'name':'苹果iphon8发布会','`limit`':2000,'status':1,'address':'深圳湾体育中心','start_time':past_time},
            {'id':5,'name':'小米5发布会','`limit`':2000,'status':1,'address':'北京国家会议中心','start_time':future_time},
        ],
        # 嘉宾表数据
        'sign_guest':[
            {'id':1,'realname':'Tom','phone':13511886601,'email':'alen@mail.com','sign':0,'event_id':1},
            {'id':2,'realname':'Jason','phone':13511886602,'email':'sign@mail.com','sign':1,'event_id':1},
            {'id':3,'realname':'Jams','phone':13511886603,'email':'tom@mail.com','sign':0,'event_id':5},
        ],
    }
    
    # 测试数据插入表
    def init_data():
        DB().init_data(datas)
    
    test_data.py
    

      

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    sys.path.append(BASE_DIR)
    
    # 配置文件
    TEST_CONFIG =  os.path.join(BASE_DIR,"database","config.ini")
    # 测试用例模板文件
    SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
    # excel测试用例结果文件
    TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
    # 测试用例报告
    TEST_REPORT = os.path.join(BASE_DIR,"report")
    # 测试用例程序文件
    TEST_CASE = os.path.join(BASE_DIR,"testcase")
    
    setting.py
    

      

    程序核心模块

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os
    
    def new_report(testreport):
        """
        生成最新的测试报告文件
        :param testreport:
        :return:返回文件
        """
        lists = os.listdir(testreport)
        lists.sort(key=lambda fn: os.path.getmtime(testreport + "\" + fn))
        file_new = os.path.join(testreport,lists[-1])
        return file_new
    
    netReport.py
    

      

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import xlrd
    
    class ReadExcel():
        """读取excel文件数据"""
        def __init__(self,fileName, SheetName="Sheet1"):
            self.data = xlrd.open_workbook(fileName)
            self.table = self.data.sheet_by_name(SheetName)
    
            # 获取总行数、总列数
            self.nrows = self.table.nrows
            self.ncols = self.table.ncols
        def read_data(self):
            if self.nrows > 1:
                # 获取第一行的内容,列表格式
                keys = self.table.row_values(0)
                listApiData = []
                # 获取每一行的内容,列表格式
                for col in range(1, self.nrows):
                    values = self.table.row_values(col)
                    # keys,values组合转换为字典
                    api_dict = dict(zip(keys, values))
                    listApiData.append(api_dict)
                return listApiData
            else:
                print("表格是空数据!")
                return None
    
    readexcel.py
    

      

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys,json
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    
    
    class SendRequests():
        """发送请求数据"""
        def sendRequests(self,s,apiData):
            try:
                #从读取的表格中获取响应的参数作为传递
                method = apiData["method"]
                url = apiData["url"]
                if apiData["params"] == "":
                    par = None
                else:
                    par = eval(apiData["params"])
                if apiData["headers"] == "":
                    h = None
                else:
                    h = eval(apiData["headers"])
                if apiData["body"] == "":
                    body_data = None
                else:
                    body_data = eval(apiData["body"])
                type = apiData["type"]
                v = False
                if type == "data":
                    body = body_data
                elif type == "json":
                    body = json.dumps(body_data)
                else:
                    body = body_data
    
                #发送请求
                re = s.request(method=method,url=url,headers=h,params=par,data=body,verify=v)
                return re
            except Exception as e:
                print(e)
    
    sendrequests.py
    

     

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    from config import setting
    import smtplib
    from lib.newReport import new_report
    import configparser
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    
    
    def send_mail(file_new):
        """
        定义发送邮件
        :param file_new:
        :return: 成功:打印发送邮箱成功;失败:返回失败信息
        """
        f = open(file_new,'rb')
        mail_body = f.read()
        f.close()
        #发送附件
        con = configparser.ConfigParser()
        con.read(setting.TEST_CONFIG,encoding='utf-8')
        report = new_report(setting.TEST_REPORT)
        sendfile = open(report,'rb').read()
        # --------- 读取config.ini配置文件 ---------------
        HOST = con.get("user","HOST_SERVER")
        SENDER = con.get("user","FROM")
        RECEIVER = con.get("user","TO")
        USER = con.get("user","user")
        PWD = con.get("user","password")
        SUBJECT = con.get("user","SUBJECT")
    
        att = MIMEText(sendfile,'base64','utf-8')
        att["Content-Type"] = 'application/octet-stream'
        att.add_header("Content-Disposition", "attachment", filename=("gbk", "", report))
    
        msg = MIMEMultipart('related')
        msg.attach(att)
        msgtext = MIMEText(mail_body,'html','utf-8')
        msg.attach(msgtext)
        msg['Subject'] = SUBJECT
        msg['from'] = SENDER
        msg['to'] = RECEIVER
    
        try:
            server = smtplib.SMTP()
            server.connect(HOST)
            server.starttls()
            server.login(USER,PWD)
            server.sendmail(SENDER,RECEIVER,msg.as_string())
            server.quit()
            print("邮件发送成功!")
        except Exception as  e:
            print("失败: " + str(e))
    
    sendmail.py
    

      

     

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    import shutil
    from config import setting
    from openpyxl import load_workbook
    from openpyxl.styles import Font,Alignment
    from openpyxl.styles.colors import RED,GREEN,DARKYELLOW
    import configparser as cparser
    
    # --------- 读取config.ini配置文件 ---------------
    cf = cparser.ConfigParser()
    cf.read(setting.TEST_CONFIG,encoding='UTF-8')
    name = cf.get("tester","name")
    
    class WriteExcel():
        """文件写入数据"""
        def __init__(self,fileName):
            self.filename = fileName
            if not os.path.exists(self.filename):
                # 文件不存在,则拷贝模板文件至指定报告目录下
                shutil.copyfile(setting.SOURCE_FILE,setting.TARGET_FILE)
            self.wb = load_workbook(self.filename)
            self.ws = self.wb.active
    
        def write_data(self,row_n,value):
            """
            写入测试结果
            :param row_n:数据所在行数
            :param value: 测试结果值
            :return: 无
            """
            font_GREEN = Font(name='宋体', color=GREEN, bold=True)
            font_RED = Font(name='宋体', color=RED, bold=True)
            font1 = Font(name='宋体', color=DARKYELLOW, bold=True)
            align = Alignment(horizontal='center', vertical='center')
            # 获数所在行数
            L_n = "L" + str(row_n)
            M_n = "M" + str(row_n)
            if value == "PASS":
                self.ws.cell(row_n, 12, value)
                self.ws[L_n].font = font_GREEN
            if value == "FAIL":
                self.ws.cell(row_n, 12, value)
                self.ws[L_n].font = font_RED
            self.ws.cell(row_n, 13, name)
            self.ws[L_n].alignment = align
            self.ws[M_n].font = font1
            self.ws[M_n].alignment = align
            self.wb.save(self.filename)
    
    writeexcel.py
    

      

    接口测试用例编写

     testAPI.py

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    import os,sys
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    import unittest,requests,ddt
    from config import setting
    from lib.readexcel import ReadExcel
    from lib.sendrequests import SendRequests
    from lib.writeexcel import WriteExcel
    
    testData = ReadExcel(setting.SOURCE_FILE, "Sheet1").read_data()
    
    @ddt.ddt
    class Demo_API(unittest.TestCase):
        """发布会系统"""
        def setUp(self):
            self.s = requests.session()
    
        def tearDown(self):
            pass
    
        @ddt.data(*testData)
        def test_api(self,data):
            # 获取ID字段数值,截取结尾数字并去掉开头0
            rowNum = int(data['ID'].split("_")[2])
            # 发送请求
            re = SendRequests().sendRequests(self.s,data)
            # 获取服务端返回的值
            self.result = re.json()
            # 获取excel表格数据的状态码和消息
            readData_code = int(data["status_code"])
            readData_msg = data["msg"]
            if readData_code == self.result['status'] and readData_msg == self.result['message']:
                OK_data = "PASS"
                WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,OK_data)
            if readData_code != self.result['status'] or readData_msg != self.result['message']:
                NOT_data = "FAIL"
                WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,NOT_data)
            self.assertEqual(self.result['status'], readData_code, "返回实际结果是->:%s" % self.result['status'])
            self.assertEqual(self.result['message'], readData_msg, "返回实际结果是->:%s" % self.result['message'])
    
    if __name__=='__main__':
        unittest.main()
    
    testAPI.py
    

      

    集成测试报告

     run_demo.py

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __author__ = 'YinJia'
    
    
    import os,sys
    sys.path.append(os.path.dirname(__file__))
    from config import setting
    import unittest,time
    from HTMLTestRunner import HTMLTestRunner
    from lib.sendmail import send_mail
    from lib.newReport import new_report
    from db_fixture import test_data
    from package.HTMLTestRunner import HTMLTestRunner
    
    def add_case(test_path=setting.TEST_CASE):
        """加载所有的测试用例"""
        discover = unittest.defaultTestLoader.discover(test_path, pattern='*API.py')
        return discover
    
    def run_case(all_case,result_path=setting.TEST_REPORT):
        """执行所有的测试用例"""
    
        # 初始化接口测试数据
        test_data.init_data()
    
        now = time.strftime("%Y-%m-%d %H_%M_%S")
        filename =  result_path + '/' + now + 'result.html'
        fp = open(filename,'wb')
        runner = HTMLTestRunner(stream=fp,title='发布会系统接口自动化测试报告',
                                description='环境:windows 7 浏览器:chrome',
                                tester='Jason')
        runner.run(all_case)
        fp.close()
        report = new_report(setting.TEST_REPORT) #调用模块生成最新的报告
        send_mail(report) #调用发送邮件模块
    
    if __name__ =="__main__":
        cases = add_case()
        run_case(cases)
    
    run_demo.py
    

      

    测试结果展示

    • HTML测试结果报告:

    • Excel测试用例结果

    • 邮件收到的测试报告

  • 相关阅读:
    2020下第八周总结
    《程序员的自我修养》——阅读感悟1
    2020下第七周总结
    【基础组件11】hdfs与hbase
    【基础组件10】hadoop拓展(三)NameNode工作机制
    【基础组件9】hadoop入门(二)启动节点、集群、hdfs查看文件系统、清数据
    【基础组件8】hadoop入门(一)集群搭建/ HDFS-HA高可用搭建
    【基础组件7】flink入门(一)集群搭建、实时数据处理
    【基础组件6】kafkamanager安装部署+详细参数讲解+使用教程
    【基础组件5】kafka入门(一)集群搭建+常用命令+基本原理+存储分析
  • 原文地址:https://www.cnblogs.com/cemaxueyuan/p/12804739.html
Copyright © 2011-2022 走看看