一、框架结构:
工程目录
二、Case文件设计
三、基础包 base
3.1 封装get/post请求(runmethod.py)
import requests
import json


class RunMethod:
    """Send GET/POST requests with ``requests`` and return the JSON reply."""

    def post_main(self, url, data, header=None):
        """POST ``data`` to ``url`` and return the decoded JSON body.

        ``header`` is forwarded as ``headers=``; None means no extra headers,
        which is also the default of ``requests.post``.
        """
        res = requests.post(url=url, data=data, headers=header)
        return res.json()

    def get_main(self, url, data=None, header=None):
        """GET ``url`` and return the decoded JSON body.

        Certificate verification is disabled (``verify=False``).
        """
        # NOTE(review): ``data`` is sent as the request body, not as query
        # parameters (requests uses ``params=`` for those) - confirm intended.
        res = requests.get(url=url, data=data, headers=header, verify=False)
        return res.json()

    def run_main(self, method, url, data=None, header=None):
        """Dispatch on ``method`` and return the reply as pretty JSON text.

        ``method`` is matched case-insensitively and ignoring surrounding
        whitespace, so "Post", "POST" and "post " all send a POST. Any other
        value keeps the original fallback and is sent as a GET.

        Returns a string: the JSON reply re-serialised with sorted keys,
        2-space indentation and non-ASCII characters left unescaped.
        """
        if str(method).strip().lower() == 'post':
            res = self.post_main(url, data, header)
        else:
            res = self.get_main(url, data, header)
        return json.dumps(res, ensure_ascii=False, sort_keys=True, indent=2)
3.2 封装mock(mock.py)
try:
    # Python 3.3+ ships the same API in the standard library.
    from unittest import mock
except ImportError:
    # Python 2 fallback: the third-party ``mock`` backport.
    from mock import mock


# Mock wrapper
def mock_test(mock_method, request_data, url, method, response_data):
    """Return ``response_data`` as if a request had been sent.

    A fresh ``mock.Mock`` whose return value is ``response_data`` is called
    with ``(url, method, request_data)`` and its result is returned.

    The ``mock_method`` argument is kept for interface compatibility only:
    whatever is passed in is replaced by the new Mock and never called.
    """
    mock_method = mock.Mock(return_value=response_data)
    return mock_method(url, method, request_data)
四、数据操作包 operation_data
4.1 获取excel单元格中的内容(get_data.py)
#coding:utf-8
from tool.operation_excel import OperationExcel
import data_config
from tool.operation_json import OperetionJson
from tool.connect_db import OperationMysql


class GetData:
    """Read the individual fields of a test case from the case spreadsheet."""

    def __init__(self):
        self.opera_excel = OperationExcel()

    def _cell(self, row, column):
        """Return the cell at ``row`` and the column index given as text."""
        return self.opera_excel.get_cell_value(row, int(column))

    def _cell_or_none(self, row, column):
        """Like ``_cell`` but map an empty-string cell to None."""
        value = self._cell(row, column)
        return None if value == '' else value

    # Number of rows in the sheet, i.e. the number of cases
    def get_case_lines(self):
        return self.opera_excel.get_lines()

    # Whether the case should be executed
    def get_is_run(self, row):
        """Return True when the "run" cell is exactly 'yes', else False."""
        return self._cell(row, data_config.get_run()) == 'yes'

    # Whether the case carries a header
    def is_header(self, row):
        """Return the header cell, or None when it is empty."""
        return self._cell_or_none(row, data_config.get_header())

    # Request method
    def get_request_method(self, row):
        return self._cell(row, data_config.get_run_way())

    # Request URL
    def get_request_url(self, row):
        return self._cell(row, data_config.get_url())

    # Request data
    def get_request_data(self, row):
        """Return the request-data cell, or None when it is empty."""
        return self._cell_or_none(row, data_config.get_data())

    # Look the request data up in the JSON file by its keyword
    def get_data_for_json(self, row):
        """Use the request-data cell as a key into the JSON data file."""
        json_store = OperetionJson()
        keyword = self.get_request_data(row)
        return json_store.get_data(keyword)

    # Expected result
    def get_expcet_data(self, row):
        """Return the expected-result cell, or None when it is empty."""
        return self._cell_or_none(row, data_config.get_expect())

    # Expected result fetched through SQL
    def get_expcet_data_for_mysql(self, row):
        """Run the expected-result cell as SQL and return the decoded row."""
        db = OperationMysql()
        query = self.get_expcet_data(row)
        record = db.search_one(query)
        return record.decode('unicode-escape')

    def write_result(self, row, value):
        """Write ``value`` into the result column of ``row``."""
        self.opera_excel.write_value(row, int(data_config.get_result()), value)

    # Key of the dependent data
    def get_depend_key(self, row):
        """Return the data-depend cell, or None when it is empty."""
        return self._cell_or_none(row, data_config.get_data_depend())

    # Whether the case depends on another case
    def is_depend(self, row):
        """Return the id of the case this row depends on, or None."""
        return self._cell_or_none(row, data_config.get_case_depend())

    # Field that receives the dependent data
    def get_depend_field(self, row):
        """Return the field-depend cell, or None when it is empty."""
        return self._cell_or_none(row, data_config.get_field_depend())
