这周末木得事,就整理下自己学习的pytest+allure代码来分享啦。(哈哈直接上代码)
目录
api#存放每个接口的测试方法 case#存放每个接口的测试用例 common#存放一些公共方法 data#存放测试数据 logs#存放测试日志 report#存放测试报告 conftest.py#存放测试用例的一些fixture配置 pytest.ini#pytest的主配置文件 getpathinfo# 获取当前路径 requirements.txt #存放依赖包
学习框架
pytest单元测试框架+allure生成测试报告
结构设计
1.每一个用例组合在一个测试类里面生成一个py文件
2.将每个用例调用方法封装在一个测试类里面生成一个py文件
3.将测试数据存放在yml文件中通过parametrize进行参数化
4.通过allure生成测试报告
学习内容
1.pytes单元测试框架
2.allure生成测试报告
3.yml文件存放测试数据通过parametrize进行参数化
4.sign生成签名传参
5.加密传参,解密校验
练习代码
getpathinfo.py #获取当前路径
import os def get_path(): # 获取当前路径 curpath = os.path.dirname(os.path.realpath(__file__)) return curpath if __name__ == '__main__':# 执行该文件,测试下是否OK print('测试路径是否OK,路径为:', get_path())
conftest.py #存放测试用例的一些fixture配置
import os import pytest import requests from api.get_token import Get_Token @pytest.fixture(scope="session") def gettokenfixture(): '''先登录''' s = requests.session() shili = Get_Token(s) shili.get_token() if not s.headers.get("Authorization", ""):#没有get到token,跳出用例 pytest.skip("跳过用用例") yield s s.close() def pytest_addoption(parser): parser.addoption( "--cmdhost", action="store", default="http://*********", help="my option: type1 or type2" ) @pytest.fixture(scope="session",autouse=True) def host(request): '''获取命令行参数''' #获取命令行参数给到环境变量 #通过pytest --cmdhost 运行指定环境 os.environ["host"] = request.config.getoption("--cmdhost") print("当前用例运行测试环境:%s" % os.environ["host"])
pytest.ini #存放pytest配置文件
[pytest] markers = login: Run mark login case addopts = -s -p no:warnings testpaths = ./case python_files = test_*.py python_classes = Test* python_functions = test_* #addopts = -v --reruns 1 --html=./report/report.html --self-contained-html #addopts = -v --reruns 1 --alluredir ./report/allure_raw #addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
requirements.txt #存放导入依赖包(我这包有点多,流汗。。pip install -r requirements.txt安装依赖包)
absl-py==0.9.0 adbutils==0.3.4 allure-pytest==2.8.6 allure-python-commons==2.8.6 amqp==1.4.9 ansi2html==1.4.2 anyjson==0.3.3 apipkg==1.5 appdirs==1.4.3 Appium-Python-Client==0.26 asn1crypto==0.24.0 astor==0.8.1 atomicwrites==1.1.5 attrs==18.1.0 Babel==2.7.0 backports.csv==1.0.7 bcrypt==3.1.7 BeautifulReport==0.1.2 beautifulsoup4==4.6.1 billiard==3.3.0.23 bs4==0.0.1 cachetools==4.1.0 celery==3.1.26.post2 certifi==2018.4.16 cffi==1.11.5 cfgv==3.1.0 chardet==3.0.4 click==6.7 cma==2.7.0 colorama==0.3.9 colorlog==4.0.2 configparser==5.0.0 cryptography==2.6.1 cryptokit==0.0.7 cssselect==1.0.3 cycler==0.10.0 ddt==1.1.2 decorator==4.4.0 defusedxml==0.5.0 demjson==2.2.4 deprecation==2.0.6 diff-match-patch==20181111 distlib==0.3.0 Django==2.0.3 django-celery==3.2.2 django-cors-headers==3.1.1 django-crispy-forms==1.8.0 django-filter==2.2.0 django-formtools==2.1 django-import-export==1.2.0 django-ranged-response==0.2.0 django-reversion==2.0.0 django-simple-captcha==0.5.10 django-stdimage==4.0.1 djangorestframework==3.10.3 docopt==0.6.2 dwebsocket==0.4.2 enum34==1.1.6 et-xmlfile==1.0.1 eventlet==0.22.1 execnet==1.5.0 facebook-wda==0.3.4 fake-useragent==0.1.11 filelock==3.0.12 fire==0.1.3 flake8==3.8.1 Flask==1.0.2 flower==0.9.2 funcsigs==1.0.2 future==0.15.2 futures==3.1.1 gast==0.3.3 gevent==1.3.6 google-auth==1.15.0 google-auth-oauthlib==0.4.1 graphviz==0.14 greenlet==0.4.15 grpcio==1.29.0 har2case==0.3.1 httpie==1.0.3 httplib2==0.9.2 HttpRunner==1.5.8 humanize==0.5.1 identify==1.4.15 idna==2.6 importlib-metadata==1.2.0 itsdangerous==0.24 jdcal==1.4 Jinja2==2.10 joblib==0.15.1 kiwisolver==1.2.0 kombu==3.0.37 locustio==0.11.0 logzero==1.5.0 lxml==4.2.5 Markdown==3.1.1 MarkupSafe==1.0 matplotlib==3.2.1 mccabe==0.6.1 more-itertools==4.2.0 msgpack==0.5.6 namedlist==1.7 nltk==3.5 nodeenv==1.3.5 numpy==1.18.4 oauthlib==3.1.0 objgraph==3.4.1 odfpy==1.4.0 opencv-python==4.2.0.34 openpyxl==2.5.9 packaging==19.0 paddlehub==1.6.0 paddlepaddle==1.8.1 pandas==1.0.3 parameterized==0.7.1 paramiko==2.4.1 ParamUnittest==0.2 parse==1.11.1 pathlib==1.0.1 Pillow==5.2.0 pluggy==0.13.1 pre-commit==2.4.0 prettytable==0.7.2 progress==1.5 progressbar2==3.39.3 protobuf==3.12.0 py==1.5.4 pyasn1==0.4.7 pyasn1-modules==0.2.8 pycodestyle==2.6.0 pycparser==2.18 pycryptodome==3.8.2 pyee==5.0.0 pyflakes==2.2.0 Pygments==2.4.2 PyMySQL==0.9.3 PyNaCl==1.3.0 pyOpenSSL==17.5.0 pyparsing==2.4.0 pyppeteer==0.0.25 pyquery==1.4.0 pytest==4.5.0 pytest-forked==0.2 pytest-html==1.19.0 pytest-metadata==1.7.0 pytest-repeat==0.7.0 pytest-rerunfailures==8.0 pytest-xdist==1.23.2 PyTestReport==0.2.1 python-dateutil==2.8.1 python-utils==2.3.0 pytz==2018.5 pywin32==223 PyYAML==3.12 pyzmq==17.1.2 rarfile==3.1 regex==2020.5.14 requests==2.22.0 requests-html==0.10.0 requests-oauthlib==1.3.0 requests-toolbelt==0.8.0 retry==0.9.2 rsa==4.0 scipy==1.3.1 selenium==2.53.6 sentencepiece==0.1.90 six==1.10.0 tablib==0.13.0 tb-paddle==0.4.0 tensorboard==2.2.1 tensorboard-plugin-wit==1.6.0.post3 toml==0.10.1 tomorrow==0.2.4 tornado==6.0.3 tqdm==4.31.1 uiautomator2==0.3.3 urllib3==1.22 virtualenv==20.0.20 w3lib==1.20.0 wcwidth==0.1.7 websocket-client==0.57.0 websockets==7.0 weditor==0.2.3 Werkzeug==1.0.1 whichcraft==0.6.0 xlrd==1.1.0 xlwt==1.3.0 yapf==0.26.0 zipp==3.1.0
common/connect_mysql.py #连接操作数据库
import pymysql dbinfo = { "host":"******", "user":"root", "password":"123456", "port":3309 } class DbConnect(): def __init__(self,db_conf,database=""): self.db_conf = db_conf #打开数据库 self.db = pymysql.connect(database = database, cursorclass = pymysql.cursors.DictCursor, **db_conf) #使用cursor()方式获取操作游标 self.cursor = self.db.cursor() def select(self,sql): #sql查询 self.cursor.execute(sql)#执行sql results = self.cursor.fetchall() return results def execute(self,sql): #sql 删除 提示 修改 try: self.cursor.execute(sql)#执行sql self.db.commit()#提交修改 except: #发生错误时回滚 self.db.rollback() def close(self): self.db.close()#关闭连接 def select_sql(select_sql): '''查询数据库''' db = DbConnect(dbinfo,database='apps') result = db.select(select_sql) db.close() return result def execute_sql(sql): '''执行SQL''' db = DbConnect(dbinfo,database='apps') db.execute(sql) db.close() if __name__ == '__main__': sql = 'DELETE from auth_user WHERE username = "qiushui"' delete = execute_sql(sql) print(delete)
common/read_yaml.py #读取yml文件数据
import os import yaml import getpathinfo class ReadYaml(): def __init__(self,filename): path = getpathinfo.get_path()#获取本地路径 self.filepath = os.path.join(path,'data')+"/"+filename#拼接定位到data文件夹 def get_yaml_data(self): with open(self.filepath, "r", encoding="utf-8")as f: # 调用load方法加载文件流 return yaml.load(f) if __name__ == '__main__': data = ReadYaml("regiter_one.yml").get_yaml_data() print(data)
cmmon/logger.py # 生成日志输出
import logging,time import os import getpathinfo path = getpathinfo.get_path()#获取本地路径 log_path = os.path.join(path,'logs')# log_path是存放日志的路径 # 如果不存在这个logs文件夹,就自动创建一个 if not os.path.exists(log_path):os.mkdir(log_path) class Log(): def __init__(self): #文件的命名 self.logname = os.path.join(log_path,'%s.log'%time.strftime('%Y_%m_%d')) self.logger = logging.getLogger() self.logger.setLevel(logging.DEBUG) #日志输出格式 self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') def __console(self,level,message): #创建一个fileHander,用于写入本地 fh = logging.FileHandler(self.logname,'a',encoding='utf-8') fh.setLevel(logging.DEBUG) fh.setFormatter(self.formatter) self.logger.addHandler(fh) #创建一个StreamHandler,用于输入到控制台 ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(self.formatter) self.logger.addHandler(ch) if level == 'info': self.logger.info(message) elif level == 'debug': self.logger.debug(message) elif level == 'warning': self.logger.warning(message) elif level == 'error': self.logger.error(message) #避免日志重复 self.logger.removeHandler(fh) self.logger.removeHandler(ch) #关闭打开文件 fh.close() def debug(self,message): self.__console('debug',message) def info(self,message): self.__console('info',message) def warning(self,message): self.__console('warning',message) def error(self,message): self.__console('error',message) if __name__ == '__main__': log = Log() log.info('测试') log.debug('测试') log.warning('测试') log.error('测试')
common/sign.py #生成sign签名
import hashlib def sign_body(body, apikey="12345678"): '''请求body sign签名''' # 列表生成式,生成key = value格式 a = ["".join(i) for i in body.items() if i[1] and i[0] != "sign"] # 参数名ASCII码从小到大排序 strA = "".join(sorted(a)) # 在strA后面拼接上apiKey得到strsigntemp字符串 strsigntemp = strA + apikey # 将strsigntemp字符转换为小写字符串进行MD5运算 # MD5加密 def jiamimd5(src): m = hashlib.md5() m.update(src.encode('utf-8')) return m.hexdigest() sign = jiamimd5(strsigntemp.lower()) return sign
cmmon/jiajiemi.py #加密、解密方法
import json from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import algorithms from Crypto.Cipher import AES from binascii import b2a_hex, a2b_hex class PrpCrypt(object): def __init__(self, key='0000000000000000'): self.key = key.encode('utf-8') self.mode = AES.MODE_CBC self.iv = b'0102030405060708' # block_size 128位 # 加密函数,如果text不足16位就用空格补足为16位, # 如果大于16但是不是16的倍数,那就补足为16的倍数。 def encrypt(self, text): cryptor = AES.new(self.key, self.mode, self.iv) text = text.encode('utf-8') # 这里密钥key 长度必须为16(AES-128),24(AES-192),或者32 (AES-256)Bytes 长度 # 目前AES-128 足够目前使用 text=self.pkcs7_padding(text) self.ciphertext = cryptor.encrypt(text) # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题 # 所以这里统一把加密后的字符串转化为16进制字符串 return b2a_hex(self.ciphertext).decode().upper() @staticmethod def pkcs7_padding(data): if not isinstance(data, bytes): data = data.encode() padder = padding.PKCS7(algorithms.AES.block_size).padder() padded_data = padder.update(data) + padder.finalize() return padded_data @staticmethod def pkcs7_unpadding(padded_data): unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() data = unpadder.update(padded_data) try: uppadded_data = data + unpadder.finalize() except ValueError: raise Exception('无效的加密信息!') else: return uppadded_data # 解密后,去掉补足的空格用strip() 去掉 def decrypt(self, text): # 偏移量'iv' cryptor = AES.new(self.key, self.mode, self.iv) plain_text = cryptor.decrypt(a2b_hex(text)) # return plain_text.rstrip(' ') return bytes.decode(plain_text).rstrip("x01"). rstrip("x02").rstrip("x03").rstrip("x04").rstrip("x05"). rstrip("x06").rstrip("x07").rstrip("x08").rstrip("x09"). rstrip("x0a").rstrip("x0b").rstrip("x0c").rstrip("x0d"). rstrip("x0e").rstrip("x0f").rstrip("x10") def dict_json(self, d): '''python字典转json字符串, 去掉一些空格''' j = json.dumps(d).replace('": ', '":').replace(', "', ',"').replace(", {", ",{") return j if __name__ == '__main__': pc = PrpCrypt('12345678 ') a = "1" print("加密前%s"%a) b = pc.encrypt(a) print("加密后%s"%b.lower())
data/regiter_data.yml #存放test_regiter.py测试数据(都一样,在此只展示一个了)
test_regiter_data: - ["qiushui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["秋水","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiu秋","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiushui_@","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiushui123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiushui@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["秋qiu@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiu shui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] - ["qiu shuiq神神道道所多多","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}] test_regiter_repeat: - ["秋水君","123456","hanxi@163.com", {'msg': '秋水君用户已被注册', 'code': 0}]
api/login_method.py #测试方法case/login/test_login.py#测试用例(方式:post 类型:application/json)
''' Code description:登录方法 Create time: Developer: ''' import requests class Login(object): def __init__(self,s:requests.session): self.s = s def login(self,username="test",password="123456"): url = "http://*********" boby = { "username":username, "password":password } return self.s.post(url,json = boby)
import allure import requests import pytest from api.login_method import Login from common.logger import Log from common.read_yaml import ReadYaml testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据 @allure.feature('登录测试用例接口')#测试报告显示测试功能 class Test_login(): '''测试登录接口''' log = Log() @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"], ids = ["正常登录", "密码为空登录", "账号为空登录", "账号错误登录", "密码错误登录", "账号存在空格登录", "密码存在空格登录", "账号存在特殊符号登录", "密码存在特殊符号登录", "账号不完整登录", "密码不完整登录"])#参数化测试用例 @allure.step('账号,密码登录')#测试报告显示步骤 @allure.link('http://**********:6009/api/v1/login',name='测试接口')#测试报告显示链接 def test_login(self,username,password,expect): s = requests.session()#定义session会话 self.log.info('------用户登录接口-----') shili = Login(s)#实例化 msg = shili.login(username,password) self.log.info('获取请求结果:%s'%msg.json()) #print(msg.json()) #断言 assert msg.json()["msg"] == expect['msg'] assert msg.json()["code"] == expect['code']
api/login_xadmin.py#测试方法case/login/test_login_xadmin.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参)
''' Code description:登录方法 Create time: Developer: ''' import os import requests import re class Login_Admin(object): def __init__(self,s:requests.session): self.s = s def login(self,username="admin",password="123456",this_is_the_login_form = "1", next = "/xadmin/"): '''xadmin登录''' url = os.environ["host"]+"/xadmin/"#读取conftest.py文件地址进行拼接 r1 = self.s.get(url) tokens = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text) #print(tokens[0])#获取隐藏参数csrfmiddlewaretoken body = { "csrfmiddlewaretoken": tokens[0], "username": username, "password": password, "this_is_the_login_form": this_is_the_login_form, "next": next } r2 = self.s.post(url,data = body) return r2 #print(r2.text) #assert "主页面 | 后台页面" in r2.text if __name__ == '__main__': s = requests.session() Login_Admin(s).login()
import allure import requests import pytest from api.login_xadmin import Login_Admin from common.read_yaml import ReadYaml from common.logger import Log testdata = ReadYaml("login_xadmin.yml").get_yaml_data()#读取数据 @allure.feature('登录接口')#测试报告显示测试功能 class Test_Login_Xadmin(): '''xadmin登录''' log = Log() @pytest.mark.parametrize("username,password,this_is_the_login_form,next,expect", testdata["test_login_data"], ids=["正常登录", "用户名为空登录", "密码为空登录", "错误用户名登录", "错误密码登录", "错误参数this_is_the_login_form登录", "错误参数next登录", "参数next为空登录" ])#参数化测试用例 @allure.step("登录界面输入账号,密码登录")#测试报告显示测试步骤 @allure.link('http://******/xadmin/',name='测试接口')#测试报告链接 def test_login_xadmin(self,username,password, this_is_the_login_form,next,expect): s = requests.session() self.log.info('------用户登录接口-----') shili = Login_Admin(s) # 实例化 msg = shili.login(username, password,this_is_the_login_form,next) #print(msg.text) self.log.info('获取请求结果:%s' % msg.text) assert expect in msg.text#断言验证是否通过
api/get_token.py #获取登录token,方便调用
''' Code description:获取token testcase Create time: Developer: ''' import requests class Get_Token(object): def __init__(self,s:requests.session): self.s = s def get_token(self): url = "http://******/api/v1/login" boby = { "username":"hanxi", "password":"123456" } r = self.s.post(url,json = boby) #print(r.json()) #获取token token = r.json()["token"] header = { "Authorization": "Token %s" % token } self.s.headers.update(header)#更新token到session return token if __name__ == '__main__': s = requests.session() a = Get_Token(s) a.get_token()
api/update_info_method.py#测试方法case/updateinfo/test_updateinfo.py#测试用例(方式:post类型:application/json 学习fixture前置登录)
import requests from api.get_token import Get_Token class Update_Info(): '''更新用户信息''' def __init__(self,s:requests.sessions): self.s = s def update_info(self,name = "hanxi",mail = "222@163.com",sex = "M",age = 23): url = "http://********/api/v1/userinfo" body = { "name":name, "mail":mail, "sex":sex, "age":age } return self.s.post(url,json = body) #print(r.json()) #return r.json() if __name__ == '__main__': s = requests.session() Get_Token(s).get_token() a = Update_Info(s) infos = a.update_info(name="hanxi", mail="xxx@qq.com") print(infos.text)
import allure import pytest from common.logger import Log from common.read_yaml import ReadYaml from api.update_info_method import Update_Info testdata = ReadYaml('update_info.yml').get_yaml_data()#读取数据 @allure.feature('更新用户信息接口')#测试报告显示测试功能 class Test_UpdateInfo(): log = Log() '''更新用户信息''' @pytest.mark.parametrize("name,mail,sex,age,expect",testdata["test_update_data"], ids=["正常修改", "修改其他用户名", "错误xxx163.com邮箱修改", "错误邮箱xxx@163com修改", "错误xxx@163.邮箱修改", "错误邮箱@163.com修改", "错误hanxi@.com邮箱修改", "错误han xi@163.com邮箱修改", "错误邮箱修改hanxi@1 63.com", "性别为空修改", "性别F修改", "性别错误参数X修改", "性别输出汉字男修改", "年龄为空修改", "年龄存在空格修改2 3", "年龄存在特殊字符修改23_", "年龄汉字修改", "年龄数字加字母修改" ])#参数化测试用例 @allure.step('获取登录token,可修改用户名、邮箱、性别、年龄')#测试报告显示操作步骤 @allure.link('http://*********/api/v1/userinfo',name='测试接口')#测试报告显示链接 def test_updateinfo(self,gettokenfixture,name,mail,sex,age,expect): s = gettokenfixture#登录获取token self.log.info('------修改用户信息接口-----') a = Update_Info(s)#实例化 msg = a.update_info(name,mail,sex,age) #print(msg.json()) self.log.info('获取请求结果:%s' % msg.json()) #断言 assert msg.json()["message"] == expect['message'] assert msg.json()["code"] == expect['code']
api/regiter_method.py #测试方法 case/regiter/test/regiter.py#测试用例(方式:post 类型:application/json 学习fixture前置连接数据库操作)
import requests class Regiter(): def __init__(self,s:requests.session()): self.s = s '''注册''' def regiter_user(self,username,password,mail): url = "http://*******:6009/api/v1/register" body ={ "username": username, "password": password, "mail": mail } return self.s.post(url,json = body)
import allure import requests import pytest from common.logger import Log from api.regiter_method import Regiter from common.connect_mysql import execute_sql from common.read_yaml import ReadYaml testdata = ReadYaml("regiter_data.yml").get_yaml_data()#读取测试数据 @pytest.fixture(scope="function")#设置前置清除操作 def delete_data(): '''执行sql,删除之前注册信息''' sql = "DELETE from apps.auth_user WHERE username like 'qiu'" execute_sql(sql) yield @allure.feature('注册用户接口')#测试报告显示测试功能 class TestRegiter(): '''注册''' log = Log() @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤 @allure.link('http://*******/api/v1/register',name='测试接口')#测试报告显示测试链接 @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_data"], ids =["正常注册", "用户名汉字注册", "用户名汉字加英文注册", "用户名英文加特殊符号注册", "用户名英文加数字注册", "用户名英文特殊符号加数字注册", "用户名汉字英文字符数字注册", "用户名存在空格注册", "用户名字符特长注册"])#参数化测试用例 def test_regirer(self,delete_data,username,password,mail,expect): '''注册''' s = requests.session()#定义session self.log.info('------用户注册接口-----') shili = Regiter(s) msg = shili.regiter_user(username,password,mail) self.log.info('获取请求结果:%s' % msg.json()) #print(msg.json()['msg']) assert msg.json()['msg'] == expect['msg'] @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤 @allure.link('http://********/api/v1/register', name='测试接口')#测试报告显示测试链接 @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_repeat"], ids=['重复注册']) def test_repeat_regirer(self,username,password,mail,expect): '''重复注册''' s = requests.session()#定义session self.log.info('------用户注册接口-----') shili = Regiter(s) msg = shili.regiter_user(username,password,mail) #print(msg.json()) self.log.info('获取请求结果:%s' % msg.json()) assert msg.json()['msg'] == expect['msg']#断言响应信息
api/add_teacher.py#测试方法 case/test_add_teacher.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参,fixture前置连接数据库操作,获取页面元素断言,数据库断言)
''' Code description:显示老师 Create time: Developer: ''' import os import requests import re from requests_toolbelt import MultipartEncoder from api.login_xadmin import Login_Admin class Add_Teacher(): def __init__(self,s:requests.session): self.s = s def add_teacher(self,teacher_name="test",tel="122222222",mail="1111@qq.com",sex = "M"): '''添加老师''' url = os.environ["host"]+"/xadmin/hello/teacherman/add/"#显示老师页面 r1 = self.s.get(url) #print(r.text)#正则获取隐藏元素 csrfmiddlewaretoken = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text) #print(csrfmiddlewaretoken) body = MultipartEncoder(fields=[ ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]), ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]), ("teacher_name", teacher_name), ("tel", tel), ("mail", mail), ("sex", sex), ("_save", "") ]) r2 = self.s.post(url,data = body,headers={"content-Type": body.content_type}) return r2 #print(r2.text) if __name__ == '__main__': s = requests.session() Login_Admin(s).login() Add_Teacher(s).add_teacher()
import allure import requests import pytest from lxml import etree from common.logger import Log from api.add_teacher import Add_Teacher from api.login_xadmin import Login_Admin from common.read_yaml import ReadYaml from common.connect_mysql import execute_sql,select_sql testdata = ReadYaml("add_teacher_info.yml").get_yaml_data() @pytest.fixture(scope="function")#设置前置清除操作 def delete_newteacher(): sql = "DELETE FROM djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'" execute_sql(sql) yield #print("数据清理操作") @allure.feature('显示教师')#测试报告显示测试功能 class Test_Add_Teacher(): '''添加教师''' log = Log() @pytest.mark.parametrize("teacher_name,tel,mail,sex",testdata["test_addteacher_data"], ids=["正常显示"]) @allure.title("显示new teacher")#测试报告显示标题 @allure.step('先登录,输入显示教师信息,进行显示教师')#测试报告显示步骤 @allure.link('http://********/xadmin/hello/teacherman/add/',name='测试接口')#测试报告显示链接 def test_add_teacher(self,delete_newteacher,teacher_name,tel,mail,sex): s = requests.session() Login_Admin(s).login()#登录 self.log.info('------显示教师接口-----') shili = Add_Teacher(s)#实例化添加教师 msg = shili.add_teacher(teacher_name,tel,mail,sex) demo = etree.HTML(msg.text) nodes = demo.xpath('//*[@id="changelist-form"]/div[1]/table/tbody/tr[1]/td[2]/a') #print(nodes[0]) get_result = nodes[0].text#获取元素属性 #print(get_result) self.log.info('获取请求结果:%s' % get_result) assert get_result == teacher_name #页面存在该字段验证通过 sql = "SELECT count(*) as sum from djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'" result = select_sql(sql)[0]["sum"] assert result == 1 #查找数据库数量为1验证通过
api/upload_picture_file.py#测试方法 case/upload_picturefile/test_upload.py#测试用例(方式:post 类型:multipart/form-data 学习隐藏参数正则提取传参,上传文件,图片,fixture前置连接数据库操作,获取页面元素断言,数据库断言)
import os import re import requests from requests_toolbelt import MultipartEncoder import getpathinfo from api.login_xadmin import Login_Admin class Upload_picture_file(): def __init__(self,s:requests.session()): self.s = s path = getpathinfo.get_path() # 获取本地路径 self.filepath = os.path.join(path, 'data') + "/" # 拼接定位到上传图片,文件 def upload_picture_file(self): '''上传图片文件''' url = os.environ["host"]+"/xadmin/hello/fileimage/add/" r1 = self.s.get(url) #print(r1.text) csrfmiddlewaretoken = re.findall(" name='csrfmiddlewaretoken' value='(.+?)'",r1.text) #print(csrfmiddlewaretoken)#获取隐藏元素 body = MultipartEncoder(fields=[ ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]), ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]), ("title", "上传图片文件测试"), ("image", ("2.png", open(self.filepath+"2.png", "rb"), "image/png")), ("fiels", ("测试.txt", open(self.filepath+"测试.txt", "rb"),"text/plain")), ("_save", "") ]) r2 = self.s.post(url,data = body,headers = {"content-Type":body.content_type}) #print(r2.text) return r2 if __name__ == '__main__': s = requests.session() Login_Admin(s).login() Upload_picture_file(s).upload_picture_file()
import allure import requests from lxml import etree from api.upload_picture_file import Upload_picture_file import pytest from api.login_xadmin import Login_Admin from common.connect_mysql import execute_sql,select_sql from common.logger import Log @pytest.fixture(scope="function")#设置前置删除操作 def delete_file(): sql = "DELETE FROM djangoweb.hello_fileimage WHERE title = '上传图片文件测试'" execute_sql(sql) yield # print("数据清理操作") @allure.feature('上传文件图片接口操作')#测试报告显示测试功能 class Test_Upload_Picture_file(): log = Log() @allure.title('上传文件图片')#测试报告显示测试标题 @allure.step('先登录,再上传')#测试报告显示测试步骤 @allure.link('http://*********/xadmin/hello/fileimage/add/',name='测试接口')#测试报告显示测试链接 def test_upload_picture_file(self,delete_file): s = requests.session() Login_Admin(s).login()#登录 self.log.info('------文件图片上传接口-----') shili = Upload_picture_file(s)#实例化上传 msg = shili.upload_picture_file() #print(msg.text) demo = etree.HTML(msg.text) nodes = demo.xpath('//*[@id="changelist-form"]/div[1]/table/tbody/tr[1]/td[2]/a') get_result = nodes[0].text # 获取元素属性 #print(get_result) self.log.info('获取响应数据:%s' %get_result) assert get_result == '上传图片文件测试'#页面存在该字段验证通过 sql = "SELECT count(*) as sum from djangoweb.hello_fileimage WHERE title = '上传图片文件测试'" result = select_sql(sql)[0]["sum"] assert result == 1#查找数据库数量为1验证通过
api/api_sign.py#测试方法 case/api_sign/test_sign.py#测试用例(方式:post 类型:application/json 学习sign签名传参)
import requests from common.sign import sign_body class Api_Sign(object): def __init__(self,s:requests.session): self.s = s def api_sign(self,username = "test",password = "123456"): url = "http://*******/api/v3/login" body = { "username":username, "password":password } sign = sign_body(body,apikey="12345678") body["sign"] = sign r = self.s.post(url,json = body) return r if __name__ == '__main__': s = requests.session() Api_Sign(s).api_sign()
import requests import allure import pytest from common.logger import Log from api.api_sign import Api_Sign from common.read_yaml import ReadYaml testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据 @allure.feature('登录测试sign') class Test_Api_Sign(): '''测试sign登录''' log = Log() @pytest.mark.parametrize("username,password,expect", testdata["test_login_data"], ids=["正常登录", "密码为空登录", "账号为空登录", "账号错误登录", "密码错误登录", "账号存在空格登录", "密码存在空格登录", "账号存在特殊符号登录", "密码存在特殊符号登录", "账号不完整登录", "密码不完整登录"]) # 参数化测试用例 @allure.step('sign签名登录') def test_api_sign(self,username,password,expect): s = requests.session() self.log.info("------用户登录sign接口-----") api = Api_Sign(s)#s实例化 msg = api.api_sign(username,password) self.log.info('获取请求结果:%s' % msg.json()) assert msg.json()["msg"] == expect['msg'] assert msg.json()["code"] == expect['code'] assert msg.json()["username"] == username
api/api_jiajiemi.py#测试方法 case/jiajiemi/test_jiajiemi.py#测试用例(方式:post 类型:application/json 学习加密传参,解密返回信息)
import json import requests from common.jiajiemi import PrpCrypt class Api_Jiajiemi(object): def __init__(self,s:requests.session): self.s = s def api_jiami(self,username = "test",password = "123456"): global r url = "http://********/api/v2/login" body = { "params" :{ "username":username, "password":password }} params = body.get("params") pc = PrpCrypt(key = '12345678 ') #加密 #print('加密前%s'%params) param_en = pc.encrypt(json.dumps(params)) #print("加密后%s"%param_en) body["params"] = param_en r = self.s.post(url, json=body) return r def api_jiemi(self): try: pc = PrpCrypt(key='12345678 ') res_datas = r.json()["datas"] en_datas = json.loads(pc.decrypt(res_datas)) #print("解密后:%s"%en_datas) return en_datas except KeyError: print("不存在datas") if __name__ == '__main__': s = requests.session() Api_Jiajiemi(s).api_jiami() Api_Jiajiemi(s).api_jiemi()
import requests import pytest import allure from common.logger import Log from api.api_jiajiemi import Api_Jiajiemi from common.read_yaml import ReadYaml testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据 @allure.feature("登录测试加密解密") class Test_Jiajiemi(): log = Log() @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"], ids=["正常登录", "密码为空登录", "账号为空登录", "账号错误登录", "密码错误登录", "账号存在空格登录", "密码存在空格登录", "账号存在特殊符号登录", "密码存在特殊符号登录", "账号不完整登录", "密码不完整登录"]) # 参数化测试用例 def test_api_jiajiemi(self,username,password,expect): s = requests.session() self.log.info("------用户登录加密接口-----") api = Api_Jiajiemi(s) msg = api.api_jiami(username, password) #self.log.info('获取请求结果:%s' % msg.json()) assert msg.json()['msg'] == expect['msg'] assert msg.json()['code'] == expect['code'] msg1 = api.api_jiemi() print(msg1)
记录完毕,生成allure报告在pytest.ini中有写,解开注释即可,pytest运行生成报告,或者pytest --alluredir ./report/allure_raw