zoukankan      html  css  js  c++  java
  • HttpRunner3源码阅读:8. 用例文件生成并格式化make

    make

    make.py 这个文件主要实现了用例文件和目录的生成;其实这个文件应该在阅读 client.py 之前先看。

    可用资料

    jinja2[模板,常用于Python Web 前后端不分离]: https://jinja.palletsprojects.com/en/3.0.x/intro/
    black[代码格式化]: https://black.readthedocs.io/en/stable/
    from string import Template 也可实现字符串模板替换
    string包下的Template: https://docs.python.org/3/library/string.html#template-strings
    argparse[命令行选项]:https://docs.python.org/zh-cn/3/library/argparse.html

    导包

    import os
    import string
    import subprocess
    import sys
    from typing import Text, List, Tuple, Dict, Set, NoReturn
    
    # 模板库,写过flask 的应该很熟悉,其灵感来自django
    import jinja2
    from loguru import logger
    from sentry_sdk import capture_exception
    
    from httprunner import exceptions, __version__
    # v2 v3 版本兼容
    from httprunner.compat import (
        ensure_testcase_v3_api,
        ensure_testcase_v3,
        convert_variables,
        ensure_path_sep,
    )
    # 导文件处理的
    from httprunner.loader import (
        load_folder_files,
        load_test_file,
        load_testcase,
        load_testsuite,
        load_project_meta,
        convert_relative_project_root_dir,
    )
    # 验证器
    from httprunner.response import uniform_validator
    # 变量合并,多进程处理
    from httprunner.utils import merge_variables, is_support_multiprocessing
    
    

    源码附注释

    
    """ cache converted pytest files, avoid duplicate making
    pytest测试文件,缓存字典
    """
    pytest_files_made_cache_mapping: Dict[Text, Text] = {}
    
    """ save generated pytest files to run, except referenced testcase
        运行集合,排除引入的测试用例
    """
    pytest_files_run_set: Set = set()
    # Jinja2 template used to render every generated HttpRunner v3 pytest file.
    # Rendered by make_testcase with: version, testcase_path, diff_levels,
    # class_name, imports_list, config_chain_style, parameters and
    # teststeps_chain_style.
    __TEMPLATE__ = jinja2.Template(
        """# NOTE: Generated By HttpRunner v{{ version }}
    # FROM: {{ testcase_path }}
    
    {% if imports_list and diff_levels > 0 %}
    import sys
    from pathlib import Path
    sys.path.insert(0, str(Path(__file__){% for _ in range(diff_levels) %}.parent{% endfor %}))
    {% endif %}
    
    {% if parameters %}
    import pytest
    from httprunner import Parameters
    {% endif %}
    
    from httprunner import HttpRunner, Config, Step, RunRequest, RunTestCase
    {% for import_str in imports_list %}
    {{ import_str }}
    {% endfor %}
    
    class {{ class_name }}(HttpRunner):
    
        {% if parameters %}
        @pytest.mark.parametrize("param", Parameters({{parameters}}))
        def test_start(self, param):
            super().test_start(param)
        {% endif %}
    
        config = {{ config_chain_style }}
    
        teststeps = [
            {% for step_chain_style in teststeps_chain_style %}
                {{ step_chain_style }},
            {% endfor %}
        ]
    
    if __name__ == "__main__":
        {{ class_name }}().test_start()
    
    """
    )
    
    # Normalize OS-specific path prefixes/separators and return an absolute path
    def __ensure_absolute(path: Text) -> Text:
        """Resolve a testcase path to the absolute path of an existing file.

        A leading current-directory prefix (dot-slash on Linux/Darwin,
        dot-backslash on Windows) is stripped, path separators are normalized
        with ensure_path_sep, and a relative path is joined onto the project
        root directory reported by load_project_meta.

        Args:
            path: testcase file path, absolute or relative.

        Returns:
            absolute path of the testcase file.

        Note:
            Logs an error and terminates the process with exit code 1 when the
            resolved path is not an existing file.

        """
        if path.startswith("./"):
            # Linux/Darwin, hrun ./test.yml
            path = path[len("./") :]
        elif path.startswith(".\\"):
            # Windows, hrun .\test.yml
            # the backslash must be escaped, otherwise the literal never closes
            path = path[len(".\\") :]

        path = ensure_path_sep(path)
        project_meta = load_project_meta(path)

        if os.path.isabs(path):
            absolute_path = path
        else:
            absolute_path = os.path.join(project_meta.RootDir, path)

        if not os.path.isfile(absolute_path):
            logger.error(f"Invalid testcase file path: {absolute_path}")
            sys.exit(1)

        return absolute_path
    
    
    def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
        """Return a variant of the absolute file path that is usable by pytest.

        Every component of the path relative to the project root is sanitized:
        a component starting with a digit gets a "T" prefix, and spaces, dots
        and hyphens are replaced by underscores (components starting with a dot
        are left alone). The file suffix is lower-cased.

        Args:
            file_abs_path: absolute file path

        Returns:
            ensured valid absolute file path; the input itself when the path
            relative to the project root is empty.

        """
        project_meta = load_project_meta(file_abs_path)
        path_without_suffix, suffix = os.path.splitext(file_abs_path)
        suffix = suffix.lower()

        relative_name = convert_relative_project_root_dir(path_without_suffix)
        if relative_name == "":
            return file_abs_path

        # space, dot and hyphen all map to an underscore
        unsafe_to_underscore = str.maketrans(" .-", "___")

        sanitized_parts = []
        for part in relative_name.rstrip(os.sep).split(os.sep):
            if part[0] in string.digits:
                # names must not start with a digit: 19 => T19, 2C => T2C
                part = "T" + part

            if not part.startswith("."):
                # keep names such as ".csv" untouched, sanitize everything else
                part = part.translate(unsafe_to_underscore)

            sanitized_parts.append(part)

        sanitized_relative_path = os.sep.join(sanitized_parts) + suffix
        return os.path.join(project_meta.RootDir, sanitized_relative_path)
    
    # Generate __init__.py on demand so generated tests live in a Python package
    def __ensure_testcase_module(path: Text) -> None:
        """Ensure the directory containing ``path`` has an ``__init__.py``.

        Nothing is written when the file already exists; otherwise a one-line
        notice file is created.

        Args:
            path: path of a generated pytest file.

        """
        init_file = os.path.join(os.path.dirname(path), "__init__.py")
        if os.path.isfile(init_file):
            return

        with open(init_file, "w", encoding="utf-8") as f:
            # the trailing newline must be an escape sequence; a raw line break
            # inside a single-quoted literal is a syntax error
            f.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!\n")
    
    
    def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
        """Map a YAML/JSON testcase path to its pytest file path and class name.

        Returns:
            (absolute path of the ``<name>_test.py`` file, name in title case
            with underscores removed, e.g. request_with_variables =>
            RequestWithVariables)

        """
        valid_path = ensure_file_abs_path_valid(testcase_abs_path)

        directory, base_name = os.path.split(valid_path)
        stem = os.path.splitext(base_name)[0]

        pytest_file_path = os.path.join(directory, f"{stem}_test.py")
        title_case_name = stem.title().replace("_", "")

        return pytest_file_path, title_case_name
    
    
    def format_pytest_with_black(*python_paths: Text) -> None:
        """Format the given Python files with the ``black`` command line tool.

        All paths are passed to a single ``black`` invocation when
        is_support_multiprocessing() is true or there is at most one path;
        otherwise ``black`` is run once per file.

        Args:
            python_paths: paths of the generated pytest files.

        Note:
            Terminates the process with exit code 1 when the ``black``
            executable cannot be found.

        """
        logger.info("format pytest cases with black ...")
        try:
            if is_support_multiprocessing() or len(python_paths) <= 1:
                subprocess.run(["black", *python_paths])
            else:
                logger.warning(
                    "this system does not support multiprocessing well, format files one by one ..."
                )
                # subprocess.run waits for each command to finish before
                # returning its CompletedProcess
                for path in python_paths:
                    subprocess.run(["black", path])
        except subprocess.CalledProcessError as ex:
            # NOTE(review): subprocess.run only raises CalledProcessError when
            # called with check=True, so this branch is not reached as written;
            # kept so that formatting failures stay non-fatal.
            capture_exception(ex)
            logger.error(ex)
            sys.exit(1)
        except FileNotFoundError:
            err_msg = """
    missing dependency tool: black
    install black manually and try again:
    $ pip install black
    """
            logger.error(err_msg)
            sys.exit(1)
    
    
    def make_config_chain_style(config: Dict) -> Text:
        """Render a testcase config dict as a chained ``Config(...)`` expression.

        The ``name`` and ``variables`` keys are required. ``variables`` is only
        emitted when non-empty; ``base_url``, ``verify``, ``export`` and
        ``weight`` are emitted whenever the key is present.

        Args:
            config: testcase config dict.

        Returns:
            source text of the chained call, used in the pytest file template.

        """
        chain = [f'Config("{config["name"]}")']

        variables = config["variables"]
        if variables:
            chain.append(f".variables(**{variables})")

        # (config key, call text with one placeholder for the value)
        optional_calls = (
            ("base_url", '.base_url("{}")'),
            ("verify", ".verify({})"),
            ("export", ".export(*{})"),
            ("weight", ".locust_weight({})"),
        )
        for key, call_template in optional_calls:
            if key in config:
                chain.append(call_template.format(config[key]))

        return "".join(chain)
    
    
    def make_request_chain_style(request: Dict) -> Text:
        """Render a request dict as the chained calls that follow ``RunRequest``.

        The ``method`` and ``url`` keys are required. Every other supported key
        adds one chained call when present, in a fixed order. Text values of
        ``data`` and ``json`` are wrapped in double quotes.

        Args:
            request: request part of a teststep.

        Returns:
            source text such as ``.get("/path").with_params(**{...})``.

        """
        http_method = request["method"].lower()
        target_url = request["url"]
        chain = [f'.{http_method}("{target_url}")']

        # (request key, call text with one placeholder, quote text values?)
        call_specs = (
            ("params", ".with_params(**{})", False),
            ("headers", ".with_headers(**{})", False),
            ("cookies", ".with_cookies(**{})", False),
            ("data", ".with_data({})", True),
            ("json", ".with_json({})", True),
            ("timeout", ".set_timeout({})", False),
            ("verify", ".set_verify({})", False),
            ("allow_redirects", ".set_allow_redirects({})", False),
            ("upload", ".upload(**{})", False),
        )
        for key, call_template, quote_text in call_specs:
            if key not in request:
                continue

            value = request[key]
            if quote_text and isinstance(value, Text):
                value = f'"{value}"'
            chain.append(call_template.format(value))

        return "".join(chain)
    
    
    def make_teststep_chain_style(teststep: Dict) -> Text:
        """Render one teststep dict as a ``Step(...)`` expression.

        A step with a truthy ``request`` becomes ``RunRequest``; a step with a
        truthy ``testcase`` becomes ``RunTestCase``. Calls are chained in this
        order: variables, setup hooks, request/call, teardown hooks, extract,
        export, validate.

        Args:
            teststep: teststep dict with at least a ``name`` key.

        Returns:
            source text of the step, used in the pytest file template.

        Raises:
            exceptions.TestCaseFormatError: the step has neither a request nor
                a testcase, or a hook is neither text nor a one-item dict.

        """
        is_request_step = bool(teststep.get("request"))
        is_testcase_step = bool(teststep.get("testcase"))

        if is_request_step:  # brand new request
            pieces = [f'RunRequest("{teststep["name"]}")']
        elif is_testcase_step:  # call another testcase
            pieces = [f'RunTestCase("{teststep["name"]}")']
        else:
            raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

        def append_hooks(kind: Text) -> None:
            # kind is "setup" or "teardown"; shared rendering for both hook lists
            hooks_key = f"{kind}_hooks"
            if hooks_key not in teststep:
                return

            for hook in teststep[hooks_key]:
                if isinstance(hook, Text):
                    pieces.append(f'.{kind}_hook("{hook}")')
                elif isinstance(hook, Dict) and len(hook) == 1:
                    # {assigned variable name: hook content}
                    assign_var_name, hook_content = list(hook.items())[0]
                    pieces.append(f'.{kind}_hook("{hook_content}", "{assign_var_name}")')
                else:
                    raise exceptions.TestCaseFormatError(f"Invalid {kind} hook: {hook}")

        if "variables" in teststep:
            step_variables = teststep["variables"]
            pieces.append(f".with_variables(**{step_variables})")

        append_hooks("setup")

        if is_request_step:
            pieces.append(make_request_chain_style(teststep["request"]))
        elif is_testcase_step:
            referenced = teststep["testcase"]
            pieces.append(f".call({referenced})")

        append_hooks("teardown")

        if "extract" in teststep:
            # request step
            pieces.append(".extract()")
            for extract_name, extract_path in teststep["extract"].items():
                pieces.append(f""".with_jmespath('{extract_path}', '{extract_name}')""")

        if "export" in teststep:
            # reference testcase step
            exported: List[Text] = teststep["export"]
            pieces.append(f".export(*{exported})")

        if "validate" in teststep:
            pieces.append(".validate()")

            for raw_validator in teststep["validate"]:
                validator = uniform_validator(raw_validator)
                assert_method = validator["assert"]

                check = validator["check"]
                # a check containing double quotes is wrapped in single quotes,
                # e.g. body."user-agent" => 'body."user-agent"'
                check = f"'{check}'" if '"' in check else f'"{check}"'

                expect = validator["expect"]
                if isinstance(expect, Text):
                    expect = f'"{expect}"'

                message = validator["message"]
                if message:
                    pieces.append(f".assert_{assert_method}({check}, {expect}, '{message}')")
                else:
                    pieces.append(f".assert_{assert_method}({check}, {expect})")

        step_info = "".join(pieces)
        return f"Step({step_info})"
    
    # Generate the pytest file for one testcase
    def make_testcase(testcase: Dict, dir_path: Text = None) -> Text:
        """Convert a valid testcase dict to a pytest file and return its path.

        The testcase dict is modified in place: config["path"] and
        config["variables"] are rewritten, and every step that references
        another testcase gets its "testcase" value replaced by the referenced
        class name (and its "export" list extended). Referenced testcases are
        generated first through a recursive call.

        Args:
            testcase: testcase dict; config["path"] must hold the path of the
                source testcase file.
            dir_path: when given, the pytest file is written into this
                directory instead of next to the source file.

        Returns:
            absolute path of the generated pytest file. When that path is
            already in pytest_files_made_cache_mapping, nothing is regenerated.

        Raises:
            exceptions.TestCaseFormatError: the content loaded for a referenced
                testcase is not a dict.

        """
        # ensure compatibility with testcase format v2
        testcase = ensure_testcase_v3(testcase)

        # validate testcase format
        load_testcase(testcase)

        testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
        logger.info(f"start to make testcase: {testcase_abs_path}")

        testcase_python_abs_path, testcase_cls_name = convert_testcase_path(
            testcase_abs_path
        )
        if dir_path:
            # keep the generated file name, but place it under dir_path
            testcase_python_abs_path = os.path.join(
                dir_path, os.path.basename(testcase_python_abs_path)
            )

        global pytest_files_made_cache_mapping
        if testcase_python_abs_path in pytest_files_made_cache_mapping:
            # already generated, skip duplicate making
            return testcase_python_abs_path

        config = testcase["config"]
        config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
        config["variables"] = convert_variables(
            config.get("variables", {}), testcase_abs_path
        )

        # prepare reference testcase
        imports_list = []
        teststeps = testcase["teststeps"]
        for teststep in teststeps:
            if not teststep.get("testcase"):
                continue

            # make ref testcase pytest file
            ref_testcase_path = __ensure_absolute(teststep["testcase"])
            test_content = load_test_file(ref_testcase_path)

            if not isinstance(test_content, Dict):
                raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

            # api in v2 format, convert to v3 testcase
            if "request" in test_content and "name" in test_content:
                test_content = ensure_testcase_v3_api(test_content)

            test_content.setdefault("config", {})["path"] = ref_testcase_path
            ref_testcase_python_abs_path = make_testcase(test_content)

            # override testcase export
            ref_testcase_export: List = test_content["config"].get("export", [])
            if ref_testcase_export:
                step_export: List = teststep.setdefault("export", [])
                step_export.extend(ref_testcase_export)
                # de-duplicate; going through a set does not keep the order
                teststep["export"] = list(set(step_export))

            # prepare ref testcase class name
            ref_testcase_cls_name = pytest_files_made_cache_mapping[
                ref_testcase_python_abs_path
            ]
            teststep["testcase"] = ref_testcase_cls_name

            # prepare import ref testcase
            ref_testcase_python_relative_path = convert_relative_project_root_dir(
                ref_testcase_python_abs_path
            )
            ref_module_name, _ = os.path.splitext(ref_testcase_python_relative_path)
            # turn the relative file path into a dotted module path
            ref_module_name = ref_module_name.replace(os.sep, ".")
            import_expr = f"from {ref_module_name} import TestCase{ref_testcase_cls_name} as {ref_testcase_cls_name}"
            if import_expr not in imports_list:
                imports_list.append(import_expr)

        testcase_path = convert_relative_project_root_dir(testcase_abs_path)
        # current file compared to ProjectRootDir
        diff_levels = len(testcase_path.split(os.sep))

        data = {
            "version": __version__,
            "testcase_path": testcase_path,
            "diff_levels": diff_levels,
            "class_name": f"TestCase{testcase_cls_name}",
            "imports_list": imports_list,
            "config_chain_style": make_config_chain_style(config),
            "parameters": config.get("parameters"),
            "teststeps_chain_style": [
                make_teststep_chain_style(step) for step in teststeps
            ],
        }
        # fill the template variables with the prepared data
        content = __TEMPLATE__.render(data)

        # ensure new file's directory exists
        dir_path = os.path.dirname(testcase_python_abs_path)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        # write the rendered pytest file
        with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
            f.write(content)

        # record the generated file, then make sure its directory has __init__.py
        pytest_files_made_cache_mapping[testcase_python_abs_path] = testcase_cls_name
        __ensure_testcase_module(testcase_python_abs_path)

        logger.info(f"generated testcase: {testcase_python_abs_path}")

        return testcase_python_abs_path
    
    # Generate the pytest folder for a testsuite
    def make_testsuite(testsuite: Dict) -> NoReturn:
        """Convert a valid testsuite dict to a pytest folder with testcases.

        A directory named after the testsuite file is used as the target
        (demo_testsuite.yml => demo_testsuite_yml). Each referenced testcase is
        loaded, its config is overridden with the testsuite settings, and the
        result is generated into that directory and added to
        pytest_files_run_set.

        Args:
            testsuite: testsuite dict; config["path"] must hold the path of the
                testsuite file.

        """
        # validate testsuite format
        load_testsuite(testsuite)

        suite_config = testsuite["config"]
        suite_path = suite_config["path"]
        suite_variables = convert_variables(
            suite_config.get("variables", {}), suite_path
        )

        logger.info(f"start to make testsuite: {suite_path}")

        # directory named after the testsuite file, holding its testcases
        suite_path = ensure_file_abs_path_valid(suite_path)
        suite_dir, suite_suffix = os.path.splitext(suite_path)
        suite_dir = f"{suite_dir}_{suite_suffix.lstrip('.')}"

        for case_entry in testsuite["testcases"]:
            # load the referenced testcase content
            case_path = __ensure_absolute(case_entry["testcase"])
            case_dict = load_test_file(case_path)
            case_config = case_dict.setdefault("config", {})
            case_config["path"] = case_path

            # the name given in the testsuite replaces the testcase's own name
            case_config["name"] = case_entry["name"]

            # base_url: testsuite config first, then the testsuite testcase entry
            base_url = suite_config.get("base_url") or case_entry.get("base_url")
            if base_url:
                case_config["base_url"] = base_url

            if "verify" in suite_config:
                case_config["verify"] = suite_config["verify"]

            # testsuite testcase variables > testsuite config variables
            overriding_variables = merge_variables(
                convert_variables(case_entry.get("variables", {}), case_path),
                suite_variables,
            )
            # testsuite testcase variables > testcase config variables
            own_variables = convert_variables(
                case_config.get("variables", {}), case_path
            )
            case_config["variables"] = own_variables
            own_variables.update(overriding_variables)

            if "weight" in case_entry:
                case_config["weight"] = case_entry["weight"]

            generated_path = make_testcase(case_dict, suite_dir)
            pytest_files_run_set.add(generated_path)
    
    # Convert test files; generated pytest file paths are cached in a dict
    def __make(tests_path: Text) -> None:
        """ make testcase(s) with testcase/testsuite/folder absolute path
            generated pytest file path will be cached in pytest_files_made_cache_mapping

        Files ending with "_test.py" are added to pytest_files_run_set as they
        are. Files that cannot be loaded, or whose content is not a valid
        testcase/testsuite, are skipped with a warning.

        Args:
            tests_path: should be in absolute path

        Raises:
            exceptions.TestcaseNotFound: tests_path is neither a directory nor
                a file.

        """
        logger.info(f"make path: {tests_path}")
        test_files = []
        if os.path.isdir(tests_path):
            files_list = load_folder_files(tests_path)
            test_files.extend(files_list)
        elif os.path.isfile(tests_path):
            test_files.append(tests_path)
        else:
            raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")

        for test_file in test_files:
            if test_file.lower().endswith("_test.py"):
                # already a pytest file, run it directly
                pytest_files_run_set.add(test_file)
                continue

            try:
                test_content = load_test_file(test_file)
            except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
                logger.warning(
                    f"Invalid test file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

            if not isinstance(test_content, Dict):
                logger.warning(
                    f"Invalid test file: {test_file}\n"
                    f"reason: test content not in dict format."
                )
                continue

            # api in v2 format, convert to v3 testcase
            if "request" in test_content and "name" in test_content:
                test_content = ensure_testcase_v3_api(test_content)

            if "config" not in test_content:
                logger.warning(
                    f"Invalid testcase/testsuite file: {test_file}\n"
                    f"reason: missing config part."
                )
                continue
            elif not isinstance(test_content["config"], Dict):
                logger.warning(
                    f"Invalid testcase/testsuite file: {test_file}\n"
                    f"reason: config should be dict type, got {test_content['config']}"
                )
                continue

            # ensure path absolute
            test_content.setdefault("config", {})["path"] = test_file

            # testcase
            if "teststeps" in test_content:
                try:
                    testcase_pytest_path = make_testcase(test_content)
                    pytest_files_run_set.add(testcase_pytest_path)
                except exceptions.TestCaseFormatError as ex:
                    logger.warning(
                        f"Invalid testcase file: {test_file}\n{type(ex).__name__}: {ex}"
                    )
                    continue

            # testsuite
            elif "testcases" in test_content:
                try:
                    make_testsuite(test_content)
                except exceptions.TestSuiteFormatError as ex:
                    logger.warning(
                        f"Invalid testsuite file: {test_file}\n{type(ex).__name__}: {ex}"
                    )
                    continue

            # invalid format
            else:
                logger.warning(
                    f"Invalid test file: {test_file}\n"
                    f"reason: file content is neither testcase nor testsuite"
                )
    
    
    def main_make(tests_paths: List[Text]) -> List[Text]:
        """Generate pytest files for all given paths and return the files to run.

        Each path has its separators normalized and, when relative, is resolved
        against the current working directory before being converted. All
        generated files are then formatted with black.

        Args:
            tests_paths: testcase/testsuite file paths or folder paths.

        Returns:
            list of pytest file paths to run; an empty list when no path is
            given.

        Note:
            Logs the error and terminates the process with exit code 1 when
            making a path raises exceptions.MyBaseError.

        """
        if not tests_paths:
            return []

        for raw_path in tests_paths:
            normalized_path = ensure_path_sep(raw_path)
            if not os.path.isabs(normalized_path):
                # relative path: resolve against the current working directory
                normalized_path = os.path.join(os.getcwd(), normalized_path)

            try:
                __make(normalized_path)
            except exceptions.MyBaseError as ex:
                logger.error(ex)
                sys.exit(1)

        # format every pytest file generated so far
        generated_files = pytest_files_made_cache_mapping.keys()
        format_pytest_with_black(*generated_files)

        return list(pytest_files_run_set)
    
    
    def init_make_parser(subparsers):
        """Register the ``make`` sub-command and return its parser.

        Args:
            subparsers: the action object returned by
                ArgumentParser.add_subparsers().

        Returns:
            the parser of the ``make`` sub-command, which accepts zero or more
            ``testcase_path`` positional arguments.

        """
        make_parser = subparsers.add_parser(
            "make",
            help="Convert YAML/JSON testcases to pytest cases.",
        )
        make_parser.add_argument(
            "testcase_path",
            nargs="*",
            help="Specify YAML/JSON testcase file/folder path",
        )
        return make_parser
    
    作者:zy7y
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    linux下gdb常用的调试命令 .
    Programming lessons I learned
    lvalue和rvalue、传值和传引用、木桶
    gnuplot的简明教程——英文版,很不错
    100 的阶乘末尾有多少个0?
    lvalue和rvalue、传值和传引用、木桶
    gnuplot的简明教程——英文版,很不错
    100 的阶乘末尾有多少个0?
    poj1728
    poj1809
  • 原文地址:https://www.cnblogs.com/zy7y/p/15118707.html
Copyright © 2011-2022 走看看