4.2 获取excel中每个列(data_config.py)
#coding:utf-8
class global_var:
    """Column index of every case field in the spreadsheet, stored as text."""
    # case_id
    Id = '0'
    request_name = '1'
    url = '2'
    run = '3'
    request_way = '4'
    header = '5'
    case_depend = '6'
    data_depend = '7'
    field_depend = '8'
    data = '9'
    expect = '10'
    result = '11'


# Column of the case id
def get_id():
    """Return the case-id column index (as a string)."""
    return global_var.Id


# Column of the url
def get_url():
    """Return the url column index (as a string)."""
    return global_var.url


def get_run():
    """Return the "run" flag column index (as a string)."""
    return global_var.run


def get_run_way():
    """Return the request-method column index (as a string)."""
    return global_var.request_way


def get_header():
    """Return the header column index (as a string)."""
    return global_var.header


def get_case_depend():
    """Return the case-depend column index (as a string)."""
    return global_var.case_depend


def get_data_depend():
    """Return the data-depend column index (as a string)."""
    return global_var.data_depend


def get_field_depend():
    """Return the field-depend column index (as a string)."""
    return global_var.field_depend


def get_data():
    """Return the request-data column index (as a string)."""
    return global_var.data


def get_expect():
    """Return the expected-result column index (as a string)."""
    return global_var.expect


def get_result():
    """Return the result column index (as a string)."""
    return global_var.result


def get_header_value():
    """Return the header column index; same value as ``get_header``."""
    return global_var.header
4.3 解决数据依赖(dependent_data.py)
#coding:utf-8
import sys
import json
sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
from tool.operation_excel import OperationExcel
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from jsonpath_rw import jsonpath,parse


class DependdentData:
    """Run the case another case depends on and extract a value from its reply."""

    def __init__(self, case_id):
        self.case_id = case_id
        self.opera_excel = OperationExcel()
        self.data = GetData()

    # Fetch the whole spreadsheet row belonging to this case_id
    def get_case_line_data(self):
        """Return the row values of the case identified by ``self.case_id``."""
        return self.opera_excel.get_rows_data(self.case_id)

    # Execute the dependent case and return its result
    def run_dependent(self):
        """Send the dependent case's request and return the parsed JSON reply.

        The request is sent without a header (the header lookup was disabled
        in the original code as well).
        """
        run_method = RunMethod()
        row_num = self.opera_excel.get_row_num(self.case_id)
        request_data = self.data.get_data_for_json(row_num)
        method = self.data.get_request_method(row_num)
        url = self.data.get_request_url(row_num)
        res = run_method.run_main(method, url, request_data)
        return json.loads(res)

    # Use the dependent key to pull a value out of the dependent case's reply
    def get_data_for_key(self, row):
        """Return the first value in the dependent reply matching the row's key.

        The data-depend cell of ``row`` is parsed as a JSONPath expression and
        applied to the reply of ``run_dependent()``.

        Raises:
            IndexError: when the expression matches nothing. The exception
                type is unchanged; it now names the expression that failed.
        """
        depend_data = self.data.get_depend_key(row)
        response_data = self.run_dependent()
        matches = parse(depend_data).find(response_data)
        if not matches:
            raise IndexError(
                "no match for %r in the response of case %r"
                % (depend_data, self.case_id)
            )
        return matches[0].value


