zoukankan      html  css  js  c++  java
  • Python+requests+unittest+excel实现接口自动化测试框架

    Python+requests+unittest+excel实现接口自动化测试框架

    一、框架结构:

     工程目录

    二、Case文件设计

    三、基础包 base

    3.1 封装get/post请求(runmethon.py)

     import requests
     import json
     class RunMethod:
     def post_main(self,url,data,header=None):
     res = None
     if header !=None: 
     res = requests.post(url=url,data=data,headers=header)
     else:
     res = requests.post(url=url,data=data)
     return res.json()
    
     def get_main(self,url,data=None,header=None):
     res = None
     if header !=None: 
     res = requests.get(url=url,data=data,headers=header,verify=False)
     else:
     res = requests.get(url=url,data=data,verify=False)
     return res.json() 
     def run_main(self,method,url,data=None,header=None):
     res = None
     if method == 'Post':
     res = self.post_main(url,data,header)
     else:
     res = self.get_main(url,data,header)
     return json.dumps(res,ensure_ascii=False,sort_keys=True,indent=2) 
    

      

    3.2 封装mock(mock.py)

     from mock import mock
     #模拟mock 封装
     def mock_test(mock_method,request_data,url,method,response_data):
     mock_method = mock.Mock(return_value=response_data)
     res = mock_method(url,method,request_data)
     return res
    

      

    四、数据操作包 operation_data

    4.1 获取excel单元格中的内容(get_data.py)  

     #coding:utf-8
     from tool.operation_excel import OperationExcel
     import data_config
     from tool.operation_json import OperetionJson
     from tool.connect_db import OperationMysql
     class GetData:
     def __init__(self):
     self.opera_excel = OperationExcel()
     
     #去获取excel行数,就是case的个数 
     def get_case_lines(self):
     return self.opera_excel.get_lines()
     
     #获取是否执行
     def get_is_run(self,row):
     flag = None
     col = int(data_config.get_run())
     run_model = self.opera_excel.get_cell_value(row,col)
     if run_model == 'yes':
     flag = True
     else:
     flag = False
    return flag
     
     #是否携带header
    def is_header(self,row):
     col = int(data_config.get_header())
     header = self.opera_excel.get_cell_value(row,col)
     if header != '':
     return header
     else:
     return None
     
     #获取请求方式
     def get_request_method(self,row):
     col = int(data_config.get_run_way())
     request_method = self.opera_excel.get_cell_value(row,col)
     return request_method
     
     #获取url
     def get_request_url(self,row):
     col = int(data_config.get_url())
     url = self.opera_excel.get_cell_value(row,col)
     return url
     
     #获取请求数据
     def get_request_data(self,row):
     col = int(data_config.get_data())
     data = self.opera_excel.get_cell_value(row,col)
     if data == '':
     return None
     return data
     
     #通过获取关键字拿到data数据
     def get_data_for_json(self,row):
     opera_json = OperetionJson()
     request_data = opera_json.get_data(self.get_request_data(row))
     return request_data
     
     #获取预期结果
     def get_expcet_data(self,row):
     col = int(data_config.get_expect())
     expect = self.opera_excel.get_cell_value(row,col)
     if expect == '':
     return None
     return expect
     
     #通过sql获取预期结果
     def get_expcet_data_for_mysql(self,row):
     op_mysql = OperationMysql()
     sql = self.get_expcet_data(row)
     res = op_mysql.search_one(sql)
     return res.decode('unicode-escape')
    
     def write_result(self,row,value):
     col = int(data_config.get_result())
     self.opera_excel.write_value(row,col,value)
     
     #获取依赖数据的key
     def get_depend_key(self,row):
     col = int(data_config.get_data_depend())
     depent_key = self.opera_excel.get_cell_value(row,col)
     if depent_key == "":
     return None
     else:
     return depent_key
     
     #判断是否有case依赖
     def is_depend(self,row):
     col = int(data_config.get_case_depend())
     depend_case_id = self.opera_excel.get_cell_value(row,col)
     if depend_case_id == "":
     return None
     else:
     return depend_case_id
     
     #获取数据依赖字段
     def get_depend_field(self,row):
     col = int(data_config.get_field_depend())
     data = self.opera_excel.get_cell_value(row,col)
     if data == "":
     return None
     else:
     return data
    

      

    4.2 获取excel中每个列(data_config.py)

     #coding:utf-8
     class global_var:
     #case_id
     Id = '0'
     request_name = '1'
     url = '2'
     run = '3'
     request_way = '4'
     header = '5'
     case_depend = '6'
     data_depend = '7'
     field_depend = '8'
     data = '9'
     expect = '10'
     result = '11'
     #获取caseid
     def get_id():
     return global_var.Id
     
     #获取url
     def get_url():
     return global_var.url
     
     def get_run():
     return global_var.run
     
     def get_run_way():
     return global_var.request_way
    
     def get_header():
     return global_var.header
     
     def get_case_depend():
     return global_var.case_depend
     
     def get_data_depend():
     return global_var.data_depend
     
     def get_field_depend():
     return global_var.field_depend
     
     def get_data():
     return global_var.data
     
     def get_expect():
     return global_var.expect
    
     def get_result():
     return global_var.result
     
     def get_header_value():
     return global_var.header
    

      

    4.3 解决数据依赖(dependent.py )

     #coding:utf-8
     import sys
     import json
     sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
     from tool.operation_excel import OperationExcel
     from base.runmethod import RunMethod
     from operation_data.get_data import GetData
     from jsonpath_rw import jsonpath,parse
     class DependdentData:
     def __init__(self,case_id):
     self.case_id = case_id
     self.opera_excel = OperationExcel()
     self.data = GetData()
     
     #通过case_id去获取该case_id的整行数据
     def get_case_line_data(self):
     rows_data = self.opera_excel.get_rows_data(self.case_id)
     return rows_data
     
     #执行依赖测试,获取结果
     def run_dependent(self):
     run_method = RunMethod()
     row_num = self.opera_excel.get_row_num(self.case_id)
     request_data = self.data.get_data_for_json(row_num)
     #header = self.data.is_header(row_num)
     method = self.data.get_request_method(row_num)
     url = self.data.get_request_url(row_num)
     res = run_method.run_main(method,url,request_data)
     return json.loads(res)
     
     #根据依赖的key去获取执行依赖测试case的响应,然后返回
     def get_data_for_key(self,row):
     depend_data = self.data.get_depend_key(row)
     response_data = self.run_dependent()
     json_exe = parse(depend_data)
     madle = json_exe.find(response_data)
     return [math.value for math in madle][0]
     
     if __name__ == '__main__':
     order = {
     "data": {
     "_input_charset": "utf-8", 
     "body": "京东订单-1710141907182334", 
     "it_b_pay": "1d", 
     "notify_url": "http://order.imooc.com/pay/notifyalipay", 
     "out_trade_no": "1710141907182334", 
     "partner": "2088002966755334", 
     "payment_type": "1", 
     "seller_id": "yangyan01@tcl.com", 
    "service": "mobile.securitypay.pay", 
    "sign": "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl
    pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D", 
     "sign_type": "RSA", 
     "string": "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/
    notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@
    tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H
    IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30
    JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA", 
     "subject": "京东订单-1710141907182334", 
     "total_fee": 299
     }, 
     "errorCode": 1000, 
     "errorDesc": "成功", 
     "status": 1, 
     "timestamp": 1507979239100
     }
     res = "data.out_trade_no"
     json_exe = parse(res)
     madle = json_exe.find(order)
     print [math.value for math in madle][0]
    

      

    五、工具类包 tool

    5.1 操作excel (operation_excel.py)

     #coding:utf-8
     import xlrd
     from xlutils.copy import copy
     class OperationExcel:
     def __init__(self,file_name=None,sheet_id=None):
     if file_name:
     self.file_name = file_name
     self.sheet_id = sheet_id 
     else:
     self.file_name = '../dataconfig/case1.xls'
     self.sheet_id = 0
     self.data = self.get_data()
     
     #获取sheets的内容
     def get_data(self):
     data = xlrd.open_workbook(self.file_name)
     tables = data.sheets()[self.sheet_id]
     return tables
     
     #获取单元格的行数
     def get_lines(self):
     tables = self.data
     return tables.nrows
     
     #获取某一个单元格的内容
     def get_cell_value(self,row,col):
     return self.data.cell_value(row,col)
     
     #写入数据
     def write_value(self,row,col,value):
     '''
     写入excel数据
     row,col,value
     '''
     read_data = xlrd.open_workbook(self.file_name)
     write_data = copy(read_data)
     sheet_data = write_data.get_sheet(0)
     sheet_data.write(row,col,value)
     write_data.save(self.file_name)
     
     #根据对应的caseid 找到对应行的内容
     def get_rows_data(self,case_id):
     row_num = self.get_row_num(case_id)
     rows_data = self.get_row_values(row_num)
     return rows_data
     
     #根据对应的caseid找到对应的行号
     def get_row_num(self,case_id):
     num = 0
     clols_data = self.get_cols_data()
     for col_data in clols_data:
     if case_id in col_data:
     return num
     num = num+1
     
     
     #根据行号,找到该行的内容
     def get_row_values(self,row):
     tables = self.data
     row_data = tables.row_values(row)
     return row_data
    
     #获取某一列的内容
     def get_cols_data(self,col_id=None):
     if col_id != None:
     cols = self.data.col_values(col_id)
     else:
     cols = self.data.col_values(0)
     return cols
     
     
     if __name__ == '__main__':
     opers = OperationExcel()
     print opers.get_cell_value(1,2)
    

      

    5.2判断字符串包含,判断字典是否相等(common_util.py)

     #coding:utf-8
     import json
     class CommonUtil:
     def is_contain(self,str_one,str_two):
     '''
     判断一个字符串是否再另外一个字符串中
     str_one:查找的字符串
    str_two:被查找的字符串
     '''
     flag = None
     if isinstance(str_one,unicode):
     str_one = str_one.encode('unicode-escape').decode('string_escape')
     return cmp(str_one,str_two)
     if str_one in str_two:
     flag = True
     else:
     flag = False
     return flag
     
     
     def is_equal_dict(self,dict_one,dict_two):
     '''
     判断两个字典是否相等
     '''
     if isinstance(dict_one,str):
     dict_one = json.loads(dict_one)
     if isinstance(dict_two,str):
     dict_two = json.loads(dict_two)
     return cmp(dict_one,dict_two)
    

      

    5.3 操作header(operation_herder.py)

     #coding:utf-8
     import requests
     import json
     from operation_json import OperetionJson
      class OperationHeader: 
     def __init__(self,response):
     self.response = json.loads(response)
      def get_response_url(self):
     '''
     获取登录返回的token的url
     '''
     url = self.response['data']['url'][0]
     return url
     
     def get_cookie(self):
     '''
     获取cookie的jar文件
     '''
     url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
     cookie = requests.get(url).cookies
     return cookie
     
     def write_cookie(self):
     cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
     op_json = OperetionJson()
     op_json.write_data(cookie)
     
     if __name__ == '__main__':
     
     url = "http://www.jd.com/passport/user/login"
     data = {
     "username":"18513199586",
     "password":"111111",
     "verify":"",
     "referer":"https://www.jd.com"
     }
     res = json.dumps(requests.post(url,data).json())
     op_header = OperationHeader(res)
     op_header.write_cookie()
    

      

    5.4 操作json文件(operation_json.py)

     #coding:utf-8
     import json
     class OperetionJson:
    
      def __init__(self,file_path=None):
     if file_path == None:
     self.file_path = '../dataconfig/user.json'
     else:
     self.file_path = file_path
     self.data = self.read_data()
     
     #读取json文件
     def read_data(self):
     with open(self.file_path) as fp:
     data = json.load(fp)
     return data
     
     #根据关键字获取数据
     def get_data(self,id):
     print type(self.data)
     return self.data[id]
     
     #写json
     def write_data(self,data):
     with open('../dataconfig/cookie.json','w') as fp:
     fp.write(json.dumps(data))
      
     
     if __name__ == '__main__':
     opjson = OperetionJson()
     print opjson.get_data('shop')
    

      

    5.5 操作数据库(connect_db.py)

     #coding:utf-8
     import MySQLdb.cursors
     import json
     class OperationMysql:
     def __init__(self):
     self.conn = MySQLdb.connect(
     host='localhost',
     port=3306,
     user='root',
     passwd='123456',
     db='le_study',
     charset='utf8',
     cursorclass=MySQLdb.cursors.DictCursor
    )
     self.cur = self.conn.cursor()
     
     #查询一条数据
     def search_one(self,sql):
     self.cur.execute(sql)
     result = self.cur.fetchone()
     result = json.dumps(result)
     return result
     
     if __name__ == '__main__':
     op_mysql = OperationMysql()
     res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
     print res
    

      

    5.6 发送报告邮件(send_email.py)

     #coding:utf-8
    import smtplib
     from email.mime.text import MIMEText
     class SendEmail:
     global send_user
     global email_host
     global password
     email_host = "smtp.163.com"
     send_user = "jiaxiaonan666@163.com"
     password = "jia_668"
     def send_mail(self,user_list,sub,content):
     user = "jiaxiaonan"+"<"+send_user+">"
     message = MIMEText(content,_subtype='plain',_charset='utf-8')
     message['Subject'] = sub
     message['From'] = user
     message['To'] = ";".join(user_list)
     server = smtplib.SMTP()
     server.connect(email_host)
     server.login(send_user,password)
     server.sendmail(user,user_list,message.as_string())
     server.close()
     
     def send_main(self,pass_list,fail_list):
     pass_num = float(len(pass_list))
     fail_num = float(len(fail_list))
     count_num = pass_num+fail_num
     #90%
     pass_result = "%.2f%%" %(pass_num/count_num*100)
     fail_result = "%.2f%%" %(fail_num/count_num*100)
     
     
     user_list = ['609037724@qq.com']
     sub = "接口自动化测试报告"
     content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
     self.send_mail(user_list,sub,content)
     
     if __name__ == '__main__':
     sen = SendEmail()
     sen.send_main([1,2,3,4],[2,3,4,5,6,7])
    

      

    六、主函数

    run_test.py
    
     #coding:utf-8
     import sys
     sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
     from base.runmethod import RunMethod
     from operation_data.get_data import GetData
     from tool.common_util import CommonUtil
     from operation_data.dependent_data import DependdentData
     from tool.send_email import SendEmail
     from tool.operation_header import OperationHeader
     from tool.operation_json import OperetionJson
     class RunTest:
     def __init__(self):
     self.run_method = RunMethod()
     self.data = GetData()
     self.com_util = CommonUtil()
     self.send_mai = SendEmail()
     
     #程序执行的
     def go_on_run(self):
     res = None
     pass_count = []
     fail_count = []
     #10 0,1,2,3
     rows_count = self.data.get_case_lines()
     for i in range(1,rows_count):
     is_run = self.data.get_is_run(i)
     if is_run:
     url = self.data.get_request_url(i)
     method = self.data.get_request_method(i)
     request_data = self.data.get_data_for_json(i)
     expect = self.data.get_expcet_data_for_mysql(i)
     header = self.data.is_header(i)
     depend_case = self.data.is_depend(i)
     if depend_case != None:
     self.depend_data = DependdentData(depend_case)
     #获取的依赖响应数据
     depend_response_data = self.depend_data.get_data_for_key(i)
     #获取依赖的key
     depend_key = self.data.get_depend_field(i)
     request_data[depend_key] = depend_response_data
     if header == 'write':
     res = self.run_method.run_main(method,url,request_data)
     op_header = OperationHeader(res)
     op_header.write_cookie()
     
     elif header == 'yes':
     op_json = OperetionJson('../dataconfig/cookie.json')
     cookie = op_json.get_data('apsid')
     cookies = {
     'apsid':cookie
     }
     res = self.run_method.run_main(method,url,request_data,cookies)
     else:
     res = self.run_method.run_main(method,url,request_data)
     
     if self.com_util.is_equal_dict(expect,res) == 0:
     self.data.write_result(i,'pass')
     pass_count.append(i)
     else:
     self.data.write_result(i,res)
     fail_count.append(i)
     self.send_mai.send_main(pass_count,fail_count)
     
     #将执行判断封装
     #def get_cookie_run(self,header):
    
     if __name__ == '__main__':
     run = RunTest()
     run.go_on_run()
    

      

     
  • 相关阅读:
    python 项目实例
    flash教程
    flask request
    systemd-unit
    kubernets HA集群手动部署
    zookeeper(1)-简单介绍
    apache与nginx原理
    技术文章整理
    CMS垃圾回收器
    Zookeeper
  • 原文地址:https://www.cnblogs.com/chenlimei/p/11673441.html
Copyright © 2011-2022 走看看