zoukankan      html  css  js  c++  java
  • python 实现接口自动化1

    目录结构:

     log包,下面的.log文件是自动生成的,只需新增log包即可

    testFile包:放的接口excel用例,这里需使用xls后缀,不要用xlsx,模板见下方,需一致

    util包:公共包,db_util.py是对数据库操作,log.py是输出日志,往控制台及文件输出

    主函数test_request2.py:

    读取excel,循环进行请求,得到结果,填写到excel。

    一般新增接口会去查询数据库是否新增了,所以模板中增加了数据库的几项操作,填入操作序号和sql即可,新增会返回查询结果,其他操作会返回数据库成功操作几条数据

    一般有可能要先请求一个接口获取返回 token,再进行正式请求时带上token,所以模板 中添加了前置请求,若填写完整,就会获取到token,并放在请求头

    (注意我这里接口取token和放在请求头是在代码写死的,不同接口需修改,有的公司可能token直接放在参数里)

    接口excel模板

    log.py
    import  logging
    from  logging.handlers import TimedRotatingFileHandler
    
    class Logger(object):
        def __init__(self):
            # 创建logger对象
            self.logger=logging.getLogger()#logging.getLogger(name)函数中的name是日志记录的用例名,不指定name会返回root对象
            self.logger.setLevel(logging.DEBUG)
            # logging.root.setLevel(logging.NOTSET) #NOTSET会显示所有输出
            self.log_file_name="E:/PyCharmWorkSpace/AutoInterfaceTest/log/testlog.log"
            self.backup_count=5
            self.console_output_level="INFO"
            self.file_output_level="DEBUG"
            self.formatter=logging.Formatter("%(asctime)s-%(name)s-%(levelname)s-%(message)s")
        def get_logger(self):
            #handler对象:日志对象用于输出日志,而Handler对象用于指定日志向哪里输出(文件、终端等等)
            # 常用handler对象:
            # 1.StreamHandler, 用于向标准输入输出流等输出日志  2.FileHandler,用于向文件输出日志
            # 3.NullHandler,什么也不输出 4.RotatingFileHandler,向文件输出日志,如果文件到达指定大小,创建新文件并继续输出日志。
    
            # 控制台日志
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(self.formatter)
            console_handler.setLevel(self.console_output_level)
            self.logger.addHandler(console_handler)  #将控制台日志对象添加到logger
            # 文件日志
            # 每天重新创建一个日志文件,最多保留backup_count份
            file_handler = TimedRotatingFileHandler(filename=self.log_file_name,
                                                    when='D',
                                                    interval=1,
                                                    backupCount=self.backup_count,
                                                    delay=True,
                                                    encoding='utf-8')
            file_handler.setFormatter(self.formatter)
            file_handler.setLevel(self.file_output_level)
            self.logger.addHandler(file_handler)#将文件日志对象添加到logger
            return self.logger
    if __name__ == '__main__':
        log=Logger().get_logger()
        # 打印优先级:critical>error>warning>info>debug
        # 当等级为debug时,全部等级都能够打印出来,等级为info时,除了debug,其他都能够打印
        log.debug("debug信息,最低级别,一般开发人员用来打印一些调试信息")
        log.info("info信息,正常输出信息,一般用来打印一些正常的操作")
        log.warning("warning,一般用来打印警信息,默认等级为warning")
        log.error("error信息,一般用来打印一些错误信息")
        log.critical("critical信息,一般用来打印一些致命的错误信息,等级最高")
    
    logger=Logger().get_logger()


    db_util.py
    import pymysql.cursors
    from util.log import  logger
    class DBUtil():
        def __init__(self):
            self.connect=None
            self.cursor=None
        def get_con(self):
            try:
                self.connect = pymysql.Connect(
                    host='localhost',
                    port=3306,
                    user='root',
                    passwd='123456',
                    db='test',
                    charset='utf8'
                )
    
    
            except Exception as result:
                print(result)
            else:
                # 获取游标
                self.cursor = self.connect.cursor()
    
    
        def other_data(self,operations,sql):
            # 插入数据/修改数据/删除数据
            if (operations==1):
                try:
                    self.cursor.execute(sql)
                except Exception as  e:
                    self.connect.rollback()  # 事务回滚
                    result_str='数据库事务处理失败'+str(e)
                    logger.info('数据库事务处理失败%s' % e)
                    return result_str
                else:
                    result=self.cursor.fetchone()
                    result_str=result.__str__()
                    result_count=self.cursor.rowcount
                    logger.info('数据库查询结果:%s', result)
                    logger.info('数据库成功操作%d条数据'%result_count)
                    return result_str
            elif(operations in (2,3,4)):
                try:
                    self.cursor.execute(sql)
                except Exception as  e:
                    self.connect.rollback()  # 事务回滚
                    result_str='数据库事务处理失败'+str(e)
                    logger.info('数据库事务处理失败%s' % e)
                    return result_str
    
                else:
                    self.connect.commit()  # 事务提交
                    result_count = self.cursor.rowcount
                    result_str = "数据库成功操作"+str(result_count)+"条数据"
                    logger.info(result_str)
                    return result_str
            else:
                logger.info("输入数据库操作不正确")
    
    
    
        def close_database(self):
            # 关闭连接
            self.cursor.close()
            self.connect.close()
    
    
    
    if __name__ == '__main__':
        db=DBUtil()
        db.get_con()
        db.close_database()
     

    主函数test_request2.py

    #coding="utf-8"
    import xlrd
    from xlutils.copy import copy
    import requests
    from util.log import  logger
    from util.db_util import DBUtil
    
    # xlrd:读取Excel文件数据
    # xlwt:写入Excel 数据,缺点是无法复用,写入会全部覆盖,无法追加数据,为了方便用户,写入的话,比较推荐xlutils模块,它可以复制原excel
    # formatting_info=True,保留Excel的原格式,这样xlutils写入后格式不变
    # xlrd模块0.8版本后不支持以xlsx为后缀名文件,所以excel要用xls格式,不能会打不开
    
    if __name__ == '__main__':
    
        db = DBUtil()
        db.get_con()
        dir_path=r'E:PyCharmWorkSpaceAutoInterfaceTest	estFile	est_interface005.xls'
        run_sheet_name="用例"
        summary_sheet_name="总结"
        # 统计成功数,失败数
        all_cases =0
        statistics_success=0
        statistics_fail=0
        # 前置标志
        pre_flag = False
        try:
            # 目录加r可以取消转义,不加r的话改为\即可
            workbook=xlrd.open_workbook(dir_path,formatting_info=True)
    
        except FileNotFoundError:
            print ("File is not found.") #文件不存在
        except PermissionError:
            print ( "You don't have permission to access this file.") #文件存在无权限访问,例如文件被打开时无法写入
        else:
            table = workbook.sheet_by_name(run_sheet_name)  # 根据sheet名字获取sheet
    
            new_workbook = copy(workbook)  # 复制文件,这样将结果写入excel
            writeSheet = new_workbook.get_sheet(run_sheet_name)  # 获取写入用例sheet
    
            writeSheet_summary=new_workbook.get_sheet(summary_sheet_name)# 获取写入总结sheet
    
            for i in range(1, table.nrows):
                request_method = table.cell(i, 2).value
                url = table.cell(i, 3).value
                params = table.cell(i, 4).value
                expected_results=table.cell(i,5).value
                db_operations = table.cell(i, 9).value
                sql = table.cell(i, 10).value
                pre_request_method=table.cell(i, 12).value
                pre_url = table.cell(i, 13).value
                pre_params = table.cell(i, 14).value
                if( request_method!="" and url!="" and  params!="" and expected_results!=""):
                    logger.info(
                        "********************************************************************************************************************")
                    all_cases +=1
    
                    if(pre_request_method!="" and pre_url!=""and pre_params!=""):
                        logger.info("" + str(i) + "个用例前置请求   url:" + pre_url + "    请求参数:" + pre_params)
    
                        if (pre_request_method == "get"):
                            try:
                                res = requests.get(url=pre_url, params=pre_params)
                            except Exception as result:
                                logger.info("" + str(i) + "个用例前置请求异常:" + str(result))
                            else:
                                logger.info("" + str(i) + "个用例前置请求结果:" + res.text)
    
                                res_json = res.json()  # 将返回参数转为json串,取某字段值,result_json[父元素1][子元素2]
                                pre_response_token = res_json["data"]["token"]
                                headers = {'Authorization-Qkids': pre_response_token}
                                pre_flag=True
                        elif (pre_request_method == "post"):
                            try:
                                res = requests.post(url=pre_url, data=pre_params.encode())
                            except Exception as result:
                                logger.info("" + str(i) + "个用例前置请求异常:" + str(result))
                            else:
                                logger.info("" + str(i) + "个用例前置请求结果:" + res.text)
                                res_json = res.json()  # 将返回参数转为json串,取某字段值,result_json[父元素1][子元素2]
                                pre_response_token = res_json["data"]["token"]
                                headers = {'Authorization-Qkids':pre_response_token}
                                pre_flag = True
                                logger.info(pre_response_token)
                        else:
                            logger.info("前置请求方式格式不正确")
                        headers = {'Authorization-Qkids': pre_response_token}
                    elif(pre_request_method=="" and pre_url==""and pre_params==""):
                        # 不做任何操作
                        pass
                    else:
                        logger.info("" + str(i) + "个用例:前置请求方式/前置url/前置请求参数未填")
    
                     # 前置请求就算报错,第二个请求也会有相应错误提示,所以下面这段代码不用放到前置后面,同级即可
                    logger.info("" + str(i) + "个用例请求   url:" + url + "    请求参数:" + params)
                    if (request_method == "get"):
                        try:
                            if(pre_flag==True):
    
                                res = requests.get(url=url, params=params,headers=headers)
                            else:
                                res = requests.get(url=url, params=params)
                        except Exception as result:
                            statistics_fail += 1
                            logger.info("" + str(i) + "个用例异常:" + str(result))
                            writeSheet.write(i, 8, "N")
                        else:
    
                            logger.info("" + str(i) + "个用例结果:" + res.text)
                            writeSheet.write(i, 6, res.text)  # 写入整个返回结果
                            res_json = res.json()  # 将返回参数转为json串,取某字段值,result_json[父元素1][子元素2]
                            response_message = res_json["message"]
                            writeSheet.write(i, 7, response_message)  # 写入返回的message
                            if (expected_results == response_message):
                                logger.info("结果比对:Y")
                                writeSheet.write(i, 8, "Y")
                                statistics_success += 1
                            else:
                                logger.info("结果比对:N")
                                writeSheet.write(i, 8, "N")
                                statistics_fail += 1
    
    
                    elif (request_method == "post"):
                        try:
                            if(pre_flag==True):
                                # 假如请求body里面有汉字,需对data进行encode(),仅用于post请求
                                res = requests.post(url=url, data=params.encode(),headers=headers)
                            else:
                                res = requests.post(url=url, data=params.encode())
    
                        except Exception as result:
                            statistics_fail += 1
                            logger.info("" + str(i) + "个用例异常:" + str(result))
                            writeSheet.write(i, 8, "N")
                        else:
    
                            logger.info("" + str(i) + "个用例结果:" + res.text)
                            writeSheet.write(i, 6, res.text)  # 写入
                            res_json = res.json()  # 将返回参数转为json串,取某字段值方式:result_json[父元素1][子元素2],例如res_json[data][name]
                            response_message = res_json["message"]
                            writeSheet.write(i, 7, response_message)
                            if (expected_results == response_message):  # 比对预期结果与返回结果
                                logger.info("结果比对:Y")
                                writeSheet.write(i, 8, "Y")
                                statistics_success += 1
                            else:
                                logger.info("结果比对:N")
                                writeSheet.write(i, 8, "N")
                                statistics_fail += 1
    
                    else:
                        logger.info("请求方式格式不正确")
    
                    if (db_operations != "" and sql != ""):
                        if (db_operations in (1, 2, 3, 4)):
                            db_result = db.other_data(db_operations, sql)
                            writeSheet.write(i, 11, db_result)
                        else:
                            logger.info("数据库操作填写不符合规则")
                    elif (db_operations == "" and sql == ""):
                        pass
                    else:
                        logger.info("数据库操作/数据库sql未填")
    
                elif (request_method == "" and url == "" and params == "" and expected_results == ""):
                    # 不做任何操作
                    pass
                else:
                    logger.info("" + str(i) + "个用例:请求方式/url/请求参数/期望结果未填")
    
    
        # %转义方式:%%,其他使用
    
        logger.info(
            "---------------------------------------------------------------------------------------------------------------------------------------------")
    
        summary_str="总的请求用例数%d, 已通过%d,不通过%d, 通过比例%.2f%%"%(all_cases,statistics_success,statistics_fail,(statistics_success/all_cases)*100)
        logger.info(summary_str)
        logger.info(
            "---------------------------------------------------------------------------------------------------------------------------------------------")
    
        writeSheet_summary.write(0,1,summary_str)
        new_workbook.save(dir_path)  #最后将写的保存
        db.close_database()
    
  • 相关阅读:
    java_八大数据类型
    java_实现Hello World
    Linux-ls命令
    Liunx下安装MySql
    Liunx-tail命令
    Liunx-history命令
    Linux-mkdir命令
    Linux-cp命令
    Linux-mv命令
    PBFT_拜占庭容错算法
  • 原文地址:https://www.cnblogs.com/yangjr/p/12932370.html
Copyright © 2011-2022 走看看