一、jsonpath难点分析
# Sample response payload used to experiment with jsonpath lookups.
dic = {
"error_code": 0,
"stu_info": [
{
"id": 2057,
"name": "xiaohei",
"sex": "nan",
"age": 29,
"addr": "beijing",
"grade": "tianxie",
"phone": "18712321234",
"gold": 100
}
]
}
import jsonpath
# Query the payload for "error_code" with the '$..<key>' path form and show
# whatever jsonpath returns for it.
s = jsonpath.jsonpath(dic,'$..error_code')
print(s)
# Draft of the check-expression parser (kept commented out as design notes).
# seqs=['!=','>=','<=','=','<','>','in','notin']
# # Looping over an operator list like this keeps writing test cases simple.
# s1='error_code=0,name!=xxx,age>18'
# #[error_code>=0 , name!=xxx,age>1]
# s1_list = s1.split(',')
# format_list=[]
# for s in s1_list:
# #erro_code=0 [error_code,0]
# #name!=xxx
# for seq in seqs:
# # = != >=
# if seq in s:
# key,value = s.split(seq)
# tmep = [key,seq,value]
# format_list.append(tmep)
# break
# #[[error_code,'=',0] ]
#
# print('format_list',format_list)
#s='erro_code:=:0,name:!=:xxx,age:>:18'
# The colon-delimited form above would be simpler to parse in code.
erro_code = 0
# Example checks the expression syntax has to support:
# error_code=0
# name!=xxx
# age>18
# Remaining operators: < >= in <= notin
二、实现参数化
封装好的requests:
import requests
import nnlog
import os
from conf.setting import LOG_PATH
class MyRequest:
    """Small wrapper around ``requests`` that normalises results and logs each call.

    ``post`` and ``get`` never raise for request failures; they return a dict
    of the form ``{"status": <code>, "data": <payload>}`` where the code is:

    * ``0`` - the request itself failed; ``data`` is the exception's ``args``
    * ``1`` - the response body parsed as JSON; ``data`` is the parsed object
    * ``2`` - the response body is not JSON; ``data`` is the raw response text
    """
    log_file_name = os.path.join(LOG_PATH,'MyRequest.log')  # log file name
    time_out = 10  # timeout value handed to requests for every call

    def __init__(self,url,data=None,headers=None,file=None):
        """Store the request target and payload.

        :param url: address to call
        :param data: form data for POST, query parameters for GET
        :param headers: optional request headers
        :param file: optional value passed as ``files=`` on POST
        """
        self.url = url
        self.data = data
        self.headers = headers
        self.file = file

    @staticmethod
    def _parse_body(req):
        """Turn a response object into the status/data result dict."""
        try:
            return {"status":1,"data":req.json()}  # 1: body is JSON
        except ValueError:
            # Body could not be decoded as JSON. The key used to be misspelled
            # "staus", which left callers without a "status" entry.
            return {"status":2,"data":req.text}  # 2: body is not JSON

    def post(self):
        """Send a POST request, log it, and return the result dict."""
        try:
            req = requests.post(self.url,data=self.data,headers=self.headers,
                                files=self.file,timeout=self.time_out)
        except Exception as e:
            # Best-effort by design: any failure is reported, not raised.
            res = {"status":0,"data":e.args}  # 0: request failed
        else:
            res = self._parse_body(req)
        log_str = 'url: %s 请求方式:post data:%s ,返回数据:%s'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    def get(self):
        """Send a GET request, log it, and return the result dict."""
        try:
            req = requests.get(self.url,params=self.data,headers=self.headers,
                               timeout=self.time_out)
        except Exception as e:
            # Best-effort by design: any failure is reported, not raised.
            res = {"status":0,"data":e.args}  # 0: request failed
        else:
            res = self._parse_body(req)
        log_str = 'url: %s get请求 data:%s ,返回数据:%s'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    @classmethod
    def write_log(cls,content):
        """Write ``content`` at debug level to the class log file."""
        log = nnlog.Logger(cls.log_file_name)
        log.debug(content)
case_operation:
import xlrd
from core.my_requests import MyRequest
def get_case(path):
    '''
    Load test cases from the first sheet of an Excel workbook.

    :param path: path of the Excel test-case file
    :return: list of lists; each inner list holds columns 4-7 of one row,
             with the first (header) row skipped
    '''
    sheet = xlrd.open_workbook(path).sheet_by_index(0)
    # Row 0 is skipped; only the slice [4:8] of every later row is kept.
    return [sheet.row_values(row_idx)[4:8] for row_idx in range(1, sheet.nrows)]
