zoukankan      html  css  js  c++  java
  • 【Python自学】python_接口自动化测试框架(转))

    python_接口自动化测试框架

    本文总结分享介绍接口测试框架开发,环境使用python3+selenium3+unittest+ddt+requests测试框架及ddt数据驱动,采用Excel管理测试用例等集成测试数据功能,以及使用HTMLTestRunner来生成测试报告,目前有开源的poman、Jmeter等接口测试工具,为什么还要开发接口测试框架呢?因接口测试工具也有存在几点不足。

    • 测试数据不可控制。比如接口返回数据不可控,就无法自动断言接口返回的数据,不能断定是接口程序引起,还是测试数据变化引起的错误,所以需要做一些初始化测试数据。接口工具没有具备初始化测试数据功能,无法做到真正的接口测试自动化。
    • 无法测试加密接口。实际项目中,多数接口不是可以随便调用,一般情况无法摸拟和生成加密算法。如时间戳和MDB加密算法,一般接口工具无法摸拟。
    • 扩展能力不足。开源的接口测试工具无法实现扩展功能。比如,我们想生成不同格式的测试报告,想将测试报告发送到指定邮箱,又想让接口测试集成到CI中,做持续集成定时任务。

    测试框架处理流程

     

     测试框架处理过程如下:

    1. 首先初始化清空数据库表的数据,向数据库插入测试数据;
    2. 调用被测试系统提供的接口,先数据驱动读取excel用例一行数据;
    3. 发送请求数据,根据传参数据,向数据库查询得到对应的数据;
    4. 将查询的结果组装成JSON格式的数据,同时根据返回的数据值与Excel的值对比判断,并写入结果至指定Excel测试用例表格;
    5. 通过单元测试框架断言接口返回的数据,并生成测试报告,最后把生成最新的测试报告HTML文件发送指定的邮箱。

    测试框架结构目录介绍

     

    目录结构介绍如下:

    • config/:                    文件路径配置
    • database/:               测试用例模板文件及数据库和发送邮箱配置文件
    • db_fixture/:              初始化接口测试数据
    • lib/:                          程序核心模块。包含有excel解析读写、发送邮箱、发送请求、生成最新测试报告文件
    • package/:                存放第三方库包。如HTMLTestRunner,用于生成HTML格式测试报告
    • report/:                    生成接口自动化测试报告
    • testcase/:                用于编写接口自动化测试用例
    • run_demo.py:          执行所有接口测试用例的主程序
    • GitHub项目地址:    https://github.com/yingoja/DemoAPI

    数据库封装

    复制代码
     1 [tester]
     2 name = Jason
     3 
     4 [mysqlconf]
     5 host = 127.0.0.1
     6 port = 3306
     7 user = root
     8 password = 123456
     9 db_name = guest
    10 
    11 [user]
    12 # 发送邮箱服务器
    13 HOST_SERVER = smtp.163.com
    14 # 邮件发件人
    15 FROM = 111@163.com
    16 # 邮件收件人
    17 TO = 222@126.com
    18 # 发送邮箱用户名/密码
    19 user = aaa
    20 password = aaa
    21 # 邮件主题
    22 SUBJECT = 发布会系统接口自动化测试报告
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 from config import setting
     8 from pymysql import connect,cursors
     9 from pymysql.err import OperationalError
    10 import configparser as cparser
    11 
    12 # --------- 读取config.ini配置文件 ---------------
    13 cf = cparser.ConfigParser()
    14 cf.read(setting.TEST_CONFIG,encoding='UTF-8')
    15 host = cf.get("mysqlconf","host")
    16 port = cf.get("mysqlconf","port")
    17 user = cf.get("mysqlconf","user")
    18 password = cf.get("mysqlconf","password")
    19 db = cf.get("mysqlconf","db_name")
    20 
    21 class DB:
    22     """
    23     MySQL基本操作
    24     """
    25     def __init__(self):
    26         try:
    27             # 连接数据库
    28             self.conn = connect(host = host,
    29                                 user = user,
    30                                 password = password,
    31                                 db = db,
    32                                 charset = 'utf8mb4',
    33                                 cursorclass = cursors.DictCursor
    34                                 )
    35         except OperationalError as e:
    36             print("Mysql Error %d: %s" % (e.args[0],e.args[1]))
    37 
    38    # 清除表数据
    39     def clear(self,table_name):
    40         real_sql = "delete from " + table_name + ";"
    41         with self.conn.cursor() as cursor:
    42              # 取消表的外键约束
    43             cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
    44             cursor.execute(real_sql)
    45         self.conn.commit()
    46 
    47     # 插入表数据
    48     def insert(self, table_name, table_data):
    49         for key in table_data:
    50             table_data[key] = "'"+str(table_data[key])+"'"
    51         key   = ','.join(table_data.keys())
    52         value = ','.join(table_data.values())
    53         real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")"
    54 
    55         with self.conn.cursor() as cursor:
    56             cursor.execute(real_sql)
    57         self.conn.commit()
    58 
    59     # 关闭数据库
    60     def close(self):
    61         self.conn.close()
    62 
    63     # 初始化数据
    64     def init_data(self, datas):
    65         for table, data in datas.items():
    66             self.clear(table)
    67             for d in data:
    68                 self.insert(table, d)
    69         self.close()
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import sys, time, os
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 from db_fixture.mysql_db import DB
     8 
     9 # 定义过去时间
    10 past_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()-100000))
    11 # 定义将来时间
    12 future_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()+10000))
    13 
    14 # 创建测试数据
    15 datas = {
    16     # 发布会表数据
    17     'sign_event':[
    18         {'id':1,'name':'红米Pro发布会','`limit`':2000,'status':1,'address':'北京会展中心','start_time':future_time},
    19         {'id':2,'name':'苹果iphon6发布会','`limit`':1000,'status':1,'address':'宝安体育馆','start_time':future_time},
    20         {'id':3,'name':'华为荣耀8发布会','`limit`':2000,'status':0,'address':'深圳福田会展中心','start_time':future_time},
    21         {'id':4,'name':'苹果iphon8发布会','`limit`':2000,'status':1,'address':'深圳湾体育中心','start_time':past_time},
    22         {'id':5,'name':'小米5发布会','`limit`':2000,'status':1,'address':'北京国家会议中心','start_time':future_time},
    23     ],
    24     # 嘉宾表数据
    25     'sign_guest':[
    26         {'id':1,'realname':'Tom','phone':13511886601,'email':'alen@mail.com','sign':0,'event_id':1},
    27         {'id':2,'realname':'Jason','phone':13511886602,'email':'sign@mail.com','sign':1,'event_id':1},
    28         {'id':3,'realname':'Jams','phone':13511886603,'email':'tom@mail.com','sign':0,'event_id':5},
    29     ],
    30 }
    31 
    32 # 测试数据插入表
    33 def init_data():
    34     DB().init_data(datas)
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys
     6 BASE_DIR = os.path.dirname(os.path.dirname(__file__))
     7 sys.path.append(BASE_DIR)
     8 
     9 # 配置文件
    10 TEST_CONFIG =  os.path.join(BASE_DIR,"database","config.ini")
    11 # 测试用例模板文件
    12 SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
    13 # excel测试用例结果文件
    14 TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
    15 # 测试用例报告
    16 TEST_REPORT = os.path.join(BASE_DIR,"report")
    17 # 测试用例程序文件
    18 TEST_CASE = os.path.join(BASE_DIR,"testcase")
    复制代码

    程序核心模块

    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os
     6 
     7 def new_report(testreport):
     8     """
     9     生成最新的测试报告文件
    10     :param testreport:
    11     :return:返回文件
    12     """
    13     lists = os.listdir(testreport)
    14     lists.sort(key=lambda fn: os.path.getmtime(testreport + "\" + fn))
    15     file_new = os.path.join(testreport,lists[-1])
    16     return file_new
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import xlrd
     6 
     7 class ReadExcel():
     8     """读取excel文件数据"""
     9     def __init__(self,fileName, SheetName="Sheet1"):
    10         self.data = xlrd.open_workbook(fileName)
    11         self.table = self.data.sheet_by_name(SheetName)
    12 
    13         # 获取总行数、总列数
    14         self.nrows = self.table.nrows
    15         self.ncols = self.table.ncols
    16     def read_data(self):
    17         if self.nrows > 1:
    18             # 获取第一行的内容,列表格式
    19             keys = self.table.row_values(0)
    20             listApiData = []
    21             # 获取每一行的内容,列表格式
    22             for col in range(1, self.nrows):
    23                 values = self.table.row_values(col)
    24                 # keys,values组合转换为字典
    25                 api_dict = dict(zip(keys, values))
    26                 listApiData.append(api_dict)
    27             return listApiData
    28         else:
    29             print("表格是空数据!")
    30             return None
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys,json
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 
     8 
     9 class SendRequests():
    10     """发送请求数据"""
    11     def sendRequests(self,s,apiData):
    12         try:
    13             #从读取的表格中获取响应的参数作为传递
    14             method = apiData["method"]
    15             url = apiData["url"]
    16             if apiData["params"] == "":
    17                 par = None
    18             else:
    19                 par = eval(apiData["params"])
    20             if apiData["headers"] == "":
    21                 h = None
    22             else:
    23                 h = eval(apiData["headers"])
    24             if apiData["body"] == "":
    25                 body_data = None
    26             else:
    27                 body_data = eval(apiData["body"])
    28             type = apiData["type"]
    29             v = False
    30             if type == "data":
    31                 body = body_data
    32             elif type == "json":
    33                 body = json.dumps(body_data)
    34             else:
    35                 body = body_data
    36 
    37             #发送请求
    38             re = s.request(method=method,url=url,headers=h,params=par,data=body,verify=v)
    39             return re
    40         except Exception as e:
    41             print(e)
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 from config import setting
     8 import smtplib
     9 from lib.newReport import new_report
    10 import configparser
    11 from email.mime.text import MIMEText
    12 from email.mime.multipart import MIMEMultipart
    13 
    14 
    15 def send_mail(file_new):
    16     """
    17     定义发送邮件
    18     :param file_new:
    19     :return: 成功:打印发送邮箱成功;失败:返回失败信息
    20     """
    21     f = open(file_new,'rb')
    22     mail_body = f.read()
    23     f.close()
    24     #发送附件
    25     con = configparser.ConfigParser()
    26     con.read(setting.TEST_CONFIG,encoding='utf-8')
    27     report = new_report(setting.TEST_REPORT)
    28     sendfile = open(report,'rb').read()
    29     # --------- 读取config.ini配置文件 ---------------
    30     HOST = con.get("user","HOST_SERVER")
    31     SENDER = con.get("user","FROM")
    32     RECEIVER = con.get("user","TO")
    33     USER = con.get("user","user")
    34     PWD = con.get("user","password")
    35     SUBJECT = con.get("user","SUBJECT")
    36 
    37     att = MIMEText(sendfile,'base64','utf-8')
    38     att["Content-Type"] = 'application/octet-stream'
    39     att.add_header("Content-Disposition", "attachment", filename=("gbk", "", report))
    40 
    41     msg = MIMEMultipart('related')
    42     msg.attach(att)
    43     msgtext = MIMEText(mail_body,'html','utf-8')
    44     msg.attach(msgtext)
    45     msg['Subject'] = SUBJECT
    46     msg['from'] = SENDER
    47     msg['to'] = RECEIVER
    48 
    49     try:
    50         server = smtplib.SMTP()
    51         server.connect(HOST)
    52         server.starttls()
    53         server.login(USER,PWD)
    54         server.sendmail(SENDER,RECEIVER,msg.as_string())
    55         server.quit()
    56         print("邮件发送成功!")
    57     except Exception as  e:
    58         print("失败: " + str(e))
    复制代码
    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 import shutil
     8 from config import setting
     9 from openpyxl import load_workbook
    10 from openpyxl.styles import Font,Alignment
    11 from openpyxl.styles.colors import RED,GREEN,DARKYELLOW
    12 import configparser as cparser
    13 
    14 # --------- 读取config.ini配置文件 ---------------
    15 cf = cparser.ConfigParser()
    16 cf.read(setting.TEST_CONFIG,encoding='UTF-8')
    17 name = cf.get("tester","name")
    18 
    19 class WriteExcel():
    20     """文件写入数据"""
    21     def __init__(self,fileName):
    22         self.filename = fileName
    23         if not os.path.exists(self.filename):
    24             # 文件不存在,则拷贝模板文件至指定报告目录下
    25             shutil.copyfile(setting.SOURCE_FILE,setting.TARGET_FILE)
    26         self.wb = load_workbook(self.filename)
    27         self.ws = self.wb.active
    28 
    29     def write_data(self,row_n,value):
    30         """
    31         写入测试结果
    32         :param row_n:数据所在行数
    33         :param value: 测试结果值
    34         :return: 无
    35         """
    36         font_GREEN = Font(name='宋体', color=GREEN, bold=True)
    37         font_RED = Font(name='宋体', color=RED, bold=True)
    38         font1 = Font(name='宋体', color=DARKYELLOW, bold=True)
    39         align = Alignment(horizontal='center', vertical='center')
    40         # 获数所在行数
    41         L_n = "L" + str(row_n)
    42         M_n = "M" + str(row_n)
    43         if value == "PASS":
    44             self.ws.cell(row_n, 12, value)
    45             self.ws[L_n].font = font_GREEN
    46         if value == "FAIL":
    47             self.ws.cell(row_n, 12, value)
    48             self.ws[L_n].font = font_RED
    49         self.ws.cell(row_n, 13, name)
    50         self.ws[L_n].alignment = align
    51         self.ws[M_n].font = font1
    52         self.ws[M_n].alignment = align
    53         self.wb.save(self.filename)
    复制代码

    接口测试用例编写

    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 import os,sys
     6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
     7 import unittest,requests,ddt
     8 from config import setting
     9 from lib.readexcel import ReadExcel
    10 from lib.sendrequests import SendRequests
    11 from lib.writeexcel import WriteExcel
    12 
    13 testData = ReadExcel(setting.SOURCE_FILE, "Sheet1").read_data()
    14 
    15 @ddt.ddt
    16 class Demo_API(unittest.TestCase):
    17     """发布会系统"""
    18     def setUp(self):
    19         self.s = requests.session()
    20 
    21     def tearDown(self):
    22         pass
    23 
    24     @ddt.data(*testData)
    25     def test_api(self,data):
    26         # 获取ID字段数值,截取结尾数字并去掉开头0
    27         rowNum = int(data['ID'].split("_")[2])
    28         # 发送请求
    29         re = SendRequests().sendRequests(self.s,data)
    30         # 获取服务端返回的值
    31         self.result = re.json()
    32         # 获取excel表格数据的状态码和消息
    33         readData_code = int(data["status_code"])
    34         readData_msg = data["msg"]
    35         if readData_code == self.result['status'] and readData_msg == self.result['message']:
    36             OK_data = "PASS"
    37             WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,OK_data)
    38         if readData_code != self.result['status'] or readData_msg != self.result['message']:
    39             NOT_data = "FAIL"
    40             WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,NOT_data)
    41         self.assertEqual(self.result['status'], readData_code, "返回实际结果是->:%s" % self.result['status'])
    42         self.assertEqual(self.result['message'], readData_msg, "返回实际结果是->:%s" % self.result['message'])
    43 
    44 if __name__=='__main__':
    45     unittest.main()
    复制代码

    集成测试报告

    复制代码
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 __author__ = 'YinJia'
     4 
     5 
     6 import os,sys
     7 sys.path.append(os.path.dirname(__file__))
     8 from config import setting
     9 import unittest,time
    10 from HTMLTestRunner import HTMLTestRunner
    11 from lib.sendmail import send_mail
    12 from lib.newReport import new_report
    13 from db_fixture import test_data
    14 from package.HTMLTestRunner import HTMLTestRunner
    15 
    16 def add_case(test_path=setting.TEST_CASE):
    17     """加载所有的测试用例"""
    18     discover = unittest.defaultTestLoader.discover(test_path, pattern='*API.py')
    19     return discover
    20 
    21 def run_case(all_case,result_path=setting.TEST_REPORT):
    22     """执行所有的测试用例"""
    23 
    24     # 初始化接口测试数据
    25     test_data.init_data()
    26 
    27     now = time.strftime("%Y-%m-%d %H_%M_%S")
    28     filename =  result_path + '/' + now + 'result.html'
    29     fp = open(filename,'wb')
    30     runner = HTMLTestRunner(stream=fp,title='发布会系统接口自动化测试报告',
    31                             description='环境:windows 7 浏览器:chrome',
    32                             tester='Jason')
    33     runner.run(all_case)
    34     fp.close()
    35     report = new_report(setting.TEST_REPORT) #调用模块生成最新的报告
    36     send_mail(report) #调用发送邮件模块
    37 
    38 if __name__ =="__main__":
    39     cases = add_case()
    40     run_case(cases)
    复制代码

    测试结果展示

    • HTML测试结果报告:

    • Excel测试用例结果

    • 邮件收到的测试报告

     
    分类: python项目
  • 相关阅读:
    《SQL初学者指南》——第1章 关系型数据库和SQL
    《SQL初学者指南》——第1章 关系型数据库和SQL
    快讯:Oracle自治事务处理数据库发布和19c路线图
    NoReverseMatch: u'polls' is not a registered namespace
    Tomcat配置图片保存路径,图片不保存在项目路径下
    请慎用java的File#renameTo(File)方法
    sentinel monitor mymaster 10.10.17.200 6379 1 1个哨兵同意就切换
    静默错误:Oracle 数据库是如何应对和处理的 ?
    如何对Node.js默认下载路径进行修改
    “我卖一个群可以赚2万,但这个项目,死了!”
  • 原文地址:https://www.cnblogs.com/conquerorren/p/14115068.html
Copyright © 2011-2022 走看看