if __name__ == '__main__':
    order = {
        "data": {
            "_input_charset": "utf-8",
            "body": "京东订单-1710141907182334",
            "it_b_pay": "1d",
            "notify_url": "http://order.imooc.com/pay/notifyalipay",
            "out_trade_no": "1710141907182334",
            "partner": "2088002966755334",
            "payment_type": "1",
            "seller_id": "yangyan01@tcl.com",
            "service": "mobile.securitypay.pay",
            "sign": (
                "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl"
                "pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D"
            ),
            "sign_type": "RSA",
            "string": (
                "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/"
                "notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@"
                "tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H"
                "IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30"
                "JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA"
            ),
            "subject": "京东订单-1710141907182334",
            "total_fee": 299
        },
        "errorCode": 1000,
        "errorDesc": "成功",
        "status": 1,
        "timestamp": 1507979239100
    }
    res = "data.out_trade_no"
    json_exe = parse(res)
    madle = json_exe.find(order)
    # print() with a single argument behaves the same on Python 2 and 3.
    print(madle[0].value)
五、工具类包 tool
5.1 操作excel (operation_excel.py)
#coding:utf-8
import xlrd
from xlutils.copy import copy


class OperationExcel:
    """Read cells from, and write cells to, one sheet of an .xls workbook."""

    def __init__(self, file_name=None, sheet_id=None):
        """Open ``file_name`` and load sheet ``sheet_id``.

        Without ``file_name`` the default case file and sheet 0 are used.
        With ``file_name`` but no ``sheet_id``, sheet 0 is used; previously
        None was stored and indexing the sheet list failed.
        """
        if file_name:
            self.file_name = file_name
            self.sheet_id = 0 if sheet_id is None else sheet_id
        else:
            self.file_name = '../dataconfig/case1.xls'
            self.sheet_id = 0
        self.data = self.get_data()

    # Load the sheet contents
    def get_data(self):
        """Open the workbook and return the sheet at ``self.sheet_id``."""
        data = xlrd.open_workbook(self.file_name)
        tables = data.sheets()[self.sheet_id]
        return tables

    # Number of rows in the sheet
    def get_lines(self):
        return self.data.nrows

    # Content of a single cell
    def get_cell_value(self, row, col):
        return self.data.cell_value(row, col)

    # Write data
    def write_value(self, row, col, value):
        """Write ``value`` to (row, col) and save the workbook in place.

        The write targets the sheet this object reads (``self.sheet_id``);
        it used to always go to sheet 0. The sheet cached in ``self.data``
        is not reloaded afterwards.
        """
        read_data = xlrd.open_workbook(self.file_name)
        write_data = copy(read_data)
        sheet_data = write_data.get_sheet(self.sheet_id)
        sheet_data.write(row, col, value)
        write_data.save(self.file_name)

    # Row contents for the given case id
    def get_rows_data(self, case_id):
        row_num = self.get_row_num(case_id)
        rows_data = self.get_row_values(row_num)
        return rows_data

    # Row number for the given case id
    def get_row_num(self, case_id):
        """Return the index of the first row whose first cell equals ``case_id``.

        Cells are compared for equality. The earlier substring test made
        "case-1" match a "case-10" row. Returns None when no row matches.
        """
        for num, col_data in enumerate(self.get_cols_data()):
            if col_data == case_id:
                return num
        return None

    # Contents of the given row
    def get_row_values(self, row):
        return self.data.row_values(row)

    # Contents of one column
    def get_cols_data(self, col_id=None):
        """Return the values of column ``col_id`` (column 0 when omitted)."""
        if col_id is not None:
            cols = self.data.col_values(col_id)
        else:
            cols = self.data.col_values(0)
        return cols


if __name__ == '__main__':
    opers = OperationExcel()
    # print() with a single argument behaves the same on Python 2 and 3.
    print(opers.get_cell_value(1, 2))
5.2 判断字符串包含,判断字典是否相等(common_util.py)
#coding:utf-8
import json

# The text type on both Python 2 (unicode) and Python 3 (str).
_TEXT_TYPE = type(u"")


def _to_text(value):
    """Decode a byte string as UTF-8; return any other value unchanged."""
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value


class CommonUtil:
    """Comparison helpers used to check a response against an expectation."""

    def is_contain(self, str_one, str_two):
        """Return True when ``str_one`` occurs in ``str_two``, else False.

        str_one: the string to look for
        str_two: the string that is searched

        Byte strings are decoded as UTF-8 first so byte/text combinations can
        be compared. Unicode input used to return ``cmp(...)``, which is 0
        (falsy) for equal strings and relied on Python 2-only names.
        """
        return _to_text(str_one) in _to_text(str_two)

    def is_equal_dict(self, dict_one, dict_two):
        """Compare two dicts; return 0 when equal, 1 otherwise.

        Either argument may be JSON text (bytes or text), which is parsed
        first. The integer result keeps the ``== 0`` contract of the original
        ``cmp`` call, which is not available on Python 3.

        Raises:
            ValueError: when a string argument is not valid JSON.
        """
        dict_one = _to_text(dict_one)
        if isinstance(dict_one, _TEXT_TYPE):
            dict_one = json.loads(dict_one)
        dict_two = _to_text(dict_two)
        if isinstance(dict_two, _TEXT_TYPE):
            dict_two = json.loads(dict_two)
        return 0 if dict_one == dict_two else 1