def send_request(url,method,data,headers=None):
    '''
    Send one request through MyRequest and return the "data" part of its result.

    :param url: address to call
    :param method: HTTP method name, matched case-insensitively (POST or GET)
    :param data: request payload handed to MyRequest
    :param headers: optional request headers
    :return: the "data" entry of the result, or a fixed message for any other method
    '''
    client = MyRequest(url,data,headers=headers)
    verb = method.upper()
    if verb == "POST":
        return client.post()['data']
    if verb == 'GET':
        return client.get()['data']
    # Anything other than POST/GET is reported back as plain text.
    return "暂时不支持该方法!"
parse_param方法:
import random
import string
import time
class ParseParam:
    """Expand ``<placeholder>`` tokens in a request-parameter string.

    On construction every ``<name>`` token whose name is listed in
    ``func_map`` is replaced by the string form of the value returned by the
    method of the same name. The expanded text is kept in ``self.param``.
    """
    # Names of the generator methods; each backs the placeholder ``<name>``.
    func_map = ['phone','email','id_card','cur_time','money']

    def __init__(self,param):
        """:param param: parameter string, e.g. ``'phone=<phone>,t=<cur_time>'``"""
        self.param = param
        self.parse()

    def phone(self):
        """Return a random 11-digit phone number string with a known prefix."""
        phone_starts = ['134','181','138','177','150','132','188','186','189','130','170','153','155']
        start = random.choice(phone_starts)
        # Zero-pad so the suffix is always exactly 8 digits.
        end = str(random.randint(0,99999999)).zfill(8)
        return start + end

    def email(self):
        """Return a random address of the form ``ATP_test_<6 chars>@<domain>``."""
        email_end = ['163.com','qq.com','126.com','sina.com']
        end = random.choice(email_end)
        start_str = 'ATP_test_'
        email_start = ''.join(random.sample(string.ascii_letters+string.digits,6))
        return start_str+email_start+'@'+end

    def id_card(self):
        """Return an ID-card number (currently a fixed value)."""
        return 410881199011212121

    def cur_time(self):
        """Return the current Unix timestamp truncated to an int."""
        return int(time.time())

    def money(self):
        """Return an amount of money (currently a fixed value)."""
        return 10000

    def parse(self):
        """Replace every known ``<name>`` placeholder in ``self.param``.

        All occurrences of one placeholder receive the same generated value.
        """
        for func in self.func_map:
            placeholder = '<%s>'%func
            # Only run a generator when its placeholder is actually present.
            if placeholder in self.param:
                value = str(getattr(self,func)())
                self.param = self.param.replace(placeholder,value)

    def strToDict(self):
        """Convert ``'k1=v1,k2=v2'`` into ``{'k1': 'v1', 'k2': 'v2'}``.

        Items are separated by ``,``. Each item is split on its first ``=``
        only, so a value may itself contain ``=``. Items without ``=`` are
        ignored.

        :return: dict mapping parameter names to string values
        """
        data = {}
        for item in self.param.split(','):
            key,sep,value = item.partition('=')
            if sep:
                data[key] = value
        return data
if __name__ == '__main__':
    # Demo: expand the placeholders in a sample parameter string and print it.
    # The adjacent literals are wrapped in parentheses so they are joined into
    # one string; as separate statements only the first literal was assigned.
    param = (
        'username=niuhanyang'
        '&phone=<phone>&email=<email>'
        '&id_card=<id_card>&start_time='
        '<cur_time>,balan=<money>'
    )
    p = ParseParam(param)
    print(p.param)
    # print(p.phone())
    # res = getattr(p,'money')  # fetch an attribute (method or variable) of an object
    # # print(res())
    # import os,requests
    # res = hasattr(requests,'get')  # check whether a module/class has a given method or variable
    # print(res)
