zoukankan      html  css  js  c++  java
  • python--utp接口自动化测试框架

    自动化测试有两种驱动: 1、数据驱动 ,数据从excel/txt中来     

                                             2、代码驱动,有业务流程

    atp只能实现数据驱动

    utp能实现数据驱动和代码驱动

    注册---登录---抽奖  接口测试(代码驱动)

    test_cj.py

    import unittest
    from conf.setting import default_host
    from core.my_requests import MyRequest
    from core.tools import mysql,get_real_value
    from core.tools import r as redis
    
    class Cj(unittest.TestCase):
    
        password = 'aA123456'
        username = 'testhhh'
    
        def reg(self):
            '''注册接口'''
            url = '/api/user/user_reg'
            new_url = default_host + url
            data = {'username':self.username,'pwd':self.password,
                    'cpwd':self.password}
            r = MyRequest(new_url,data)
            result = r.post()
            self.assertEqual(1,result.get('status'),msg='注册接口不通,%s'%result.get('data'))
            #校验接口是不是通的
            error_code = result.get('data').get('error_code')
            self.assertEqual(0,error_code,msg='注册失败,%s'%result.get('data'))
            sql ='select * from app_myuser where username = "%s";'%self.username
            sql_res = mysql.execute_one(sql)#执行sql
            self.assertIsNotNone(sql_res) #判断数据库返回的是否为空
    
        def login(self):
            '''登录接口'''
            url  = default_host + '/api/user/login'
            data = {'username':self.username,'passwd':self.password}
            r = MyRequest(url,data)#发请求
            result = r.post()
            self.assertEqual(1,result.get('status'),msg='登录接口不通,%s'%result.get('data'))
            sign = get_real_value('sign',result)#从返回值里面取到sign的值
            self.assertIsNotNone(sign,msg='登录失败:%s'%result)#校验sign   校验的msg都是在失败的时候才会打印
            userid  = get_real_value('userId',result)#从返回值里面取到userid的值
            return userid,sign
    
        def choujiang(self):
            '''抽奖接口'''
            url = default_host+'/api/product/choice'
            userid,sign = self.login()
            data = {'userid':userid,'sign':sign}
            r = MyRequest(url,data)
            result = r.get()
            self.assertEqual(1, result.get('status'), msg='抽奖接口不通,%s' % result.get('data'))
            redis_key ='choujiang:%s'%self.username
            count = redis.get(redis_key) #操作redis,取key
            self.assertEqual('1',count,'抽奖次数错误%s'%result)   #redis中取出的count值是一个字符串
            sql='select count(*) as cishu  from app_record where user_id = %s ;'%userid
            cishu = mysql.execute_one(sql).get('cishu') #执行sql   execute_one(sql)返回的是一个字典,再从字典中取出cishu
            # {'cishu':1}
            self.assertEqual(1,cishu,'抽奖记录没有落到数据库里面!')   #校验抽奖次数和数据库中是否对的上
    
        def test_choujiang(self):
            '''抽奖流程测试'''
            self.reg()
            self.choujiang()
    
        @classmethod
        def tearDownClass(cls):
            ##数据清除的工作   保证回归测试时 username能重复使用
            sql='delete from app_myuser where username="%s" ;'%cls.username
            mysql.execute_one(sql)
            key='choujiang:%s'%cls.username
            redis.delete(key)
            print('测试数据清理完成。。')
    
    
    # 有很多表要清除时
    #1、先把以前的数据库备份
    #    mysqldump -uroot -p123456 -h192.168.1.13 db>/xxx/xx/a.sql
    # 2、执行自动化case
    # 3、把数据库恢复回来
    #    sql.excute('RENAME database main TO main_20181204;')
    #    sql.excute('create database main;')
    #    os.system('mysql -uroot -p123456 -h192.168.1.13 db>/xxx/xx/a.sql')
    View Code

    setting.py

    import os
    BAE_PATH  = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
    ) #atp的目录
    
    LOG_PATH = os.path.join(BAE_PATH,'logs') #log目录
    CASE_PATH = os.path.join(BAE_PATH,'cases') #case目录
    REPORT_PATH = os.path.join(BAE_PATH,'report') #report目录
    SQL_PATH = os.path.join(BAE_PATH,'sql_file') #存放sql的目录
    DATA_PATH = os.path.join(BAE_PATH,'data') #存参数化文件的地方
    
    
    MAIL_INFO = {
        'user':'xxx@qq.com',
        'password':'rtcxxxqrdgjcd',
        'host':'smtp.qq.com',
        'smtp_ssl':True,#发件箱是qq邮箱的话,改成True
    }
    
    TO = ['lihui@meizu.com']
    
    HOSTS = {
        'QA':'http://api.nnzhp.cn',#测试环境
        'DEV':'http://dev.nnzhp.cn',#开发环境
        'PRE':'http://pre.nnzhp.cn' #预生产环境
    }
    
    
    mysql_info = {
        'host':'118.24.3.40',
        'port':3306,
        'user':'besttest',
        'password':'HK139bc',
        'db':'main',
        'charset':'utf8',
        'autocommit':True
    }
    #mysql 配置信息
    redis_info = {
        'host': '118.24.3.40',
        'password': 'HK139bc&*',
        'port': 6379,
        'db': 0,
        'decode_responses': True
    }
    
    
    
    default_host = HOSTS.get('QA')  #默认测试环境的地址
    View Code

    my_requests.py

    import requests
    import nnlog
    import os
    from conf.setting import LOG_PATH
    class MyRequest:
        log_file_name  = os.path.join(LOG_PATH,'MyRequest.log')#日子文件名
        time_out = 10 #请求超时时间
        def __init__(self,url,data=None,headers=None,file=None):
            self.url = url
            self.data = data
            self.headers = headers
            self.file = file
        def post(self):
            try:
                req = requests.post(self.url,data=self.data,headers=self.headers,
                                    files=self.file,timeout=self.time_out)
            except Exception as e:
                res = {"status":0,"data":e.args}  #0代表请求失败
            else:
                try:
                   res = {"status":1,"data":req.json()} #1代表返回的json
                except Exception as e:
                    res = {"status":2,"data":req.text} #2代表返回不是json
            log_str = 'url: %s 请求方式:post  data:%s ,返回数据:%s'%(self.url,self.data,res)
            self.write_log(log_str)
            return res
    
        def get(self):
            try:
                req = requests.get(self.url,params=self.data,headers=self.headers,timeout=self.time_out)
            except Exception as e:
                res = {"status":0,"data":e.args}  #0代表请求失败
            else:
                try:
                   res = {"status":1,"data":req.json()} #1代表返回的json
    
                except Exception as e:
                    res = {"status":2,"data":req.text} #2代表返回不是json
            log_str = 'url: %s get请求 data:%s ,返回数据:%s'%(self.url,self.data,res)
            self.write_log(log_str)
            return res
    
        @classmethod
        def write_log(cls,content):
            log = nnlog.Logger(cls.log_file_name)
            log.debug(content)
    View Code

    op_data.py

    import os
    from conf.setting import mysql_info,SQL_PATH
    from core.tools import mysql
    import datetime
    
    def get_mysql_info():   #从配置文件中取出数据库相关信息
        user = mysql_info.get('user')
        password = mysql_info.get('password')
        host = mysql_info.get('host')
        db = mysql_info.get('db')
        port = mysql_info.get('port')
        return user,password,host,db,port
    
    def bak_db():  #备份数据库
        user, password, host, db, port = get_mysql_info()
        sql_filename = datetime.datetime.now().strftime('%Y%m%d%H%M%S')+'.sql'  #sql文件名称
        sql_filename = os.path.join(SQL_PATH,sql_filename)
        command = 'mysqldump -u{user} -p{pwd} -P{port} -h{host} {db} > {file}'.format(
            user=user,pwd=password,port=port,host=host,db=db,file=sql_filename
        )#执行备份数据库命令    mysqldump 最好写成绝对路径(找到mysqldump命令的位置)
        os.system(command)    #os.system()执行操作系统命令
        print('数据备份完成!')
        return sql_filename   #返回备份的数据库文件,恢复时还要用到
    
    def recover_db(sql_filename):
        user, password, host, db, port = get_mysql_info()
        new_db = db+'_'+datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        sql='RENAME database %s TO %s; '%(db,new_db)#给原来的数据库改名
        mysql.execute_one(sql)
        print('数据库改名')
        sql2='create database %s charset utf8;'%db
        mysql.execute_one(sql2)
        print('新的数据库已经创建%s'%db)
        command = 'mysql -u{user} -p{pwd} -P{port} -h{host} {db} < {file}'.format(
            user=user, pwd=password, port=port, host=host, db=db, file=sql_filename
        )   #恢复数据库
        os.system(command)
        print('数据库恢复完成!')
    View Code

    parse_param_file.py

    from conf.setting import DATA_PATH
    import os
    import xlrd
    
    def textFileToList(file,seq=','):   #默认以逗号分隔
        file=os.path.join(DATA_PATH,file)
        print(file)
        case_data = []
        with open(file,encoding='utf-8') as f:
            for line in f:
                new_line = line.strip()   #去掉空格  /n
                if new_line:    #可能存在空行,去掉空行
                    case_data.append(new_line.split(seq))
        return case_data
    
    def excelToList(file):
        file = os.path.join(DATA_PATH, file)
        workbook = xlrd.open_workbook(file)
        sheet = workbook.sheet_by_index(0)
    
        case_data = []
        for i in range(1,sheet.nrows):
            row_data = sheet.row_values(i)[4:8]  
            case_data.append(row_data)
    
        return case_data
    View Code

    tools.py

    import pymysql
    from conf.setting import mysql_info,redis_info
    import redis
    import jsonpath
    import datetime
    import os,yagmail
    from conf import setting
    
    class MySQL:
        def __init__(self):
            self.conn = pymysql.connect(**mysql_info)
            self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
            #初始化的时候就连接数据库
        def execute_many(self,sql):
            self.cur.execute(sql)
            res = self.cur.fetchall()
            if res:
                return res
        def execute_one(self,sql):
            self.cur.execute(sql)
            res = self.cur.fetchone()
            if res:   #如果res不为NONE,再返回
                return res
    
        def __del__(self):
            self.cur.close()
            self.conn.close()
            print('连接已经关闭')
    
    def get_real_value(key,response):
        #从字典里面获取key对应的value
        res = jsonpath.jsonpath(response,'$..%s'%key)
        #$..%s这个是jsonpath这个模块的用法
        if res:
            return res[0]
    
    def get_redis():
        return redis.Redis(**redis_info)
    
    def make_today_dir():
        #创建当天的文件夹,返回绝对路径
        today = str(datetime.date.today())
        abs_path = os.path.join(setting.REPORT_PATH,today)
        #拼成当天的绝对路径
        if os.path.exists(abs_path):
            pass
        else:
            os.mkdir(abs_path)
        return abs_path
    
    def send_mail(content,file_path=None):
        #发邮件,传入邮件正文和附件
        m = yagmail.SMTP(**setting.MAIL_INFO,)
        subject = '接口测试报告_%s'%str(datetime.datetime.today())
        m.send(subject=subject,to=setting.TO,contents=content,attachments=file_path)
    
    
    mysql = MySQL()
    
    r = get_redis()
    View Code

    start.py

    import unittest
    from BeautifulReport import BeautifulReport as bf
    from conf.setting import CASE_PATH
    from core.tools import make_today_dir,send_mail
    from core.op_data import bak_db,recover_db
    import os
    import datetime
    
    content = '''
    各位好!
            本次测试结果:总共运行%s条用例,通过%s条,失败%s条。详细信息见附件。
        '''
    
    def run_case():
        # sql_file = bak_db() #备份数据库函数
        suite = unittest.TestSuite() #建测试集合
        cases = unittest.defaultTestLoader.discover(CASE_PATH,'test*.py')
        #去某个目录下找测试用例
        for case in cases:
            suite.addTest(case)  #循环把每个文件里面的case加入到测试集合里面
        report = bf(suite) #运行测试用例
    
        path = make_today_dir() #创建今天的文件夹,存放报告
        file_name = 'report_%s.html'%datetime.datetime.now().strftime('%H%M%S')#生成新的文件名
        report.report(filename=file_name,description='接口测试',log_path=path)#生成报告
    
        new_content = content%(report.success_count+report.failure_count,report.success_count,report.failure_count)
        abs_path = os.path.join(path,file_name)
        send_mail(new_content,abs_path)
        # recover_db(sql_file)#恢复数据库
    
    #产生报告
    run_case()
    View Code
  • 相关阅读:
    观察者(Observer)模式
    Stragety Pattern(策略模式)
    数据库设计范式深入浅出
    建造者(Builder)模式
    吉杰,以及快乐男声
    言情小说通用情节[转]
    过年的任务
    将一家创业公司三年之内推动上市是1999年的思维方式
    修改系统时间格式?解决now()
    经济类吴晓波的《大败局》,韩德强的《碰撞》。几年前看的了,现在还很有印象。
  • 原文地址:https://www.cnblogs.com/HathawayLee/p/10061173.html
Copyright © 2011-2022 走看看