zoukankan      html  css  js  c++  java
  • Python+requests+unittest+excel实现接口自动化测试框架

    一、框架结构:

     工程目录

    二、Case文件设计

    三、基础包 base

    3.1 封装get/post请求(runmethon.py)

    复制代码
    复制代码
     1 import requests
     2 import json
     3 class RunMethod:
     4     def post_main(self,url,data,header=None):
     5         res = None
     6         if header !=None:    
     7             res = requests.post(url=url,data=data,headers=header)
     8         else:
     9             res = requests.post(url=url,data=data)
    10         return res.json()
    11 
    12     def get_main(self,url,data=None,header=None):
    13         res = None
    14         if header !=None:    
    15             res = requests.get(url=url,data=data,headers=header,verify=False)
    16         else:
    17             res = requests.get(url=url,data=data,verify=False)
    18         return res.json()
    19 
    20     def run_main(self,method,url,data=None,header=None):
    21         res = None
    22         if method == 'Post':
    23             res = self.post_main(url,data,header)
    24         else:
    25             res = self.get_main(url,data,header)
    26         return json.dumps(res,ensure_ascii=False,sort_keys=True,indent=2) 
    复制代码
    复制代码

    3.2 封装mock(mock.py)

    复制代码
    复制代码
    1 from mock import mock
    2 #模拟mock 封装
    3 def mock_test(mock_method,request_data,url,method,response_data):
    4     mock_method = mock.Mock(return_value=response_data)
    5     res = mock_method(url,method,request_data)
    6     return res
    复制代码
    复制代码

    四、数据操作包 operation_data

    4.1 获取excel单元格中的内容(get_data.py)  

    复制代码
    复制代码
      1 #coding:utf-8
      2 from tool.operation_excel import OperationExcel
      3 import data_config
      4 from tool.operation_json import OperetionJson
      5 from tool.connect_db import OperationMysql
      6 class GetData:
      7     def __init__(self):
      8         self.opera_excel = OperationExcel()
      9 
     10     #去获取excel行数,就是case的个数    
     11     def get_case_lines(self):
     12         return self.opera_excel.get_lines()
     13 
     14     #获取是否执行
     15     def get_is_run(self,row):
     16         flag = None
     17         col = int(data_config.get_run())
     18         run_model = self.opera_excel.get_cell_value(row,col)
     19         if run_model == 'yes':
     20             flag = True
     21         else:
     22             flag = False
     23         return flag
     24 
     25     #是否携带header
     26     def is_header(self,row):
     27         col = int(data_config.get_header())
     28         header = self.opera_excel.get_cell_value(row,col)
     29         if header != '':
     30             return header
     31         else:
     32             return None
     33 
     34     #获取请求方式
     35     def get_request_method(self,row):
     36         col = int(data_config.get_run_way())
     37         request_method = self.opera_excel.get_cell_value(row,col)
     38         return request_method
     39 
     40     #获取url
     41     def get_request_url(self,row):
     42         col = int(data_config.get_url())
     43         url = self.opera_excel.get_cell_value(row,col)
     44         return url
     45 
     46     #获取请求数据
     47     def get_request_data(self,row):
     48         col = int(data_config.get_data())
     49         data = self.opera_excel.get_cell_value(row,col)
     50         if data == '':
     51             return None
     52         return data
     53 
     54     #通过获取关键字拿到data数据
     55     def get_data_for_json(self,row):
     56         opera_json = OperetionJson()
     57         request_data = opera_json.get_data(self.get_request_data(row))
     58         return request_data
     59 
     60     #获取预期结果
     61     def get_expcet_data(self,row):
     62         col = int(data_config.get_expect())
     63         expect = self.opera_excel.get_cell_value(row,col)
     64         if expect == '':
     65             return None
     66         return expect
     67 
     68     #通过sql获取预期结果
     69     def get_expcet_data_for_mysql(self,row):
     70         op_mysql = OperationMysql()
     71         sql = self.get_expcet_data(row)
     72         res = op_mysql.search_one(sql)
     73         return res.decode('unicode-escape')
     74 
     75     def write_result(self,row,value):
     76         col = int(data_config.get_result())
     77         self.opera_excel.write_value(row,col,value)
     78 
     79     #获取依赖数据的key
     80     def get_depend_key(self,row):
     81         col = int(data_config.get_data_depend())
     82         depent_key = self.opera_excel.get_cell_value(row,col)
     83         if depent_key == "":
     84             return None
     85         else:
     86             return depent_key
     87 
     88     #判断是否有case依赖
     89     def is_depend(self,row):
     90         col = int(data_config.get_case_depend())
     91         depend_case_id = self.opera_excel.get_cell_value(row,col)
     92         if depend_case_id == "":
     93             return None
     94         else:
     95             return depend_case_id
     96 
     97     #获取数据依赖字段
     98     def get_depend_field(self,row):
     99         col = int(data_config.get_field_depend())
    100         data = self.opera_excel.get_cell_value(row,col)
    101         if data == "":
    102             return None
    103         else:
    104             return data
    复制代码
    复制代码

    4.2 获取excel中每个列(data_config.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 class global_var:
     3     #case_id
     4     Id = '0'
     5     request_name = '1'
     6     url = '2'
     7     run = '3'
     8     request_way = '4'
     9     header = '5'
    10     case_depend = '6'
    11     data_depend = '7'
    12     field_depend = '8'
    13     data = '9'
    14     expect = '10'
    15     result = '11'
    16 #获取caseid
    17 def get_id():
    18     return global_var.Id
    19 
    20 #获取url
    21 def get_url():
    22     return global_var.url
    23 
    24 def get_run():
    25     return global_var.run
    26 
    27 def get_run_way():
    28     return global_var.request_way
    29 
    30 def get_header():
    31     return global_var.header
    32 
    33 def get_case_depend():
    34     return global_var.case_depend
    35 
    36 def get_data_depend():
    37     return global_var.data_depend
    38 
    39 def get_field_depend():
    40     return global_var.field_depend
    41 
    42 def get_data():
    43     return global_var.data
    44 
    45 def get_expect():
    46     return global_var.expect
    47 
    48 def get_result():
    49     return global_var.result
    50 
    51 def get_header_value():
    52     return global_var.header
    复制代码
    复制代码

    4.3 解决数据依赖(dependent.py )

    复制代码
    复制代码
     1 #coding:utf-8
     2 import sys
     3 import json
     4 sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
     5 from tool.operation_excel import OperationExcel
     6 from base.runmethod import RunMethod
     7 from operation_data.get_data import GetData
     8 from jsonpath_rw import jsonpath,parse
     9 class DependdentData:
    10     def __init__(self,case_id):
    11         self.case_id = case_id
    12         self.opera_excel = OperationExcel()
    13         self.data = GetData()
    14 
    15     #通过case_id去获取该case_id的整行数据
    16     def get_case_line_data(self):
    17         rows_data = self.opera_excel.get_rows_data(self.case_id)
    18         return rows_data
    19 
    20     #执行依赖测试,获取结果
    21     def run_dependent(self):
    22         run_method = RunMethod()
    23         row_num  = self.opera_excel.get_row_num(self.case_id)
    24         request_data = self.data.get_data_for_json(row_num)
    25         #header = self.data.is_header(row_num)
    26         method = self.data.get_request_method(row_num)
    27         url = self.data.get_request_url(row_num)
    28         res = run_method.run_main(method,url,request_data)
    29         return json.loads(res)
    30 
    31     #根据依赖的key去获取执行依赖测试case的响应,然后返回
    32     def get_data_for_key(self,row):
    33         depend_data = self.data.get_depend_key(row)
    34         response_data = self.run_dependent()
    35         json_exe = parse(depend_data)
    36         madle = json_exe.find(response_data)
    37         return [math.value for math in madle][0]
    38 
    39 if __name__ == '__main__':
    40     order = {
    41         "data": {
    42             "_input_charset": "utf-8", 
    43             "body": "京东订单-1710141907182334", 
    44             "it_b_pay": "1d", 
    45             "notify_url": "http://order.imooc.com/pay/notifyalipay", 
    46             "out_trade_no": "1710141907182334", 
    47             "partner": "2088002966755334", 
    48             "payment_type": "1", 
    49             "seller_id": "yangyan01@tcl.com", 
    50             "service": "mobile.securitypay.pay", 
    51             "sign": "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl
    pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D", 52 "sign_type": "RSA", 53 "string": "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/
    notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@
    tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H
    IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30
    JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA", 54 "subject": "京东订单-1710141907182334", 55 "total_fee": 299 56 }, 57 "errorCode": 1000, 58 "errorDesc": "成功", 59 "status": 1, 60 "timestamp": 1507979239100 61 } 62 res = "data.out_trade_no" 63 json_exe = parse(res) 64 madle = json_exe.find(order) 65 print [math.value for math in madle][0]
    复制代码
    复制代码

    五、工具类包 tool

    5.1 操作excel (operation_excel.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import xlrd
     3 from xlutils.copy import copy
     4 class OperationExcel:
     5     def __init__(self,file_name=None,sheet_id=None):
     6         if file_name:
     7             self.file_name = file_name
     8             self.sheet_id = sheet_id    
     9         else:
    10             self.file_name = '../dataconfig/case1.xls'
    11             self.sheet_id = 0
    12         self.data = self.get_data()
    13 
    14     #获取sheets的内容
    15     def get_data(self):
    16         data = xlrd.open_workbook(self.file_name)
    17         tables = data.sheets()[self.sheet_id]
    18         return tables
    19 
    20     #获取单元格的行数
    21     def get_lines(self):
    22         tables = self.data
    23         return tables.nrows
    24 
    25     #获取某一个单元格的内容
    26     def get_cell_value(self,row,col):
    27         return self.data.cell_value(row,col)
    28 
    29     #写入数据
    30     def write_value(self,row,col,value):
    31         '''
    32         写入excel数据
    33         row,col,value
    34         '''
    35         read_data = xlrd.open_workbook(self.file_name)
    36         write_data = copy(read_data)
    37         sheet_data = write_data.get_sheet(0)
    38         sheet_data.write(row,col,value)
    39         write_data.save(self.file_name)
    40 
    41     #根据对应的caseid 找到对应行的内容
    42     def get_rows_data(self,case_id):
    43         row_num = self.get_row_num(case_id)
    44         rows_data = self.get_row_values(row_num)
    45         return rows_data
    46 
    47     #根据对应的caseid找到对应的行号
    48     def get_row_num(self,case_id):
    49         num = 0
    50         clols_data = self.get_cols_data()
    51         for col_data in clols_data:
    52             if case_id in col_data:
    53                 return num
    54             num = num+1
    55 
    56 
    57     #根据行号,找到该行的内容
    58     def get_row_values(self,row):
    59         tables = self.data
    60         row_data = tables.row_values(row)
    61         return row_data
    62 
    63     #获取某一列的内容
    64     def get_cols_data(self,col_id=None):
    65         if col_id != None:
    66             cols = self.data.col_values(col_id)
    67         else:
    68             cols = self.data.col_values(0)
    69         return cols
    70 
    71 
    72 if __name__ == '__main__':
    73     opers = OperationExcel()
    74     print opers.get_cell_value(1,2)
    复制代码
    复制代码

    5.2判断字符串包含,判断字典是否相等(common_util.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import json
     3 class CommonUtil:
     4     def is_contain(self,str_one,str_two):
     5         '''
     6         判断一个字符串是否再另外一个字符串中
     7         str_one:查找的字符串
     8         str_two:被查找的字符串
     9         '''
    10         flag = None
    11         if isinstance(str_one,unicode):
    12             str_one = str_one.encode('unicode-escape').decode('string_escape')
    13         return cmp(str_one,str_two)
    14         if str_one in str_two:
    15             flag = True
    16         else:
    17             flag = False
    18         return flag
    19 
    20 
    21     def is_equal_dict(self,dict_one,dict_two):
    22         '''
    23         判断两个字典是否相等
    24         '''
    25         if isinstance(dict_one,str):
    26             dict_one = json.loads(dict_one)
    27         if isinstance(dict_two,str):
    28             dict_two = json.loads(dict_two)
    29         return cmp(dict_one,dict_two)
    复制代码
    复制代码

    5.3 操作header(operation_herder.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import requests
     3 import json
     4 from operation_json import OperetionJson
     5 
     6 
     7 class OperationHeader:
     8 
     9     def __init__(self,response):
    10         self.response = json.loads(response)
    11 
    12     def get_response_url(self):
    13         '''
    14         获取登录返回的token的url
    15         '''
    16         url = self.response['data']['url'][0]
    17         return url
    18 
    19     def get_cookie(self):
    20         '''
    21         获取cookie的jar文件
    22         '''
    23         url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
    24         cookie = requests.get(url).cookies
    25         return cookie
    26 
    27     def write_cookie(self):
    28         cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
    29         op_json = OperetionJson()
    30         op_json.write_data(cookie)
    31         
    32 if __name__ == '__main__':
    33     
    34     url = "http://www.jd.com/passport/user/login"
    35     data = {
    36         "username":"18513199586",
    37         "password":"111111",
    38         "verify":"",
    39         "referer":"https://www.jd.com"
    40     }
    41     res = json.dumps(requests.post(url,data).json())
    42     op_header = OperationHeader(res)
    43     op_header.write_cookie()
    复制代码
    复制代码

    5.4 操作json文件(operation_json.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import json
     3 class OperetionJson:
     4 
     5     def __init__(self,file_path=None):
     6         if file_path  == None:
     7             self.file_path = '../dataconfig/user.json'
     8         else:
     9             self.file_path = file_path
    10         self.data = self.read_data()
    11 
    12     #读取json文件
    13     def read_data(self):
    14         with open(self.file_path) as fp:
    15             data = json.load(fp)
    16             return data
    17 
    18     #根据关键字获取数据
    19     def get_data(self,id):
    20         print type(self.data)
    21         return self.data[id]
    22 
    23     #写json
    24     def write_data(self,data):
    25         with open('../dataconfig/cookie.json','w') as fp:
    26             fp.write(json.dumps(data))
    27 
    28 
    29 
    30 if __name__ == '__main__':
    31     opjson = OperetionJson()
    32     print opjson.get_data('shop')
    复制代码
    复制代码

    5.5 操作数据库(connect_db.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import MySQLdb.cursors
     3 import json
     4 class OperationMysql:
     5     def __init__(self):
     6         self.conn = MySQLdb.connect(
     7             host='localhost',
     8             port=3306,
     9             user='root',
    10             passwd='123456',
    11             db='le_study',
    12             charset='utf8',
    13             cursorclass=MySQLdb.cursors.DictCursor
    14             )
    15         self.cur = self.conn.cursor()
    16 
    17     #查询一条数据
    18     def search_one(self,sql):
    19         self.cur.execute(sql)
    20         result = self.cur.fetchone()
    21         result = json.dumps(result)
    22         return result
    23 
    24 if __name__ == '__main__':
    25     op_mysql = OperationMysql()
    26     res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
    27     print res
    复制代码
    复制代码

    5.6 发送报告邮件(send_email.py)

    复制代码
    复制代码
     1 #coding:utf-8
     2 import smtplib
     3 from email.mime.text import MIMEText
     4 class SendEmail:
     5     global send_user
     6     global email_host
     7     global password
     8     email_host = "smtp.163.com"
     9     send_user = "jiaxiaonan666@163.com"
    10     password = "jia_668"
    11     def send_mail(self,user_list,sub,content):
    12         user = "jiaxiaonan"+"<"+send_user+">"
    13         message = MIMEText(content,_subtype='plain',_charset='utf-8')
    14         message['Subject'] = sub
    15         message['From'] = user
    16         message['To'] = ";".join(user_list)
    17         server = smtplib.SMTP()
    18         server.connect(email_host)
    19         server.login(send_user,password)
    20         server.sendmail(user,user_list,message.as_string())
    21         server.close()
    22 
    23     def send_main(self,pass_list,fail_list):
    24         pass_num = float(len(pass_list))
    25         fail_num = float(len(fail_list))
    26         count_num = pass_num+fail_num
    27         #90%
    28         pass_result = "%.2f%%" %(pass_num/count_num*100)
    29         fail_result = "%.2f%%" %(fail_num/count_num*100)
    30 
    31 
    32         user_list = ['609037724@qq.com']
    33         sub = "接口自动化测试报告"
    34         content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
    35         self.send_mail(user_list,sub,content)
    36 
    37 if __name__ == '__main__':
    38     sen = SendEmail()
    39     sen.send_main([1,2,3,4],[2,3,4,5,6,7])
    复制代码
    复制代码

    六、主函数

    run_test.py

    复制代码
    复制代码
     1 #coding:utf-8
     2 import sys
     3 sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
     4 from base.runmethod import RunMethod
     5 from operation_data.get_data import GetData
     6 from tool.common_util import CommonUtil
     7 from operation_data.dependent_data import DependdentData
     8 from tool.send_email import SendEmail
     9 from tool.operation_header import OperationHeader
    10 from tool.operation_json import OperetionJson
    11 class RunTest:
    12     def __init__(self):
    13         self.run_method = RunMethod()
    14         self.data = GetData()
    15         self.com_util = CommonUtil()
    16         self.send_mai = SendEmail()
    17 
    18     #程序执行的
    19     def go_on_run(self):
    20         res = None
    21         pass_count = []
    22         fail_count = []
    23         #10  0,1,2,3
    24         rows_count = self.data.get_case_lines()
    25         for i in range(1,rows_count):
    26             is_run = self.data.get_is_run(i)
    27             if is_run:
    28                 url = self.data.get_request_url(i)
    29                 method = self.data.get_request_method(i)
    30                 request_data = self.data.get_data_for_json(i)
    31                 expect = self.data.get_expcet_data_for_mysql(i)
    32                 header = self.data.is_header(i)
    33                 depend_case = self.data.is_depend(i)
    34                 if depend_case != None:
    35                     self.depend_data = DependdentData(depend_case)
    36                     #获取的依赖响应数据
    37                     depend_response_data = self.depend_data.get_data_for_key(i)
    38                     #获取依赖的key
    39                     depend_key = self.data.get_depend_field(i)
    40                     request_data[depend_key] = depend_response_data
    41                 if header == 'write':
    42                     res = self.run_method.run_main(method,url,request_data)
    43                     op_header = OperationHeader(res)
    44                     op_header.write_cookie()
    45 
    46                 elif header == 'yes':
    47                     op_json = OperetionJson('../dataconfig/cookie.json')
    48                     cookie = op_json.get_data('apsid')
    49                     cookies = {
    50                         'apsid':cookie
    51                     }
    52                     res = self.run_method.run_main(method,url,request_data,cookies)
    53                 else:
    54                     res = self.run_method.run_main(method,url,request_data)
    55 
    56                 if self.com_util.is_equal_dict(expect,res) == 0:
    57                     self.data.write_result(i,'pass')
    58                     pass_count.append(i)
    59                 else:
    60                     self.data.write_result(i,res)
    61                     fail_count.append(i)
    62         self.send_mai.send_main(pass_count,fail_count)
    63 
    64     #将执行判断封装
    65     #def get_cookie_run(self,header):
    66 
    67 
    68 if __name__ == '__main__':
    69     run = RunTest()
    70     run.go_on_run()
    喜欢我们自动化的小伙伴们,可以加入我们的技术交流扣扣群:929347797(里面有超多学习资料免费分享哟)
  • 相关阅读:
    【APIO2008】免费道路[最小生成树 kruskal]
    【2019.8.13】
    【矩阵】
    [POI2008]BLO-Blockade [tarjan 割点]
    poj1458 最长公共子序列 (动态规划)
    最长上升子序列
    poj1163 数字三角形 (动态规划)
    快速幂 (分治)
    求排列的逆序数(分治)
    快速排序 (分治)
  • 原文地址:https://www.cnblogs.com/w1121/p/15259872.html
Copyright © 2011-2022 走看看