parse_response
import jsonpath
class ResponseParse:
    """Evaluate a comma-separated list of checks against a response object.

    A check string looks like ``'error_code=0,name!=xxx,age>18'``. Each item
    is ``<key><operator><expected>``; the key is looked up in the response
    with jsonpath and compared with the expected text.
    """
    # Operators are tried in this order and the first one found in an item
    # wins, so longer operators must come before the ones they contain:
    # '!=', '>=', '<=' before '=', '<', '>', and 'notin' before 'in'.
    # NOTE(review): a key or value that itself contains a listed operator
    # (e.g. the letters "in") can still be split at the wrong place.
    seqs = ['!=', '>=', '<=', '=', '<', '>', 'notin', 'in']

    def __init__(self,response,check):
        """
        :param response: response data to search
        :param check: check string, e.g. ``'error_code=0,name!=xxx'``
        """
        self.response = response
        self.check = check

    def format_check(self):
        """Split the check string into ``[key, operator, expected]`` triples.

        Example result: ``[['error_code','=','0'],['name','!=','xxx']]``.
        Items containing no known operator are skipped. Each item is split on
        the first occurrence of its operator only, and surrounding whitespace
        is removed from the key and the expected value.
        """
        format_list = []
        for item in self.check.split(','):
            for seq in self.seqs:
                if seq in item:
                    key,value = item.split(seq,1)
                    format_list.append([key.strip(),seq,value.strip()])
                    break
        return format_list

    def get_real_value(self,key):
        """Return the first value found for ``key`` anywhere in the response.

        Uses the jsonpath expression ``$..<key>``. When the lookup result is
        falsy, a "key not found" message string is returned instead.
        """
        res = jsonpath.jsonpath(self.response,'$..%s'%key)
        if res:
            return res[0]
        return '找不到该key【%s】'%key

    def operation_check(self,real,seq,hope):
        """Compare the actual value with the expected one using ``seq``.

        ``=``, ``!=``, ``in`` and ``notin`` compare ``str(real)`` with
        ``hope``; every other operator is delegated to ``num_check``.

        :return: ``(status, msg)`` - the boolean outcome and a description
        """
        msg = "判断信息:%s %s %s "%(real,seq,hope)
        real = str(real)  # compare as text so both sides have the same type
        if seq == '=':
            status = real == hope
        elif seq == '!=':
            status = real != hope
        elif seq == 'in':
            status = real in hope
        elif seq == 'notin':
            status = real not in hope
        else:
            status,msg = self.num_check(real,seq,hope)
        return status,msg

    def num_check(self,real,seq,hope):
        """Compare two values numerically with ``>``, ``<``, ``<=`` or ``>=``.

        Both sides are converted to ``float``. If a conversion fails the
        result is ``False`` and the message names the offending comparison.
        Any operator other than ``>``, ``<``, ``<=`` is treated as ``>=``.

        :return: ``(status, msg)``
        """
        msg = "判断信息:%s %s %s "%(real,seq,hope)
        try:
            real = float(real)
            hope = float(hope)
        except (TypeError,ValueError):
            # The two literals form one format string; previously the second
            # line was a separate statement and the details were lost.
            msg = ("比较时出错,大小比较只能是数字类型!"
                   "%s %s %s"%(real,seq,hope))
            status = False
        else:
            if seq == '>':
                status = real > hope
            elif seq == '<':
                status = real < hope
            elif seq == '<=':
                status = real <= hope
            else:
                status = real >= hope
        return status,msg

    def check_res(self):
        """Run every check, stopping at the first failure.

        :return: ``('通过', messages)`` when all checks pass, otherwise
                 ``('失败', messages)`` with the messages gathered so far
        """
        all_msg = ''
        for key,seq,hope in self.format_check():
            real = self.get_real_value(key)
            status,msg = self.operation_check(real,seq,hope)
            all_msg = all_msg+msg+' '  # accumulate the per-check messages
            if not status:
                return '失败',all_msg
        return '通过',all_msg
获取返回结果输出报告
import xlrd
from xlutils.copy import copy
import os
import datetime
from conf import setting
import yagmail
def make_today_dir():
    """Create today's report directory if needed and return its path.

    The directory is ``<setting.REPORT_PATH>/<YYYY-MM-DD>``, for example
    ``c:/xxx/xxx/atp/report/2018-11-24``.

    :return: path of today's report directory
    """
    today = str(datetime.date.today())
    abs_path = os.path.join(setting.REPORT_PATH,today)
    # makedirs with exist_ok avoids the race between the existence check and
    # the creation, and also creates REPORT_PATH itself when it is missing.
    os.makedirs(abs_path,exist_ok=True)
    return abs_path
