前言
在实际工作中,使用Excel存储自动化测试数据是很多公司的首选,一方面由于Excel比较直观,方便操作;另一方面Python读取Excel也比较方便,不需要写多少代码就可以读出所有的测试数据。但是,随着项目的业务日趋复杂,使用Excel中的一行作为一个测试用例,对于复杂的测试场景来说,涉及到几个甚至十几个接口的时候,分散的接口用例关联起来难度较大,比如注册接口在第二行,登录接口在第五行,选择商品接口在第二十行,这种散落在各个地方的接口找起来就特别费时间。因此,需要一种方式,将这些关联的接口合并一起,一个合并单元格作为一条用例,一条用例可能包含多个关联接口,这样整齐划一的方式极大的方便了后期的维护
扩展读取Excel类
对合并单元格的读取,要求对现有的读取Excel的类做一扩展。具体的思路是:通过self.sh.merged_cells
拿到合并单元格的行和列的范围,将这些行和列的范围添加到集合set中,分别命名为m_rows
和m_cols
,然后遍历m_rows
,拿到其中的每一个合并行范围,再对这个合并行范围做遍历,将合并行每一行对应的单元格的值组合成一个列表,通过列表生成式获取列表title
,再将合并单元格的列表和列表title
通过zip,dict转化成一个字典,添加到all_case_data
通过对最大行范围和合并行范围取差集,得到的就是非合并行,然后和前面类似,非合并行每一行的值都是列表,和列表title
转化成一个字典,添加到all_case_data
最后,你看到得到的all_case_data
是一个顺序混乱的列表字典的形式,将sorted中的key指定为列表中的字典的其中一个key:case_data["case_id"][0]
来升序排列(为了更好的排序,建议Excel中的case_id列采用001,002,003...这样的方式)
from openpyxl import load_workbook
from common.replace_variable import *
import json, re
class DoExcel:
def __init__(self, file_path):
self.file_path = file_path
self.wb = load_workbook(self.file_path)
self.sh = self.wb["case_datas"]
#读取所有测试数据
def read_all_caseData(self):
#得到最大行、最大列
max_row = self.sh.max_row
max_column = self.sh.max_column
#得到合并单元格的范围
m_rows = []
m_cols = []
for m_area in self.sh.merged_cells:
r1, r2, c1, c2 = m_area.min_row, m_area.max_row, m_area.min_col, m_area.max_col
m_rows.append((r1, r2))
m_cols.extend([c1, c2])
m_rows = set(m_rows)
m_cols = set(m_cols)
#获取所有的测试数据,将其以字典的形式存储在列表中
all_case_data = []
title = [self.sh.cell(1, column).value for column in range(1, max_column + 1)]
#获取合并行数据
for m_row in m_rows:
case_data = []
for column in range(1, max_column + 1):
for row in range(m_row[0], m_row[1] + 1):
if row == m_row[0]:
value = [[self.sh.cell(row, column).value]]
else:
for item in value:
item.append(self.sh.cell(row, column).value)
case_data.extend(value)
all_case_data.append(dict(zip(title, case_data)))
#最大行范围和合并范围求差集,就是非合并行
max_row_range = set(range(2, max_row + 1))
for m_row in m_rows:
m_row_range = set(range(m_row[0], m_row[1] + 1))
max_row_range = max_row_range.difference(m_row_range)
#获取非合并行数据
for row in max_row_range:
case_data = []
for column in range(1, max_column + 1):
value = [self.sh.cell(row, column).value]
case_data.append(value)
all_case_data.append(dict(zip(title, case_data)))
#排序:根据case_id的数字从大到小
all_case_data = sorted(all_case_data, key=lambda case_data: case_data["case_id"][0])
#对检查的字符串做字典的转化,最终以字典的形式存储在列表中
for case_data in all_case_data:
self._check_convert_dict(case_data)
print(all_case_data)
return all_case_data
#将检查项转化为字典
def _check_convert_dict(self, case_data):
for i in range(len(case_data["case_id"])):
check_list = case_data["check"]
new_check_list = []
for check in check_list:
new_check = {}
items = re.findall("($.*?):.*?"(.*?)"", check)
for item in items:
key, value = item
new_check[key] = value
new_check_list.append(new_check)
case_data["check"] = new_check_list
if __name__ == "__main__":
all_case_data = DoExcel(r"E:virtual_workshopPython_API est_datasapi_info.xlsx").read_all_caseData()
扩展测试类
拿到的all_case_data是列表字典的形式,其中的每个元素都是字典,每个字典的value都是列表,其中列表中的每一项元素都是合并单元格的每个单元格对应的值,就是这种形式
这就要求测试类test_my_request,在ddt的基础上,增加for循环。前面说过for循环遍历的N条用例都会默认是一条用例,正好将每一个合并单元格当做了一条用例对待
import unittest
import ddt
import json
from jsonpath import jsonpath
from common.do_excel import DoExcel
from common.replace_variable import ReplaceVariable
from common.context import Context
from common.my_request import *
from common.my_logging import *
from common.read_cfg import *
import logging
@ddt.ddt
class TestMyRequest(unittest.TestCase):
do_excel = DoExcel(r"{0}api_info.xlsx".format(testdatas_dir))
all_case_data = do_excel.read_all_caseData()
#读取配置文件
test_host, sql_host = get_host_section()
@ddt.data(*all_case_data)
def test_my_request(self, case_data):
logging.info("用例『{0}』测试开始了".format(case_data["case_desc"][0]))
for i in range(len(case_data["case_id"])):
#拼接url
if case_data["host"][i] == "test":
case_data["url"][i] = self.test_host + case_data["url"][i]
elif case_data["host"][i] == "sql":
case_data["url"][i] = self.sql_host + case_data["url"][i]
#替换请求参数
request_data = json.loads(ReplaceVariable.replace_variable(case_data["request_data"][i]))
logging.info("请求数据: {0}".format(request_data))
#请求数据反射
request_extract = case_data["request_extract"][i]
if request_extract:
key, express = request_extract.split("=")
setattr(Context, key, jsonpath(request_data, express)[0])
if case_data["method"][i].lower() == "get":
result = send_request(case_data["method"][i], case_data["url"][i], params=request_data)
elif case_data["method"][i].lower() == "post":
result = send_request(case_data["method"][i], case_data["url"][i], data=request_data)
logging.info("响应结果: {0}".format(result))
#响应数据反射
response_extract = case_data["response_extract"][i]
if response_extract:
key, express = response_extract.split("=")
setattr(Context, key, str(jsonpath(result, express)[0]))
#断言
for key, value in case_data["check"][i].items():
#替换期望结果
actual = str(jsonpath(result, key)[0])
expect = ReplaceVariable.replace_variable(value)
logging.info("实际结果: {0}".format(actual))
logging.info("期望结果: {0}".format(expect))
self.assertEqual(actual, expect)
logging.info("
")
数据库断言
常规的做法是,在Excel中写一条用例,通过PyMSQL读取,拿到数据后,再去响应结果里查找。这种方法很繁琐,因为你得解析响应结果。之前的接口框架也对请求数据和响应结果做了反射,为什么不利用这些数据呢?一个设想是,我把数据库的查询作为一个接口来测,使用Flask封装成接口,再通过Excel传入对应的sql语句,这个sql语句里需要的条件,我可能从上个接口里提取,提取到作为反射数据来用。举个例子,常规做法是我投资了200元,通过select语句查询这个人的投资记录,拿到投资记录的结果筛选出金额,再去响应结果里找,现在的做法是,我传入select * from t_user where uid=xxx and invest_money=200
,将这个作为请求参数,传给sql对应的查询接口,只要查询有结果就行,不需要再做额外的解析和比对
from flask import Flask, request
from common.read_mysql import ReadMysql
import json
app = Flask(__name__)
@app.route("/query/one", methods=["post"])
def query_one():
read_mysql = ReadMysql()
query_sql = request.form.get("query_sql")
one_data = read_mysql.select_one_data(query_sql)
if one_data:
return json.dumps({"code": "200", "result": str(one_data)})
else:
return json.dumps({"code": "404"})
app.run()