1 类封装接口测试脚本
import json

import requests


class SendMain():
    """Thin wrapper that sends a single HTTP request with ``requests``.

    Args:
        url: Target URL.
        method: HTTP method name. ``'GET'`` (any letter case, surrounding
            whitespace ignored) is sent with ``requests.get``; every other
            value is sent as a POST, as before.
        headers: Optional mapping of request headers.
        data: Optional payload. Sent as the JSON body of a POST, or as the
            query-string parameters of a GET.
        timeout: Optional timeout in seconds handed to ``requests``.
            ``None`` (the default) keeps the previous wait-forever behavior.
    """

    def __init__(self, url, method, headers=None, data=None, timeout=None):
        self.url = url
        self.headers = headers
        self.method = method
        self.data = data
        self.timeout = timeout

    def send_get(self):
        """Send the request as a GET and return the ``requests`` response."""
        # Headers and payload used to be dropped silently for GET requests.
        return requests.get(
            url=self.url,
            headers=self.headers,
            params=self.data,
            timeout=self.timeout,
        )

    def send_post(self):
        """Send the request as a POST with a JSON body and return the response."""
        return requests.post(
            url=self.url,
            headers=self.headers,
            json=self.data,
            timeout=self.timeout,
        )

    def send_main(self):
        """Dispatch on ``self.method`` and return the ``requests`` response."""
        # Tolerate 'get' / ' Get ' so a spreadsheet cell need not match exactly.
        is_get = isinstance(self.method, str) and self.method.strip().upper() == 'GET'
        if is_get:
            return self.send_get()
        return self.send_post()


if __name__ == '__main__':
    u = 'http://111.230.173.51:2018/app'
    h = {
        'sid': '1010',
        'partnerNo': '15601',
        'Content-Type': 'application/json',
    }
    c = {
        "serviceHeader": {
            "sessionId": "",
            "userId": "",
            "deviceId": "990009263463478",
            "osType": "1",
            "version": "",
            "lang": ""
        },
        "serviceBody": {}
    }
    res = SendMain(u, 'POST', h, c)
    response = res.send_main()
    print(json.loads(response.text))
2 unittest模块
import json
import unittest


class TestGetSession(unittest.TestCase):
    """Tests for the create-session endpoint, sent through ``SendMain``.

    ``SendMain`` is the request wrapper class shown earlier in these notes;
    it must be importable or defined in the same module.
    """

    u = 'http://111.230.173.51:2018/app'
    h = {
        'sid': '1010',
        'partnerNo': '15601',
        'Content-Type': 'application/json',
    }

    def _create_session(self, device_id):
        """POST a create-session request and return the decoded JSON reply.

        Args:
            device_id: Value placed in ``serviceHeader.deviceId``.
        """
        c = {
            "serviceHeader": {
                "sessionId": "",
                "userId": "",
                "deviceId": device_id,
                "osType": "1",
                "version": "",
                "lang": ""
            },
            "serviceBody": {}
        }
        res = SendMain(self.u, 'POST', self.h, c)
        R = res.send_main()
        # json.loads() lost its `encoding` keyword in Python 3.9; passing it
        # raises TypeError, and R.text is already a decoded str.
        return json.loads(R.text)

    def test_success(self):
        """Case: the session is created successfully."""
        r = self._create_session("9876543210")
        self.assertEqual(r['msg'], '请求成功', '测试失败')

    def test_failure(self):
        """Case: an empty device id makes session creation fail."""
        r = self._create_session("")
        self.assertEqual(r['msg'], '会话无效,请创建会话', '测试失败')


if __name__ == '__main__':
    unittest.main(verbosity=2)
3 mock模块
将测试对象所依存的对象替换为虚构对象的库,该虚构对象的调用允许事后查看。
# NOTE(review): `mock` is imported from the third-party `mock` package here;
# Python 3 also ships it as `unittest.mock` — confirm which one is intended.
from mock import mock
# Real call; the trailing comment shows the stubbed alternative used when the
# request is replaced by a mock object.
R = res.send_main() # R = mock.Mock(return_value=data)
4 封装常量
class global_var:
    """Column indexes of the test-case spreadsheet, stored as strings.

    The getters are static methods so they can be called as
    ``global_var.get_run()``; module-level aliases with the same names are
    defined below the class for callers that use the bare ``get_run()`` form.
    """

    # case id
    Id = '0'
    request_name = '1'
    url = '2'
    run = '3'
    request_way = '4'
    header = '5'
    case_depend = '6'
    data_depend = '7'
    field_depend = '8'
    data = '9'
    expect = '10'
    result = '11'

    @staticmethod
    def get_id():
        """Return the case-id column index."""
        return global_var.Id

    @staticmethod
    def get_url():
        """Return the url column index."""
        return global_var.url

    @staticmethod
    def get_run():
        """Return the run column index."""
        return global_var.run

    @staticmethod
    def get_req_way():
        """Return the request_way column index."""
        return global_var.request_way

    @staticmethod
    def get_header():
        """Return the header column index."""
        return global_var.header

    @staticmethod
    def get_case_depend():
        """Return the case_depend column index."""
        return global_var.case_depend

    @staticmethod
    def get_data_depend():
        """Return the data_depend column index."""
        return global_var.data_depend

    @staticmethod
    def get_field_depend():
        """Return the field_depend column index."""
        return global_var.field_depend

    @staticmethod
    def get_data():
        """Return the data column index."""
        return global_var.data

    @staticmethod
    def get_expect():
        """Return the expect column index."""
        return global_var.expect

    @staticmethod
    def get_result():
        """Return the result column index."""
        return global_var.result

    @staticmethod
    def get_header_value():
        """Return the header column index (same value as ``get_header``)."""
        return global_var.header


# Module-level aliases: keep the plain-function spelling working as well.
get_id = global_var.get_id
get_url = global_var.get_url
get_run = global_var.get_run
get_req_way = global_var.get_req_way
get_header = global_var.get_header
get_case_depend = global_var.get_case_depend
get_data_depend = global_var.get_data_depend
get_field_depend = global_var.get_field_depend
get_data = global_var.get_data
get_expect = global_var.get_expect
get_result = global_var.get_result
get_header_value = global_var.get_header_value
5 封装excel模块
import xlrd
from xlutils.copy import copy


class OperationExcel:
    """Read and write cells of one sheet of an ``.xls`` workbook.

    Args:
        file_dir: Path of the workbook. When omitted (or empty) the default
            case file and its first sheet are used.
        sheet_id: Index of the sheet to use; defaults to the first sheet.
    """

    def __init__(self, file_dir=None, sheet_id=None):
        if file_dir:
            self.file_dir = file_dir
            # Naming a file without a sheet used to pass None to
            # sheet_by_index(); fall back to the first sheet instead.
            self.sheet_id = 0 if sheet_id is None else sheet_id
        else:
            # Raw string: in a normal literal '\U' starts a unicode escape,
            # which is a SyntaxError on Python 3.
            self.file_dir = r'C:\Users\zhouxy\Desktop\case1.xls'
            self.sheet_id = 0
        self.data = self.get_data()  # the loaded sheet object

    def get_data(self):
        """Open the workbook and return the selected sheet."""
        workbook = xlrd.open_workbook(self.file_dir)
        return workbook.sheet_by_index(self.sheet_id)

    def get_lines(self):
        """Return the number of rows in the sheet."""
        return self.data.nrows

    def get_cell_value(self, row, col):
        """Return the value of the cell at (row, col)."""
        return self.data.cell_value(row, col)

    def get_row_value(self, row):
        """Return all values of the given row."""
        return self.data.row_values(row)

    def get_col_value(self, col):
        """Return all values of the given column."""
        return self.data.col_values(col)

    def get_row_data(self, caseId):
        """Return the row number whose first column equals ``caseId``.

        Returns ``None`` when no row matches.
        """
        for num, col_data in enumerate(self.get_col_value(0)):
            if caseId == col_data:
                return num
        return None

    def get_col_data(self, caseId):
        """Return the row values for ``caseId``, or ``None`` if it is unknown."""
        row_num = self.get_row_data(caseId)
        if row_num is None:
            return None
        return self.get_row_value(row_num)

    def write_value(self, row, col, value):
        """Write ``value`` into (row, col) and save the workbook in place.

        xlrd workbooks are read-only, so the file is re-opened, copied with
        ``xlutils.copy`` and the copy is saved over the original path.
        """
        read_data = xlrd.open_workbook(self.file_dir)
        write_data = copy(read_data)
        # Write to the sheet this object reads from, not always sheet 0.
        sheet_data = write_data.get_sheet(self.sheet_id)
        sheet_data.write(row, col, value)
        write_data.save(self.file_dir)
        # Reload so later reads see the value that was just written.
        self.data = self.get_data()


if __name__ == '__main__':
    excel = OperationExcel()
    excel.write_value(1, 1, '007')
6 封装json模块
import json


class OperationJson:
    """Load a JSON file once and serve values from it by key.

    Args:
        file_path: Path of the JSON file; a placeholder default path is used
            when omitted.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """

    def __init__(self, file_path=None):
        if file_path is None:
            self.file_path = '某个路径'
        else:
            self.file_path = file_path
        # The loader is read_json_data(); the previous call to get_json()
        # referred to a method that does not exist.
        self.data = self.read_json_data()

    def read_json_data(self):
        """Parse the JSON file and return its content."""
        # json.load() reads from a file object; json.loads() needs a str.
        with open(self.file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get_json_data(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present in the loaded data.
        """
        return self.data[key]

    # GetData in these notes calls this lookup as get_data(); keep both names.
    get_data = get_json_data

    def write_json_data(self, data):
        """Replace the file content with ``data`` serialized as JSON."""
        # 'w', not 'a+': appending a second document makes the file invalid JSON.
        with open(self.file_path, 'w', encoding='utf-8') as f:
            f.write(json.dumps(data))
        # Keep the in-memory copy in step with what is now on disk.
        self.data = data
7 封装pymysql模块
import json

import pymysql.cursors  # on Python 2 use MySQLdb instead


class OperationMysql:
    """Small helper around one PyMySQL connection and cursor.

    The connection settings default to the values previously hard-coded here,
    so ``OperationMysql()`` behaves as before.
    NOTE(review): credentials are stored in source; consider loading them
    from configuration instead.

    The object can be used as a context manager; the connection is closed on
    exit. Otherwise call ``close()`` when done.
    """

    def __init__(self, host='111.230.173.51', port=3306, user='sit_was',
                 passwd='sit_was321', db='sit_wasdb', charset='utf8'):
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            passwd=passwd,
            db=db,
            charset=charset
        )
        self.cur = self.conn.cursor()  # cursor used for all queries

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def search_one(self, sql, params=None):
        """Execute ``sql`` and return the first row of the result.

        Args:
            sql: Query text; may contain ``%s`` placeholders.
            params: Optional values for the placeholders. Passing values here
                instead of formatting them into ``sql`` lets the driver
                escape them.

        Returns:
            The first row, or ``None`` if the query returned no rows.
        """
        self.cur.execute(sql, params)
        # The connection is left open so the object can run further queries;
        # the old close() call sat after `return` and never executed.
        return self.cur.fetchone()

    def close(self):
        """Close the cursor and the database connection."""
        self.cur.close()
        self.conn.close()


if __name__ == '__main__':
    with OperationMysql() as db:
        res = db.search_one(
            'SELECT * FROM WAS_WFT_ORDER where order_no = %s',
            ('58e2cf3007b84f4084ff51319db99dac',),
        )
        print(res)
8 封装预期结果对比
import json
import operator


class CommonUtil:
    """Comparison helpers used to check actual results against expectations."""

    def is_contain(self, str1, str2):
        """Return True if ``str1`` occurs inside ``str2``, else False."""
        return str1 in str2

    def is_equal_dict(self, dict1, dict2):
        """Return True if the two dictionaries are equal.

        Either argument may be a JSON string; it is decoded first.
        ``isinstance()`` is used rather than ``type()`` so that subclasses of
        ``str`` are decoded too.

        Raises:
            json.JSONDecodeError: If a string argument is not valid JSON.
        """
        if isinstance(dict1, str):
            dict1 = json.loads(dict1)
        if isinstance(dict2, str):
            dict2 = json.loads(dict2)
        return operator.eq(dict1, dict2)


if __name__ == '__main__':
    a = CommonUtil()
    print(a.is_equal_dict(json.dumps({'user': 'zhouxy'}), {'user': 'zhouxy'}))
    print(type(json.dumps({'user': 'zhouxy'})))
    print(type({'user': 'zhouxy'}))
    print(a.is_contain(json.dumps('zhou'), 'zhouxy'))
    print(a.is_contain('zhou', 'zhouxy'))
    print(type(json.dumps('zhou')))
    print(type('zhouxy'))
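未解决:json.dumps 之后的字符串对比。json.dumps('zhou') 的结果是带双引号的字符串 '"zhou"',所以 is_contain(json.dumps('zhou'), 'zhouxy') 返回 False;应在序列化之前进行比较,或先用 json.loads 还原后再比较。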
9 封装获取数据
from excel_get_session import OperationExcel
from mysql_get_session import OperationMysql
from json_get_session import OperationJson
from config_get_session import global_var


class GetData:
    """Read the individual fields of a test case from the case spreadsheet.

    Column positions come from the string attributes of ``global_var`` and
    are converted with ``int()`` before use.
    """

    def __init__(self):
        self.oper_excel = OperationExcel()
        # Most methods referred to `opera_excel`, which was never assigned;
        # bind both spellings to the same object.
        self.opera_excel = self.oper_excel

    def _cell(self, row, column):
        """Return the raw cell value at ``row`` in the given column index."""
        return self.opera_excel.get_cell_value(row, int(column))

    def _cell_or_none(self, row, column):
        """Return the cell value, or ``None`` when the cell is an empty string."""
        value = self._cell(row, column)
        return None if value == '' else value

    def get_is_run(self, row):
        """Return True if the run cell of ``row`` equals 'yes'."""
        return self._cell(row, global_var.run) == 'yes'

    def get_is_header(self, row):
        """Return True if the header cell of ``row`` is not empty."""
        return self._cell(row, global_var.header) != ''

    def get_request_method(self, row):
        """Return the request-method cell of ``row``."""
        return self._cell(row, global_var.request_way)

    def get_request_url(self, row):
        """Return the url cell of ``row``."""
        return self._cell(row, global_var.url)

    def get_request_data(self, row):
        """Return the data cell of ``row``, or ``None`` if it is empty."""
        return self._cell_or_none(row, global_var.data)

    def get_data_for_json(self, row):
        """Return the request payload looked up in the JSON file.

        The data cell holds a key; the payload stored under that key is read
        through ``OperationJson``. Returns ``None`` when the cell is empty.
        """
        key = self.get_request_data(row)
        if key is None:
            return None
        opera_json = OperationJson()
        # OperationJson exposes the lookup as get_json_data().
        return opera_json.get_json_data(key)

    def get_expect_data(self, row):
        """Return the expect cell of ``row``, or ``None`` if it is empty."""
        return self._cell_or_none(row, global_var.expect)

    def get_expect_data_for_mysql(self, row):
        """Run the SQL stored in the expect cell and return its first row.

        Returns ``None`` when the expect cell is empty.
        """
        sql = self.get_expect_data(row)  # the expect cell stores a SELECT statement
        if sql is None:
            return None
        db = OperationMysql()
        try:
            # Must be called on the instance; the class-level call lacked `self`.
            return db.search_one(sql)
        finally:
            # One connection is opened per lookup, so release it here.
            db.conn.close()

    def write_result(self, row, value):
        """Write the actual result back into the result cell of ``row``."""
        self.oper_excel.write_value(row, int(global_var.result), value)

    def get_depend_key(self, row):
        """Return the data_depend cell of ``row``, or ``None`` if it is empty."""
        return self._cell_or_none(row, global_var.data_depend)

    def is_depend(self, row):
        """Return the case_depend cell of ``row``, or ``None`` if it is empty."""
        return self._cell_or_none(row, global_var.case_depend)

    def get_depend_field(self, row):
        """Return the field_depend cell of ``row``, or ``None`` if it is empty."""
        return self._cell_or_none(row, global_var.field_depend)
10 封装获取依赖数据
jsonpath_rw的应用:
安装:pip install jsonpath_rw
导入模块:from jsonpath_rw import jsonpath,parse
jsonpath_expr.find(data) 返回的是匹配对象(match)的列表,每个匹配对象的 match.value 才是对应的值;要获取特定的值,需要按下标取出(例如 [0])。
jsonpath_expr = parse('foo[*].baz')
data = {'foo': [{'baz': 'news'}, {'baz': 'music'}]}
print([match.value for match in jsonpath_expr.find(data)][0])
运行结果: news
import json

from jsonpath_rw import jsonpath, parse

from excel_get_session import OperationExcel
from data_get_session import GetData
from get_session import SendMain


class DependentData:
    """Run the case another case depends on and extract a value from its reply.

    Args:
        caseId: Id of the case that must be executed first.
    """

    def __init__(self, caseId):
        self.caseId = caseId
        self.opera_excel = OperationExcel()
        self.get_data = GetData()

    def run_depend(self):
        """Execute the depended-on case and return its decoded JSON response."""
        row_num = self.opera_excel.get_row_data(self.caseId)
        req_data = self.get_data.get_data_for_json(row_num)
        req_method = self.get_data.get_request_method(row_num)
        req_header = self.get_data.get_is_header(row_num)
        req_url = self.get_data.get_request_url(row_num)
        # get_is_header() reports a bool, which is not a usable headers
        # mapping; only forward it when it really is a dict.
        # NOTE(review): the format of the header cell is not visible here —
        # confirm how real header values should be loaded.
        headers = req_header if isinstance(req_header, dict) else None
        response = SendMain(req_url, req_method, headers, req_data).send_main()
        # json.loads() needs the body text, not the response object.
        return json.loads(response.text)

    def get_depend_key(self, row):
        """Return the value the case in ``row`` needs from the depended-on reply.

        The data-depend cell of ``row`` holds a JSONPath expression; it is
        evaluated against the response of the depended-on case.

        Returns:
            The first matched value, or ``None`` when the row declares no
            expression.

        Raises:
            IndexError: If the expression matches nothing in the response.
        """
        depend_key = self.get_data.get_depend_key(row)
        if depend_key is None:
            return None
        response = self.run_depend()
        # parse() takes the expression and find() takes the data; the two
        # arguments were swapped before.
        jsonpath_expr = parse(depend_key)
        matches = [match.value for match in jsonpath_expr.find(response)]
        if not matches:
            raise IndexError(
                'no match for %r in response of case %r' % (depend_key, self.caseId)
            )
        return matches[0]
11 执行测试用例
from get_session import SendMain
from data_get_session import GetData
from excel_get_session import OperationExcel
from dependent_get_session import DependentData
from util_get_session import CommonUtil


class RunTest():
    """Drive every runnable case in the spreadsheet and record its outcome."""

    def __init__(self):
        # These helpers were previously used as bare classes or built without
        # their required arguments; real instances are needed.
        self.data = GetData()
        self.excel = OperationExcel()
        self.util = CommonUtil()

    def run_test(self):
        """Send each case marked to run and write 'pass' or the response text.

        For a case that depends on another case, the depended-on case is
        executed first and the extracted value is put into the request data
        under the configured field name.
        """
        row_nums = self.excel.get_lines()
        # Starts at 1 — presumably row 0 is the header row; verify against the sheet.
        for i in range(1, row_nums):
            if not self.data.get_is_run(i):
                continue

            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            depend_case = self.data.is_depend(i)
            expect_res = self.data.get_expect_data_for_mysql(i)

            # Dependency handling
            if depend_case is not None:
                self.depend_data = DependentData(depend_case)
                # Value taken from the response of the depended-on case
                depend_value = self.depend_data.get_depend_key(i)
                # Field of this request that receives the value
                depend_key = self.data.get_depend_field(i)
                if request_data is None:
                    request_data = {}
                request_data[depend_key] = depend_value

            # SendMain takes its request details in the constructor.
            res = SendMain(url, method, None, request_data).send_main()
            actual = res.text

            # Compare on the response text; a body that is not valid JSON
            # counts as a failed case rather than aborting the whole run.
            try:
                passed = self.util.is_equal_dict(expect_res, actual)
            except ValueError:
                passed = False

            # Write the result back
            if passed:
                self.data.write_result(i, 'pass')
            else:
                self.data.write_result(i, actual)