def write_res(case_path,case_res):
    """Copy the case workbook, fill in the results and save it as a report.

    :param case_path: path of the Excel case file,
                      e.g. ``c:/xxx/xxx/atp/cases/测试用例.xls``
    :param case_res: one ``[response_text, status]`` pair per case, in row order
    :return: path of the saved report file
    """
    new_book = copy(xlrd.open_workbook(case_path))
    sheet = new_book.get_sheet(0)
    row = 1  # results start on the second row, below the header
    for response,status in case_res:
        sheet.write(row,8,response)  # column 8: response plus check messages
        sheet.write(row,9,status)    # column 9: pass/fail status
        row += 1
    report_dir = make_today_dir()  # today's directory, created on demand
    base_name = os.path.split(case_path)[-1]  # file name without directories
    stamp = datetime.datetime.today().strftime('%H%M%S')  # current HHMMSS
    # Report name looks like 165530_测试用例.xls
    real_path = os.path.join(report_dir,stamp+'_'+base_name)
    new_book.save(real_path)
    return real_path
def send_mail(content,file_path=None):
    """Send the test report by mail.

    :param content: mail body
    :param file_path: optional attachment path
    """
    mailer = yagmail.SMTP(**setting.MAIL_INFO)
    # The subject carries the current date and time.
    subject = '接口测试报告_%s' % datetime.datetime.today()
    mailer.send(
        subject=subject,
        to=setting.TO,
        contents=content,
        attachments=file_path,
    )
# 1. Parsing of the check points is done.
# 2. Fetch the actual values.
#
# Sample check string; the print shows the type of its last character.
s='error_code=0,name!=xxx,age>18'
print(type(s[-1]))
# Sample response shape with a nested key (a bare expression, not used by any code).
{
"error_code": 0,
"name": "小黑",
"stu_info": {
"age": 18
}
}
串联起来
import os,sys
# Directory two levels above this file; it is put first on sys.path so the
# imports that follow resolve against it.
BAE_PATH = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
) # the atp directory
sys.path.insert(0,BAE_PATH)
from conf.setting import CASE_PATH
from core import case_operation,parse_param,parse_response
from core import tools
import glob
class RunCase:
    """Run every Excel case file, write a report for each and mail it."""

    # Mail body template; the placeholders are total, passed and failed counts.
    content = '''
各位好!
本次测试结果:总共运行%s条用例,通过%s条,失败%s条。详细信息见附件。
'''

    def __init__(self):
        # Number of passed cases in the most recent send_requests() call.
        # Defined here so the attribute exists before any cases have run.
        self.success_count = 0

    def get_excel(self):
        """Process every ``test*.xls`` file found in ``CASE_PATH``.

        For each file: read the cases, run them, write the report and send
        one mail with the summary and the report attached.
        """
        for excel in glob.glob(os.path.join(CASE_PATH,'test*.xls')):
            cases = case_operation.get_case(excel)  # read the cases
            results = self.send_requests(cases)  # send requests and check results
            report_file_path = tools.write_res(excel,results)  # write the report
            all_count = len(cases)  # total number of cases
            fail_count = all_count - self.success_count
            content = self.content%(all_count,self.success_count,fail_count)
            tools.send_mail(content,report_file_path)

    def send_requests(self,cases):
        """Run all cases and collect their results.

        :param cases: list of ``[url, method, param, check]`` rows
        :return: list of ``[response_and_messages, status]`` pairs, one per case
        """
        self.success_count = 0
        results = []
        for case in cases:
            url,method,param,check = case  # fields of one case
            p = parse_param.ParseParam(param)  # expand placeholders
            data = p.strToDict()  # parameters as a dict
            response = case_operation.send_request(url,method,data)  # send it
            # Decide whether the case passed.
            p2 = parse_response.ResponseParse(response,check)
            status, msg = p2.check_res()
            real_res = str(response)+' '+msg  # response followed by check messages
            results.append([real_res,status])
            if status == '通过':
                self.success_count += 1  # count the passed cases
        return results

    def main(self):
        """Entry point: print a banner, run everything, print a closing banner."""
        print('开始测试'.center(50,'*'))
        self.get_excel()
        print('测试结束'.center(50,'*'))
if __name__ == '__main__':
    # Script entry point: build the runner and execute the whole suite.
    RunCase().main()