5.3 操作header(operation_header.py)
#coding:utf-8
import requests
import json
from operation_json import OperetionJson


class OperationHeader:
    """Turn a login response into a cookie file."""

    def __init__(self, response):
        # ``response`` is JSON text; keep the parsed form.
        self.response = json.loads(response)

    def get_response_url(self):
        """Return the first entry of ``data.url`` in the login response."""
        return self.response['data']['url'][0]

    def get_cookie(self):
        """Request the token URL and return the cookie jar of the reply."""
        token_url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
        return requests.get(token_url).cookies

    def write_cookie(self):
        """Convert the cookie jar to a dict and store it via OperetionJson."""
        cookie_dict = requests.utils.dict_from_cookiejar(self.get_cookie())
        json_store = OperetionJson()
        json_store.write_data(cookie_dict)


if __name__ == '__main__':
    login_url = "http://www.jd.com/passport/user/login"
    form = {
        "username":"18513199586",
        "password":"111111",
        "verify":"",
        "referer":"https://www.jd.com"
    }
    login_reply = requests.post(login_url, form).json()
    op_header = OperationHeader(json.dumps(login_reply))
    op_header.write_cookie()
5.4 操作json文件(operation_json.py)
#coding:utf-8
import json


class OperetionJson:
    """Load a JSON file once and look values up by key."""

    def __init__(self, file_path=None):
        """Read ``file_path`` (default '../dataconfig/user.json') into ``self.data``."""
        if file_path is None:
            self.file_path = '../dataconfig/user.json'
        else:
            self.file_path = file_path
        self.data = self.read_data()

    # Read the json file
    def read_data(self):
        """Return the parsed content of ``self.file_path``."""
        with open(self.file_path) as fp:
            return json.load(fp)

    # Fetch data by keyword
    def get_data(self, id):
        """Return ``self.data[id]``; raises KeyError for an unknown key.

        The leftover debug print of ``type(self.data)`` was removed: it
        polluted stdout and was a syntax error on Python 3.
        """
        return self.data[id]

    # Write json
    def write_data(self, data, file_path='../dataconfig/cookie.json'):
        """Serialise ``data`` as JSON into ``file_path``.

        ``file_path`` defaults to the cookie file the original always wrote
        to, so existing one-argument calls behave as before.
        """
        with open(file_path, 'w') as fp:
            fp.write(json.dumps(data))


if __name__ == '__main__':
    opjson = OperetionJson()
    # print() with a single argument behaves the same on Python 2 and 3.
    print(opjson.get_data('shop'))
5.5 操作数据库(connect_db.py)
#coding:utf-8
import MySQLdb.cursors
import json


class OperationMysql:
    """Run read-only lookups against the test database."""

    def __init__(self):
        # NOTE(review): connection settings, including the password, are
        # hard-coded here; consider loading them from configuration.
        self.conn = MySQLdb.connect(
            host='localhost',
            port=3306,
            user='root',
            passwd='123456',
            db='le_study',
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor
        )
        self.cur = self.conn.cursor()

    # Query a single row
    def search_one(self, sql, params=None):
        """Execute ``sql`` and return the first row serialised as JSON text.

        ``params`` is optional and is passed to the driver as the query
        arguments, so values can be bound to placeholders instead of being
        formatted into ``sql``. Calls with ``sql`` alone behave as before.
        """
        self.cur.execute(sql, params)
        result = self.cur.fetchone()
        return json.dumps(result)


if __name__ == '__main__':
    op_mysql = OperationMysql()
    res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
    # print() with a single argument behaves the same on Python 2 and 3.
    print(res)
5.6 发送报告邮件(send_email.py)
#coding:utf-8
import smtplib
from email.mime.text import MIMEText

# Module-level settings. The original declared these with ``global`` inside
# the class body, which also made them module globals.
# NOTE(review): the account password is hard-coded; consider reading it from
# the environment or a config file that is not committed.
email_host = "smtp.163.com"
send_user = "jiaxiaonan666@163.com"
password = "jia_668"


