zoukankan      html  css  js  c++  java
  • 学习pytest+allure接口自动化记录

    这周末木得事,就整理下自己学习的pytest+allure代码来分享啦。(哈哈直接上代码)

    目录 

    api#存放每个接口的测试方法
    case#存放每个接口的测试用例
    common#存放一些公共方法
    data#存放测试数据
    logs#存放测试日志
    report#存放测试报告
    conftest.py#存放测试用例的一些fixture配置
    pytest.ini#pytest的主配置文件
    getpathinfo# 获取当前路径
    requirements.txt #存放依赖包

    学习框架

    pytest单元测试框架+allure生成测试报告

    结构设计

    1.每一个用例组合在一个测试类里面生成一个py文件

    2.将每个用例调用方法封装在一个测试类里面生成一个py文件

    3.将测试数据存放在yml文件中通过parametrize进行参数化

    4.通过allure生成测试报告

    学习内容

    1.pytes单元测试框架

    2.allure生成测试报告

    3.yml文件存放测试数据通过parametrize进行参数化

    4.sign生成签名传参

    5.加密传参,解密校验

    练习代码

     getpathinfo.py #获取当前路径

    import os
    
    def get_path():
        # 获取当前路径
        curpath = os.path.dirname(os.path.realpath(__file__))
        return curpath
    
    if __name__ == '__main__':# 执行该文件,测试下是否OK
        print('测试路径是否OK,路径为:', get_path())
    getpathinfo.py

     conftest.py #存放测试用例的一些fixture配置

    import os
    
    import pytest
    import requests
    from api.get_token import Get_Token
    
    @pytest.fixture(scope="session")
    def gettokenfixture():
        '''先登录'''
        s = requests.session()
        shili = Get_Token(s)
        shili.get_token()
        if not s.headers.get("Authorization", ""):#没有get到token,跳出用例
            pytest.skip("跳过用用例")
        yield s
        s.close()
    
    def pytest_addoption(parser):
        parser.addoption(
            "--cmdhost", action="store", default="http://*********",
            help="my option: type1 or type2"
        )
    @pytest.fixture(scope="session",autouse=True)
    def host(request):
        '''获取命令行参数'''
        #获取命令行参数给到环境变量
        #通过pytest --cmdhost 运行指定环境
        os.environ["host"] = request.config.getoption("--cmdhost")
        print("当前用例运行测试环境:%s" % os.environ["host"])
    conftest.py

     pytest.ini #存放pytest配置文件

    [pytest]
    
    markers =
        login: Run mark login case
    
    addopts =  -s -p no:warnings
    testpaths = ./case
    python_files = test_*.py
    python_classes = Test*
    python_functions = test_*
    #addopts = -v --reruns 1 --html=./report/report.html --self-contained-html
    #addopts = -v --reruns 1 --alluredir ./report/allure_raw
    #addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
    pytest.ini

    requirements.txt  #存放导入依赖包(我这包有点多,流汗。。pip install -r requirements.txt安装依赖包)

    absl-py==0.9.0
    adbutils==0.3.4
    allure-pytest==2.8.6
    allure-python-commons==2.8.6
    amqp==1.4.9
    ansi2html==1.4.2
    anyjson==0.3.3
    apipkg==1.5
    appdirs==1.4.3
    Appium-Python-Client==0.26
    asn1crypto==0.24.0
    astor==0.8.1
    atomicwrites==1.1.5
    attrs==18.1.0
    Babel==2.7.0
    backports.csv==1.0.7
    bcrypt==3.1.7
    BeautifulReport==0.1.2
    beautifulsoup4==4.6.1
    billiard==3.3.0.23
    bs4==0.0.1
    cachetools==4.1.0
    celery==3.1.26.post2
    certifi==2018.4.16
    cffi==1.11.5
    cfgv==3.1.0
    chardet==3.0.4
    click==6.7
    cma==2.7.0
    colorama==0.3.9
    colorlog==4.0.2
    configparser==5.0.0
    cryptography==2.6.1
    cryptokit==0.0.7
    cssselect==1.0.3
    cycler==0.10.0
    ddt==1.1.2
    decorator==4.4.0
    defusedxml==0.5.0
    demjson==2.2.4
    deprecation==2.0.6
    diff-match-patch==20181111
    distlib==0.3.0
    Django==2.0.3
    django-celery==3.2.2
    django-cors-headers==3.1.1
    django-crispy-forms==1.8.0
    django-filter==2.2.0
    django-formtools==2.1
    django-import-export==1.2.0
    django-ranged-response==0.2.0
    django-reversion==2.0.0
    django-simple-captcha==0.5.10
    django-stdimage==4.0.1
    djangorestframework==3.10.3
    docopt==0.6.2
    dwebsocket==0.4.2
    enum34==1.1.6
    et-xmlfile==1.0.1
    eventlet==0.22.1
    execnet==1.5.0
    facebook-wda==0.3.4
    fake-useragent==0.1.11
    filelock==3.0.12
    fire==0.1.3
    flake8==3.8.1
    Flask==1.0.2
    flower==0.9.2
    funcsigs==1.0.2
    future==0.15.2
    futures==3.1.1
    gast==0.3.3
    gevent==1.3.6
    google-auth==1.15.0
    google-auth-oauthlib==0.4.1
    graphviz==0.14
    greenlet==0.4.15
    grpcio==1.29.0
    har2case==0.3.1
    httpie==1.0.3
    httplib2==0.9.2
    HttpRunner==1.5.8
    humanize==0.5.1
    identify==1.4.15
    idna==2.6
    importlib-metadata==1.2.0
    itsdangerous==0.24
    jdcal==1.4
    Jinja2==2.10
    joblib==0.15.1
    kiwisolver==1.2.0
    kombu==3.0.37
    locustio==0.11.0
    logzero==1.5.0
    lxml==4.2.5
    Markdown==3.1.1
    MarkupSafe==1.0
    matplotlib==3.2.1
    mccabe==0.6.1
    more-itertools==4.2.0
    msgpack==0.5.6
    namedlist==1.7
    nltk==3.5
    nodeenv==1.3.5
    numpy==1.18.4
    oauthlib==3.1.0
    objgraph==3.4.1
    odfpy==1.4.0
    opencv-python==4.2.0.34
    openpyxl==2.5.9
    packaging==19.0
    paddlehub==1.6.0
    paddlepaddle==1.8.1
    pandas==1.0.3
    parameterized==0.7.1
    paramiko==2.4.1
    ParamUnittest==0.2
    parse==1.11.1
    pathlib==1.0.1
    Pillow==5.2.0
    pluggy==0.13.1
    pre-commit==2.4.0
    prettytable==0.7.2
    progress==1.5
    progressbar2==3.39.3
    protobuf==3.12.0
    py==1.5.4
    pyasn1==0.4.7
    pyasn1-modules==0.2.8
    pycodestyle==2.6.0
    pycparser==2.18
    pycryptodome==3.8.2
    pyee==5.0.0
    pyflakes==2.2.0
    Pygments==2.4.2
    PyMySQL==0.9.3
    PyNaCl==1.3.0
    pyOpenSSL==17.5.0
    pyparsing==2.4.0
    pyppeteer==0.0.25
    pyquery==1.4.0
    pytest==4.5.0
    pytest-forked==0.2
    pytest-html==1.19.0
    pytest-metadata==1.7.0
    pytest-repeat==0.7.0
    pytest-rerunfailures==8.0
    pytest-xdist==1.23.2
    PyTestReport==0.2.1
    python-dateutil==2.8.1
    python-utils==2.3.0
    pytz==2018.5
    pywin32==223
    PyYAML==3.12
    pyzmq==17.1.2
    rarfile==3.1
    regex==2020.5.14
    requests==2.22.0
    requests-html==0.10.0
    requests-oauthlib==1.3.0
    requests-toolbelt==0.8.0
    retry==0.9.2
    rsa==4.0
    scipy==1.3.1
    selenium==2.53.6
    sentencepiece==0.1.90
    six==1.10.0
    tablib==0.13.0
    tb-paddle==0.4.0
    tensorboard==2.2.1
    tensorboard-plugin-wit==1.6.0.post3
    toml==0.10.1
    tomorrow==0.2.4
    tornado==6.0.3
    tqdm==4.31.1
    uiautomator2==0.3.3
    urllib3==1.22
    virtualenv==20.0.20
    w3lib==1.20.0
    wcwidth==0.1.7
    websocket-client==0.57.0
    websockets==7.0
    weditor==0.2.3
    Werkzeug==1.0.1
    whichcraft==0.6.0
    xlrd==1.1.0
    xlwt==1.3.0
    yapf==0.26.0
    zipp==3.1.0
    requirements.txt

    common/connect_mysql.py #连接操作数据库

    import pymysql
    
    dbinfo = {
        "host":"******",
        "user":"root",
        "password":"123456",
        "port":3309
    }
    class DbConnect():
        def __init__(self,db_conf,database=""):
            self.db_conf = db_conf
            #打开数据库
            self.db = pymysql.connect(database = database,
                                      cursorclass = pymysql.cursors.DictCursor,
                                      **db_conf)
            #使用cursor()方式获取操作游标
            self.cursor = self.db.cursor()
    
        def select(self,sql):
            #sql查询
            self.cursor.execute(sql)#执行sql
            results = self.cursor.fetchall()
            return results
    
        def execute(self,sql):
            #sql 删除 提示 修改
            try:
                self.cursor.execute(sql)#执行sql
                self.db.commit()#提交修改
            except:
                #发生错误时回滚
                self.db.rollback()
    
        def close(self):
            self.db.close()#关闭连接
    
    def select_sql(select_sql):
        '''查询数据库'''
        db = DbConnect(dbinfo,database='apps')
        result = db.select(select_sql)
        db.close()
        return result
    
    def execute_sql(sql):
        '''执行SQL'''
        db = DbConnect(dbinfo,database='apps')
        db.execute(sql)
        db.close()
    
    if __name__ == '__main__':
        sql = 'DELETE from auth_user WHERE username = "qiushui"'
        delete = execute_sql(sql)
        print(delete)
    connect_mysql.py

    common/read_yaml.py #读取yml文件数据

    import os
    import yaml
    import getpathinfo
    class ReadYaml():
        def __init__(self,filename):
            path = getpathinfo.get_path()#获取本地路径
            self.filepath = os.path.join(path,'data')+"/"+filename#拼接定位到data文件夹
    
        def get_yaml_data(self):
            with open(self.filepath, "r", encoding="utf-8")as f:
                # 调用load方法加载文件流
                return yaml.load(f)
    
    if __name__ == '__main__':
        data = ReadYaml("regiter_one.yml").get_yaml_data()
        print(data)
    read_yaml.py

    cmmon/logger.py # 生成日志输出

    import logging,time
    import os
    import getpathinfo
    
    path = getpathinfo.get_path()#获取本地路径
    log_path = os.path.join(path,'logs')# log_path是存放日志的路径
    # 如果不存在这个logs文件夹,就自动创建一个
    if not os.path.exists(log_path):os.mkdir(log_path)
    
    class Log():
        def __init__(self):
            #文件的命名
            self.logname = os.path.join(log_path,'%s.log'%time.strftime('%Y_%m_%d'))
            self.logger = logging.getLogger()
            self.logger.setLevel(logging.DEBUG)
            #日志输出格式
            self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    
        def __console(self,level,message):
            #创建一个fileHander,用于写入本地
            fh = logging.FileHandler(self.logname,'a',encoding='utf-8')
            fh.setLevel(logging.DEBUG)
            fh.setFormatter(self.formatter)
            self.logger.addHandler(fh)
    
            #创建一个StreamHandler,用于输入到控制台
            ch = logging.StreamHandler()
            ch.setLevel(logging.DEBUG)
            ch.setFormatter(self.formatter)
            self.logger.addHandler(ch)
    
            if level == 'info':
                self.logger.info(message)
            elif level == 'debug':
                self.logger.debug(message)
            elif level == 'warning':
                self.logger.warning(message)
            elif level == 'error':
                self.logger.error(message)
            #避免日志重复
            self.logger.removeHandler(fh)
            self.logger.removeHandler(ch)
            #关闭打开文件
            fh.close()
    
        def debug(self,message):
            self.__console('debug',message)
    
        def info(self,message):
            self.__console('info',message)
    
        def warning(self,message):
            self.__console('warning',message)
    
        def error(self,message):
            self.__console('error',message)
    
    if __name__ == '__main__':
        log = Log()
        log.info('测试')
        log.debug('测试')
        log.warning('测试')
        log.error('测试')
    logger.py

    common/sign.py #生成sign签名

    import hashlib
    
    def sign_body(body, apikey="12345678"):
        '''请求body sign签名'''
        # 列表生成式,生成key = value格式
        a = ["".join(i) for i in body.items() if i[1] and i[0] != "sign"]
        # 参数名ASCII码从小到大排序
        strA = "".join(sorted(a))
        # 在strA后面拼接上apiKey得到strsigntemp字符串
        strsigntemp = strA + apikey
        # 将strsigntemp字符转换为小写字符串进行MD5运算
        # MD5加密
        def jiamimd5(src):
            m = hashlib.md5()
            m.update(src.encode('utf-8'))
            return m.hexdigest()
        sign = jiamimd5(strsigntemp.lower())
        return sign
    sign.py

    cmmon/jiajiemi.py #加密、解密方法

    import json
    from cryptography.hazmat.primitives import padding
    from cryptography.hazmat.primitives.ciphers import algorithms
    from Crypto.Cipher import AES
    from binascii import b2a_hex, a2b_hex
    
    class PrpCrypt(object):
    
        def __init__(self, key='0000000000000000'):
            self.key = key.encode('utf-8')
            self.mode = AES.MODE_CBC
            self.iv = b'0102030405060708'
            # block_size 128位
    
        # 加密函数,如果text不足16位就用空格补足为16位,
        # 如果大于16但是不是16的倍数,那就补足为16的倍数。
        def encrypt(self, text):
            cryptor = AES.new(self.key, self.mode, self.iv)
            text = text.encode('utf-8')
            # 这里密钥key 长度必须为16(AES-128),24(AES-192),或者32 (AES-256)Bytes 长度
            # 目前AES-128 足够目前使用
            text=self.pkcs7_padding(text)
            self.ciphertext = cryptor.encrypt(text)
            # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题
            # 所以这里统一把加密后的字符串转化为16进制字符串
            return b2a_hex(self.ciphertext).decode().upper()
    
        @staticmethod
        def pkcs7_padding(data):
            if not isinstance(data, bytes):
                data = data.encode()
            padder = padding.PKCS7(algorithms.AES.block_size).padder()
            padded_data = padder.update(data) + padder.finalize()
            return padded_data
    
        @staticmethod
        def pkcs7_unpadding(padded_data):
            unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
            data = unpadder.update(padded_data)
            try:
                uppadded_data = data + unpadder.finalize()
            except ValueError:
                raise Exception('无效的加密信息!')
            else:
                return uppadded_data
    
        # 解密后,去掉补足的空格用strip() 去掉
        def decrypt(self, text):
            #  偏移量'iv'
            cryptor = AES.new(self.key, self.mode, self.iv)
            plain_text = cryptor.decrypt(a2b_hex(text))
            # return plain_text.rstrip('')
            return bytes.decode(plain_text).rstrip("x01").
                rstrip("x02").rstrip("x03").rstrip("x04").rstrip("x05").
                rstrip("x06").rstrip("x07").rstrip("x08").rstrip("x09").
                rstrip("x0a").rstrip("x0b").rstrip("x0c").rstrip("x0d").
                rstrip("x0e").rstrip("x0f").rstrip("x10")
    
        def dict_json(self, d):
            '''python字典转json字符串, 去掉一些空格'''
            j = json.dumps(d).replace('": ', '":').replace(', "', ',"').replace(", {", ",{")
            return j
    
    if __name__ == '__main__':
        pc = PrpCrypt('12345678')
        a = "1"
        print("加密前%s"%a)
        b = pc.encrypt(a)
        print("加密后%s"%b.lower())
    jiajiemi.py

     data/regiter_data.yml #存放test_regiter.py测试数据(都一样,在此只展示一个了)

    test_regiter_data:
      - ["qiushui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["秋水","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiu秋","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiushui_@","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiushui123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiushui@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["秋qiu@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiu shui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
      - ["qiu shuiq神神道道所多多","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
    
    test_regiter_repeat:
      - ["秋水君","123456","hanxi@163.com", {'msg': '秋水君用户已被注册', 'code': 0}]
    regiter_data.yml

     api/login_method.py #测试方法case/login/test_login.py#测试用例(方式:post 类型:application/json)

    '''
    Code description:登录方法
    Create time:
    Developer:
    '''
    import requests
    
    class Login(object):
        def __init__(self,s:requests.session):
            self.s = s
    
        def login(self,username="test",password="123456"):
            url = "http://*********"
            boby = {
                "username":username,
                "password":password
            }
            return self.s.post(url,json = boby)
    login_method.py
    import allure
    import requests
    import pytest
    from api.login_method import Login
    from common.logger import Log
    from common.read_yaml import ReadYaml
    testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
    
    @allure.feature('登录测试用例接口')#测试报告显示测试功能
    class Test_login():
        '''测试登录接口'''
        log = Log()
        @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
                                 ids = ["正常登录",
                                       "密码为空登录",
                                       "账号为空登录",
                                       "账号错误登录",
                                       "密码错误登录",
                                       "账号存在空格登录",
                                       "密码存在空格登录",
                                       "账号存在特殊符号登录",
                                       "密码存在特殊符号登录",
                                       "账号不完整登录",
                                       "密码不完整登录"])#参数化测试用例
        @allure.step('账号,密码登录')#测试报告显示步骤
        @allure.link('http://**********:6009/api/v1/login',name='测试接口')#测试报告显示链接
        def test_login(self,username,password,expect):
            s = requests.session()#定义session会话
            self.log.info('------用户登录接口-----')
            shili = Login(s)#实例化
            msg = shili.login(username,password)
            self.log.info('获取请求结果:%s'%msg.json())
            #print(msg.json())
            #断言
            assert msg.json()["msg"] == expect['msg']
            assert msg.json()["code"] == expect['code']
    test_login.py

     api/login_xadmin.py#测试方法case/login/test_login_xadmin.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参)

    '''
    Code description:登录方法
    Create time:
    Developer:
    '''
    import os
    
    import requests
    import re
    
    class Login_Admin(object):
    
        def __init__(self,s:requests.session):
            self.s = s
    
        def login(self,username="admin",password="123456",this_is_the_login_form = "1",
                  next = "/xadmin/"):
            '''xadmin登录'''
            url = os.environ["host"]+"/xadmin/"#读取conftest.py文件地址进行拼接
            r1 = self.s.get(url)
            tokens = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
            #print(tokens[0])#获取隐藏参数csrfmiddlewaretoken
            body = {
                "csrfmiddlewaretoken": tokens[0],
                "username": username,
                "password": password,
                "this_is_the_login_form": this_is_the_login_form,
                "next": next
            }
            r2 = self.s.post(url,data = body)
            return r2
            #print(r2.text)
            #assert "主页面 | 后台页面" in r2.text
    
    if __name__ == '__main__':
        s = requests.session()
        Login_Admin(s).login()
    login_xadmin.py
    import allure
    import requests
    import pytest
    from api.login_xadmin import Login_Admin
    from common.read_yaml import ReadYaml
    from common.logger import Log
    testdata = ReadYaml("login_xadmin.yml").get_yaml_data()#读取数据
    @allure.feature('登录接口')#测试报告显示测试功能
    class Test_Login_Xadmin():
        '''xadmin登录'''
        log = Log()
        @pytest.mark.parametrize("username,password,this_is_the_login_form,next,expect",
                                 testdata["test_login_data"],
                                 ids=["正常登录",
                                      "用户名为空登录",
                                      "密码为空登录",
                                      "错误用户名登录",
                                      "错误密码登录",
                                      "错误参数this_is_the_login_form登录",
                                      "错误参数next登录",
                                      "参数next为空登录"
                                      ])#参数化测试用例
        @allure.step("登录界面输入账号,密码登录")#测试报告显示测试步骤
        @allure.link('http://******/xadmin/',name='测试接口')#测试报告链接
        def test_login_xadmin(self,username,password,
                              this_is_the_login_form,next,expect):
            s = requests.session()
            self.log.info('------用户登录接口-----')
            shili = Login_Admin(s)  # 实例化
            msg = shili.login(username, password,this_is_the_login_form,next)
            #print(msg.text)
            self.log.info('获取请求结果:%s' % msg.text)
            assert expect in msg.text#断言验证是否通过
    test_login_xadmin.py

    api/get_token.py #获取登录token,方便调用

    '''
    Code description:获取token testcase
    Create time:
    Developer:
    '''
    import requests
    
    class Get_Token(object):
        def __init__(self,s:requests.session):
            self.s = s
    
        def get_token(self):
            url = "http://******/api/v1/login"
            boby = {
                "username":"hanxi",
                "password":"123456"
            }
            r = self.s.post(url,json = boby)
            #print(r.json())
            #获取token
            token = r.json()["token"]
            header = {
                "Authorization": "Token %s" % token
            }
            self.s.headers.update(header)#更新token到session
            return token
    
    if __name__ == '__main__':
        s = requests.session()
        a = Get_Token(s)
        a.get_token()
    get_token.py

    api/update_info_method.py#测试方法case/updateinfo/test_updateinfo.py#测试用例(方式:post类型:application/json 学习fixture前置登录)

    import requests
    from api.get_token import Get_Token
    class Update_Info():
        '''更新用户信息'''
        def __init__(self,s:requests.sessions):
            self.s = s
    
        def update_info(self,name = "hanxi",mail = "222@163.com",sex = "M",age = 23):
            url = "http://********/api/v1/userinfo"
            body = {
                "name":name,
                "mail":mail,
                "sex":sex,
                "age":age
            }
            return self.s.post(url,json = body)
            #print(r.json())
            #return r.json()
    
    if __name__ == '__main__':
        s = requests.session()
        Get_Token(s).get_token()
        a = Update_Info(s)
        infos = a.update_info(name="hanxi", mail="xxx@qq.com")
        print(infos.text)
    update_info_method.py
    import allure
    import pytest
    from common.logger import Log
    from common.read_yaml import ReadYaml
    from api.update_info_method import Update_Info
    testdata = ReadYaml('update_info.yml').get_yaml_data()#读取数据
    
    @allure.feature('更新用户信息接口')#测试报告显示测试功能
    class Test_UpdateInfo():
        log = Log()
        '''更新用户信息'''
        @pytest.mark.parametrize("name,mail,sex,age,expect",testdata["test_update_data"],
                                 ids=["正常修改",
                                      "修改其他用户名",
                                      "错误xxx163.com邮箱修改",
                                      "错误邮箱xxx@163com修改",
                                      "错误xxx@163.邮箱修改",
                                      "错误邮箱@163.com修改",
                                      "错误hanxi@.com邮箱修改",
                                      "错误han xi@163.com邮箱修改",
                                      "错误邮箱修改hanxi@1 63.com",
                                      "性别为空修改",
                                      "性别F修改",
                                      "性别错误参数X修改",
                                      "性别输出汉字男修改",
                                      "年龄为空修改",
                                      "年龄存在空格修改2 3",
                                      "年龄存在特殊字符修改23_",
                                      "年龄汉字修改",
                                      "年龄数字加字母修改"
                                      ])#参数化测试用例
        @allure.step('获取登录token,可修改用户名、邮箱、性别、年龄')#测试报告显示操作步骤
        @allure.link('http://*********/api/v1/userinfo',name='测试接口')#测试报告显示链接
        def test_updateinfo(self,gettokenfixture,name,mail,sex,age,expect):
            s = gettokenfixture#登录获取token
            self.log.info('------修改用户信息接口-----')
            a = Update_Info(s)#实例化
            msg = a.update_info(name,mail,sex,age)
            #print(msg.json())
            self.log.info('获取请求结果:%s' % msg.json())
            #断言
            assert msg.json()["message"] == expect['message']
            assert msg.json()["code"] == expect['code']
    test_updateinfo.py

    api/regiter_method.py #测试方法 case/regiter/test/regiter.py#测试用例(方式:post 类型:application/json 学习fixture前置连接数据库操作)

    import requests
    
    
    class Regiter():
        def __init__(self,s:requests.session()):
            self.s = s
        '''注册'''
        def regiter_user(self,username,password,mail):
            url = "http://*******:6009/api/v1/register"
            body ={
                "username": username,
                "password": password,
                "mail": mail
            }
            return self.s.post(url,json = body)
    regiter_method.py
    import allure
    import requests
    import pytest
    from common.logger import Log
    from api.regiter_method import Regiter
    from common.connect_mysql import execute_sql
    from common.read_yaml import ReadYaml
    testdata = ReadYaml("regiter_data.yml").get_yaml_data()#读取测试数据
    
    @pytest.fixture(scope="function")#设置前置清除操作
    def delete_data():
        '''执行sql,删除之前注册信息'''
        sql = "DELETE from apps.auth_user WHERE username like 'qiu'"
        execute_sql(sql)
        yield
    
    @allure.feature('注册用户接口')#测试报告显示测试功能
    class TestRegiter():
        '''注册'''
        log = Log()
        @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
        @allure.link('http://*******/api/v1/register',name='测试接口')#测试报告显示测试链接
        @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_data"],
                                 ids =["正常注册",
                                      "用户名汉字注册",
                                      "用户名汉字加英文注册",
                                      "用户名英文加特殊符号注册",
                                      "用户名英文加数字注册",
                                      "用户名英文特殊符号加数字注册",
                                      "用户名汉字英文字符数字注册",
                                      "用户名存在空格注册",
                                      "用户名字符特长注册"])#参数化测试用例
        def test_regirer(self,delete_data,username,password,mail,expect):
            '''注册'''
            s = requests.session()#定义session
            self.log.info('------用户注册接口-----')
            shili = Regiter(s)
            msg = shili.regiter_user(username,password,mail)
            self.log.info('获取请求结果:%s' % msg.json())
            #print(msg.json()['msg'])
            assert msg.json()['msg'] == expect['msg']
    
        @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
        @allure.link('http://********/api/v1/register', name='测试接口')#测试报告显示测试链接
        @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_repeat"],
                                 ids=['重复注册'])
        def test_repeat_regirer(self,username,password,mail,expect):
            '''重复注册'''
            s = requests.session()#定义session
            self.log.info('------用户注册接口-----')
            shili = Regiter(s)
            msg = shili.regiter_user(username,password,mail)
            #print(msg.json())
            self.log.info('获取请求结果:%s' % msg.json())
            assert msg.json()['msg'] == expect['msg']#断言响应信息
    test_regiter.py

    api/add_teacher.py#测试方法 case/test_add_teacher.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参,fixture前置连接数据库操作,获取页面元素断言,数据库断言)

    '''
    Code description:显示老师
    Create time:
    Developer:
    '''
    import os
    
    import requests
    import re
    from requests_toolbelt import MultipartEncoder
    from api.login_xadmin import Login_Admin
    
    class Add_Teacher():
    
        def __init__(self,s:requests.session):
            self.s = s
    
        def add_teacher(self,teacher_name="test",tel="122222222",mail="1111@qq.com",sex = "M"):
            '''添加老师'''
            url = os.environ["host"]+"/xadmin/hello/teacherman/add/"#显示老师页面
            r1 = self.s.get(url)
            #print(r.text)#正则获取隐藏元素
            csrfmiddlewaretoken = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
            #print(csrfmiddlewaretoken)
    
            body = MultipartEncoder(fields=[
                ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
                ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
                ("teacher_name", teacher_name),
                ("tel", tel),
                ("mail", mail),
                ("sex", sex),
                ("_save", "")
            ])
            r2 = self.s.post(url,data = body,headers={"content-Type": body.content_type})
            return r2
            #print(r2.text)
    if __name__ == '__main__':
        s = requests.session()
        Login_Admin(s).login()
        Add_Teacher(s).add_teacher()
    add_teacher.py
    import allure
    import requests
    import pytest
    from lxml import etree
    from common.logger import Log
    from api.add_teacher import Add_Teacher
    from api.login_xadmin import Login_Admin
    from common.read_yaml import ReadYaml
    from common.connect_mysql import execute_sql,select_sql
    testdata = ReadYaml("add_teacher_info.yml").get_yaml_data()
    
    @pytest.fixture(scope="function")#设置前置清除操作
    def delete_newteacher():
        sql = "DELETE FROM djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
        execute_sql(sql)
        yield
        #print("数据清理操作")
    @allure.feature('显示教师')#测试报告显示测试功能
    class Test_Add_Teacher():
        '''添加教师'''
        log = Log()
        @pytest.mark.parametrize("teacher_name,tel,mail,sex",testdata["test_addteacher_data"],
                                 ids=["正常显示"])
        @allure.title("显示new teacher")#测试报告显示标题
        @allure.step('先登录,输入显示教师信息,进行显示教师')#测试报告显示步骤
        @allure.link('http://********/xadmin/hello/teacherman/add/',name='测试接口')#测试报告显示链接
        def test_add_teacher(self,delete_newteacher,teacher_name,tel,mail,sex):
            s = requests.session()
            Login_Admin(s).login()#登录
            self.log.info('------显示教师接口-----')
            shili = Add_Teacher(s)#实例化添加教师
            msg = shili.add_teacher(teacher_name,tel,mail,sex)
            demo = etree.HTML(msg.text)
            nodes = demo.xpath('//*[@id="changelist-form"]/div[1]/table/tbody/tr[1]/td[2]/a')
            #print(nodes[0])
            get_result = nodes[0].text#获取元素属性
            #print(get_result)
            self.log.info('获取请求结果:%s' % get_result)
            assert get_result == teacher_name #页面存在该字段验证通过
            sql = "SELECT count(*) as sum from djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
            result = select_sql(sql)[0]["sum"]
            assert result == 1 #查找数据库数量为1验证通过
    test_add_teacher.py

     api/upload_picture_file.py#测试方法 case/upload_picturefile/test_upload.py#测试用例(方式:post 类型:multipart/form-data 学习隐藏参数正则提取传参,上传文件,图片,fixture前置连接数据库操作,获取页面元素断言,数据库断言)

    import os
    import re
    import requests
    from requests_toolbelt import MultipartEncoder
    import getpathinfo
    from api.login_xadmin import Login_Admin
    
    
    class Upload_picture_file():
    
        def __init__(self,s:requests.session()):
            self.s = s
            path = getpathinfo.get_path()  # 获取本地路径
            self.filepath = os.path.join(path, 'data') + "/"  # 拼接定位到上传图片,文件
    
        def upload_picture_file(self):
            '''上传图片文件'''
            url = os.environ["host"]+"/xadmin/hello/fileimage/add/"
            r1 = self.s.get(url)
            #print(r1.text)
            csrfmiddlewaretoken = re.findall(" name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
            #print(csrfmiddlewaretoken)#获取隐藏元素
            body = MultipartEncoder(fields=[
                ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
                ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
                ("title", "上传图片文件测试"),
                ("image", ("2.png", open(self.filepath+"2.png", "rb"), "image/png")),
                ("fiels", ("测试.txt", open(self.filepath+"测试.txt", "rb"),"text/plain")),
                ("_save", "")
            ])
            r2 = self.s.post(url,data = body,headers = {"content-Type":body.content_type})
            #print(r2.text)
            return r2
    if __name__ == '__main__':
        s = requests.session()
        Login_Admin(s).login()
        Upload_picture_file(s).upload_picture_file()
    upload_picture_file.py
    import allure
    import requests
    from lxml import etree
    
    from api.upload_picture_file import Upload_picture_file
    import pytest
    from api.login_xadmin import Login_Admin
    from common.connect_mysql import execute_sql,select_sql
    from common.logger import Log
    
    @pytest.fixture(scope="function")#设置前置删除操作
    def delete_file():
        sql = "DELETE FROM djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
        execute_sql(sql)
        yield
        # print("数据清理操作")
    @allure.feature('上传文件图片接口操作')#测试报告显示测试功能
    class Test_Upload_Picture_file():
        log = Log()
        @allure.title('上传文件图片')#测试报告显示测试标题
        @allure.step('先登录,再上传')#测试报告显示测试步骤
        @allure.link('http://*********/xadmin/hello/fileimage/add/',name='测试接口')#测试报告显示测试链接
        def test_upload_picture_file(self,delete_file):
            s = requests.session()
            Login_Admin(s).login()#登录
            self.log.info('------文件图片上传接口-----')
            shili = Upload_picture_file(s)#实例化上传
            msg = shili.upload_picture_file()
            #print(msg.text)
            demo = etree.HTML(msg.text)
            nodes = demo.xpath('//*[@id="changelist-form"]/div[1]/table/tbody/tr[1]/td[2]/a')
            get_result = nodes[0].text  # 获取元素属性
            #print(get_result)
            self.log.info('获取响应数据:%s' %get_result)
            assert get_result == '上传图片文件测试'#页面存在该字段验证通过
            sql = "SELECT count(*) as sum from djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
            result = select_sql(sql)[0]["sum"]
            assert result == 1#查找数据库数量为1验证通过
    test_upload.py

    api/api_sign.py#测试方法 case/api_sign/test_sign.py#测试用例(方式:post 类型:application/json  学习sign签名传参)

    import requests
    from common.sign import sign_body
    
    class Api_Sign(object):
    
        def __init__(self,s:requests.session):
            self.s = s
    
        def api_sign(self,username = "test",password = "123456"):
            url = "http://*******/api/v3/login"
            body = {
                "username":username,
                "password":password
            }
            sign = sign_body(body,apikey="12345678")
            body["sign"] = sign
            r = self.s.post(url,json = body)
            return r
    
    if __name__ == '__main__':
        s = requests.session()
        Api_Sign(s).api_sign()
    api_sign.py
    import requests
    import allure
    import pytest
    from common.logger import Log
    from api.api_sign import Api_Sign
    from common.read_yaml import ReadYaml
    testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
    
    @allure.feature('登录测试sign')
    class Test_Api_Sign():
        '''测试sign登录'''
        log = Log()
    
        @pytest.mark.parametrize("username,password,expect", testdata["test_login_data"],
                                 ids=["正常登录",
                                      "密码为空登录",
                                      "账号为空登录",
                                      "账号错误登录",
                                      "密码错误登录",
                                      "账号存在空格登录",
                                      "密码存在空格登录",
                                      "账号存在特殊符号登录",
                                      "密码存在特殊符号登录",
                                      "账号不完整登录",
                                      "密码不完整登录"])  # 参数化测试用例
        @allure.step('sign签名登录')
        def test_api_sign(self,username,password,expect):
            s = requests.session()
            self.log.info("------用户登录sign接口-----")
            api = Api_Sign(s)#s实例化
            msg = api.api_sign(username,password)
            self.log.info('获取请求结果:%s' % msg.json())
            assert msg.json()["msg"] == expect['msg']
            assert msg.json()["code"] == expect['code']
            assert msg.json()["username"] == username
    test_sign.py

    api/api_jiajiemi.py#测试方法 case/jiajiemi/test_jiajiemi.py#测试用例(方式:post 类型:application/json 学习加密传参,解密返回信息)

    import json
    import requests
    from common.jiajiemi import PrpCrypt
    class Api_Jiajiemi(object):
    
        def __init__(self,s:requests.session):
            self.s = s
    
        def api_jiami(self,username = "test",password = "123456"):
            global r
            url = "http://********/api/v2/login"
            body = {
                "params" :{
                "username":username,
                "password":password
            }}
            params = body.get("params")
            pc = PrpCrypt(key = '12345678')
            #加密
            #print('加密前%s'%params)
            param_en = pc.encrypt(json.dumps(params))
            #print("加密后%s"%param_en)
            body["params"] = param_en
            r = self.s.post(url, json=body)
            return r
    
        def api_jiemi(self):
            try:
                pc = PrpCrypt(key='12345678')
                res_datas = r.json()["datas"]
                en_datas = json.loads(pc.decrypt(res_datas))
                #print("解密后:%s"%en_datas)
                return en_datas
            except KeyError:
                print("不存在datas")
    
    if __name__ == '__main__':
        s = requests.session()
        Api_Jiajiemi(s).api_jiami()
        Api_Jiajiemi(s).api_jiemi()
    api_jiajiemi.py
    import requests
    import pytest
    import allure
    from common.logger import Log
    from api.api_jiajiemi import Api_Jiajiemi
    from common.read_yaml import ReadYaml
    testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
    
    @allure.feature("登录测试加密解密")
    class Test_Jiajiemi():
        log = Log()
    
        @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
                                 ids=["正常登录",
                                      "密码为空登录",
                                      "账号为空登录",
                                      "账号错误登录",
                                      "密码错误登录",
                                      "账号存在空格登录",
                                      "密码存在空格登录",
                                      "账号存在特殊符号登录",
                                      "密码存在特殊符号登录",
                                      "账号不完整登录",
                                      "密码不完整登录"])  # 参数化测试用例
    
        def test_api_jiajiemi(self,username,password,expect):
            s = requests.session()
            self.log.info("------用户登录加密接口-----")
            api = Api_Jiajiemi(s)
            msg = api.api_jiami(username, password)
            #self.log.info('获取请求结果:%s' % msg.json())
            assert msg.json()['msg'] == expect['msg']
            assert msg.json()['code'] == expect['code']
            msg1 = api.api_jiemi()
            print(msg1)
    test_jiajiemi.py

     记录完毕,生成allure报告在pytest.ini中有写,解开注释即可,pytest运行生成报告,或者pytest --alluredir ./report/allure_raw 

  • 相关阅读:
    zbb20180929 dubbo+zookeeper
    zbb20180929 Linux高可用之Keepalived
    zbb20180929 zk Zookeeper的功能以及工作原理
    zbb20180927 Union与Union All的区别
    zbb20180927 MySQL MyISAM InnoDB区别
    zbb20180921 spring事物的七种事物传播属性行为及五种隔离级别
    zbb20180921 java,md5,MD5加密+加盐
    zbb20180921 java,js,javascript 前端加密后端解密Base64通用加密处理
    zbb20180921 springboot 全局异常处理 404 500
    zbb20180919 db,mysql MySQL慢查询
  • 原文地址:https://www.cnblogs.com/MrqiuS/p/13100983.html
Copyright © 2011-2022 走看看