class SendEmail:
    """Build and send the plain-text test report mail."""

    def send_mail(self, user_list, sub, content):
        """Send ``content`` with subject ``sub`` to every address in ``user_list``."""
        user = "jiaxiaonan"+"<"+send_user+">"
        message = MIMEText(content,_subtype='plain',_charset='utf-8')
        message['Subject'] = sub
        message['From'] = user
        # NOTE(review): recipients are joined with ";"; address lists in mail
        # headers are normally comma-separated - confirm with the mail server.
        message['To'] = ";".join(user_list)
        server = smtplib.SMTP()
        server.connect(email_host)
        try:
            server.login(send_user,password)
            server.sendmail(user,user_list,message.as_string())
        finally:
            # Release the connection even when login or sending fails.
            server.close()

    def send_main(self,pass_list,fail_list):
        """Summarise the run and mail the report.

        ``pass_list`` and ``fail_list`` hold the passed and failed cases;
        only their lengths are used. Counts are reported as integers (they
        used to print as "10.0"). When no case ran, both rates are reported
        as 0.00% instead of raising ZeroDivisionError.
        """
        pass_num = len(pass_list)
        fail_num = len(fail_list)
        count_num = pass_num+fail_num
        if count_num:
            # 100.0 forces float division on Python 2 as well.
            pass_rate = pass_num*100.0/count_num
            fail_rate = fail_num*100.0/count_num
        else:
            pass_rate = fail_rate = 0.0
        pass_result = "%.2f%%" % pass_rate
        fail_result = "%.2f%%" % fail_rate

        user_list = ['609037724@qq.com']
        sub = "接口自动化测试报告"
        content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
        self.send_mail(user_list,sub,content)


if __name__ == '__main__':
    sen = SendEmail()
    sen.send_main([1,2,3,4],[2,3,4,5,6,7])
六、主函数
run_test.py
#coding:utf-8
import sys
sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from tool.common_util import CommonUtil
from operation_data.dependent_data import DependdentData
from tool.send_email import SendEmail
from tool.operation_header import OperationHeader
from tool.operation_json import OperetionJson


class RunTest:
    """Execute every enabled case in the spreadsheet and mail a summary."""

    def __init__(self):
        self.run_method = RunMethod()
        self.data = GetData()
        self.com_util = CommonUtil()
        self.send_mai = SendEmail()

    def _send(self, header, method, url, request_data):
        """Send one request according to the case's header cell.

        'write': send without a header, then store the cookie derived from
                 the reply.
        'yes':   load 'apsid' from the cookie file and pass it in the header
                 position of ``run_main``.
        other:   send without a header.
        """
        if header == 'write':
            reply = self.run_method.run_main(method, url, request_data)
            OperationHeader(reply).write_cookie()
            return reply
        if header == 'yes':
            cookie_store = OperetionJson('../dataconfig/cookie.json')
            cookies = {'apsid': cookie_store.get_data('apsid')}
            return self.run_method.run_main(method, url, request_data, cookies)
        return self.run_method.run_main(method, url, request_data)

    # Entry point of a test run
    def go_on_run(self):
        """Run the cases row by row, record pass/fail and send the report."""
        res = None
        passed = []
        failed = []
        total_rows = self.data.get_case_lines()
        # Rows are visited from index 1 (presumably row 0 is the title row).
        for i in range(1, total_rows):
            if not self.data.get_is_run(i):
                continue

            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            expect = self.data.get_expcet_data_for_mysql(i)
            header = self.data.is_header(i)
            depend_case = self.data.is_depend(i)

            if depend_case != None:
                self.depend_data = DependdentData(depend_case)
                # Value taken from the reply of the case this one depends on
                dependent_value = self.depend_data.get_data_for_key(i)
                # Request field that receives the value
                target_field = self.data.get_depend_field(i)
                request_data[target_field] = dependent_value

            res = self._send(header, method, url, request_data)

            # is_equal_dict returns 0 when both sides are equal.
            if self.com_util.is_equal_dict(expect, res) == 0:
                self.data.write_result(i, 'pass')
                passed.append(i)
            else:
                # On failure the actual reply is stored as the result.
                self.data.write_result(i, res)
                failed.append(i)

        self.send_mai.send_main(passed, failed)

    # Wrap the execution decision in a helper
    #def get_cookie_run(self,header):


if __name__ == '__main__':
    run = RunTest()
    run.go_on_run()