zoukankan      html  css  js  c++  java
  • HttpRunner3源码阅读:8. 用例文件生成并格式化make

    make

    make.py 这个文件主要实现了测试用例文件和目录的生成。按阅读顺序来说,这个文件其实应该在 client.py 之前看。

    可用资料

    jinja2[模板,常用于Python Web 前后端不分离]: https://jinja.palletsprojects.com/en/3.0.x/intro/
    black[代码格式化]: https://black.readthedocs.io/en/stable/
    from string import Template 也可实现字符串模板替换
    string包下的Template: https://docs.python.org/3/library/string.html#template-strings
    argparse[命令行选项]:https://docs.python.org/zh-cn/3/library/argparse.html

    导包

    import os
    import string
    import subprocess
    import sys
    from typing import Text, List, Tuple, Dict, Set, NoReturn
    
    # 模板库,写过flask 的应该很熟悉,其灵感来自django
    import jinja2
    from loguru import logger
    from sentry_sdk import capture_exception
    
    from httprunner import exceptions, __version__
    # v2 v3 版本兼容
    from httprunner.compat import (
        ensure_testcase_v3_api,
        ensure_testcase_v3,
        convert_variables,
        ensure_path_sep,
    )
    # 导文件处理的
    from httprunner.loader import (
        load_folder_files,
        load_test_file,
        load_testcase,
        load_testsuite,
        load_project_meta,
        convert_relative_project_root_dir,
    )
    # 验证器
    from httprunner.response import uniform_validator
    # 变量合并,多进程处理
    from httprunner.utils import merge_variables, is_support_multiprocessing
    
    

    源码附注释

    
    """ cache converted pytest files, avoid duplicate making
    pytest测试文件,缓存字典
    """
    # key: generated pytest file path, value: testcase class name
    pytest_files_made_cache_mapping: Dict[Text, Text] = dict()

    # Generated pytest files that should be run; testcases that are only
    # referenced by other testcases are not collected here.
    pytest_files_run_set: Set = set()
    # Jinja2 template used to render every generated pytest file.
    # The variables it expects (version, testcase_path, imports_list, diff_levels,
    # parameters, class_name, config_chain_style, teststeps_chain_style) are
    # supplied by make_testcase() when it calls __TEMPLATE__.render().
    __TEMPLATE__ = jinja2.Template(
        """# NOTE: Generated By HttpRunner v{{ version }}
    # FROM: {{ testcase_path }}
    
    {% if imports_list and diff_levels > 0 %}
    import sys
    from pathlib import Path
    sys.path.insert(0, str(Path(__file__){% for _ in range(diff_levels) %}.parent{% endfor %}))
    {% endif %}
    
    {% if parameters %}
    import pytest
    from httprunner import Parameters
    {% endif %}
    
    from httprunner import HttpRunner, Config, Step, RunRequest, RunTestCase
    {% for import_str in imports_list %}
    {{ import_str }}
    {% endfor %}
    
    class {{ class_name }}(HttpRunner):
    
        {% if parameters %}
        @pytest.mark.parametrize("param", Parameters({{parameters}}))
        def test_start(self, param):
            super().test_start(param)
        {% endif %}
    
        config = {{ config_chain_style }}
    
        teststeps = [
            {% for step_chain_style in teststeps_chain_style %}
                {{ step_chain_style }},
            {% endfor %}
        ]
    
    if __name__ == "__main__":
        {{ class_name }}().test_start()
    
    """
    )
    
    # Normalise OS-specific path prefixes/separators and return an absolute path
    def __ensure_absolute(path: Text) -> Text:
        """Resolve a testcase file path to an absolute path.

        A leading ``./`` (Linux/Darwin, e.g. ``hrun ./test.yml``) or ``.\\``
        (Windows, e.g. ``hrun .\\test.yml``) is stripped, path separators are
        normalised with ``ensure_path_sep`` and a relative path is joined onto
        the project root directory reported by ``load_project_meta``.

        Args:
            path: absolute or relative testcase file path.

        Returns:
            The absolute path of the testcase file.

        Exits the process with status 1 (after logging an error) when the
        resolved path is not an existing file.
        """
        # The Windows prefix must be written with an escaped backslash; an
        # unescaped ".\" would swallow the closing quote of the literal.
        for current_dir_prefix in ("./", ".\\"):
            if path.startswith(current_dir_prefix):
                path = path[len(current_dir_prefix):]
                break

        path = ensure_path_sep(path)
        project_meta = load_project_meta(path)

        if os.path.isabs(path):
            absolute_path = path
        else:
            absolute_path = os.path.join(project_meta.RootDir, path)

        if not os.path.isfile(absolute_path):
            logger.error(f"Invalid testcase file path: {absolute_path}")
            sys.exit(1)

        return absolute_path
    
    
    def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
        """Return a variant of ``file_abs_path`` whose path components are pytest friendly.

        Every component of the path relative to the project root is rewritten:
        a component starting with a digit gets a ``T`` prefix (19 => T19,
        2C => T2C), and spaces, dots and hyphens are replaced by underscores
        unless the component starts with a dot (so ".csv" is not turned into
        "_csv"). The file suffix is lower-cased and appended unchanged otherwise.

        Args:
            file_abs_path: absolute file path

        Returns:
            ensured valid absolute file path; ``file_abs_path`` itself when the
            path relative to the project root is empty.

        """
        project_meta = load_project_meta(file_abs_path)
        stem_abs_path, suffix = os.path.splitext(file_abs_path)
        suffix = suffix.lower()

        stem_relative_path = convert_relative_project_root_dir(stem_abs_path)
        if stem_relative_path == "":
            return file_abs_path

        # space, dot and hyphen all map to an underscore
        to_underscore = str.maketrans(" .-", "___")

        def _sanitize(component):
            # ensure the component does not start with a digit
            if component[0] in string.digits:
                component = f"T{component}"
            # dot-leading components are kept untouched
            if component.startswith("."):
                return component
            return component.translate(to_underscore)

        components = [
            _sanitize(component)
            for component in stem_relative_path.rstrip(os.sep).split(os.sep)
        ]
        relative_file = os.sep.join(components) + suffix
        return os.path.join(project_meta.RootDir, relative_file)
    
    # Generate __init__.py on demand next to a generated pytest file
    def __ensure_testcase_module(path: Text) -> None:
        """Ensure pytest files are in a python module, generate __init__.py on demand.

        Args:
            path: path of a generated pytest file; the ``__init__.py`` is looked
                for (and created if missing) in the directory of this file.

        An existing ``__init__.py`` is left untouched.
        """
        init_file = os.path.join(os.path.dirname(path), "__init__.py")
        if os.path.isfile(init_file):
            return

        with open(init_file, "w", encoding="utf-8") as f:
            # the trailing newline must be an escape sequence inside the literal
            f.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!\n")
    
    
    def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
        """Map a YAML/JSON testcase path to its pytest file path and class-name suffix.

        Returns:
            A tuple ``(pytest_file_abs_path, name_in_title_case)``. The pytest
            file sits in the same directory as the validated testcase path and is
            named ``<file_name>_test.py``; the second item is the file name in
            title case without underscores,
            e.g. request_with_variables => RequestWithVariables.
        """
        valid_path = ensure_file_abs_path_valid(testcase_abs_path)

        folder, base_name = os.path.split(valid_path)
        stem = os.path.splitext(base_name)[0]

        pytest_file = os.path.join(folder, f"{stem}_test.py")
        title_cased = stem.title().replace("_", "")
        return pytest_file, title_cased
    
    
    def format_pytest_with_black(*python_paths: Text) -> None:
        """Format the given python files with the ``black`` command line tool.

        All paths are passed to a single ``black`` invocation when
        ``is_support_multiprocessing()`` is true or when there is at most one
        path; otherwise ``black`` is run once per file.

        Args:
            python_paths: paths of the python files to format.

        Exits the process with status 1 when ``black`` returns a non-zero exit
        status or when the ``black`` executable cannot be found.
        """
        logger.info("format pytest cases with black ...")
        try:
            # check=True makes subprocess.run raise CalledProcessError on a
            # non-zero exit status; without it the handler below is unreachable.
            if is_support_multiprocessing() or len(python_paths) <= 1:
                subprocess.run(["black", *python_paths], check=True)
            else:
                logger.warning(
                    "this system does not support multiprocessing well, format files one by one ..."
                )
                for path in python_paths:
                    subprocess.run(["black", path], check=True)
        except subprocess.CalledProcessError as ex:
            capture_exception(ex)
            logger.error(ex)
            sys.exit(1)
        except FileNotFoundError:
            # raised by subprocess when the "black" executable is not installed
            err_msg = """
    missing dependency tool: black
    install black manually and try again:
    $ pip install black
    """
            logger.error(err_msg)
            sys.exit(1)
    
    
    def make_config_chain_style(config: Dict) -> Text:
        """Render a testcase config dict as a chained ``Config(...)`` expression.

        Args:
            config: testcase config. ``name`` is required. ``variables`` is
                rendered only when present and non-empty; ``base_url``,
                ``verify``, ``export`` and ``weight`` are rendered whenever the
                key is present.

        Returns:
            Source text such as ``Config("demo").base_url("http://x")``.

        Raises:
            KeyError: if ``config`` has no ``name`` key.
        """
        chain = [f'Config("{config["name"]}")']

        # "variables" is optional like the other keys: a missing key is treated
        # the same as an empty mapping instead of raising KeyError
        variables = config.get("variables")
        if variables:
            chain.append(f".variables(**{variables})")

        if "base_url" in config:
            chain.append(f'.base_url("{config["base_url"]}")')

        if "verify" in config:
            chain.append(f'.verify({config["verify"]})')

        if "export" in config:
            chain.append(f'.export(*{config["export"]})')

        if "weight" in config:
            chain.append(f'.locust_weight({config["weight"]})')

        return "".join(chain)
    
    
    def make_request_chain_style(request: Dict) -> Text:
        """Render a request dict as the chained call text that follows ``RunRequest(...)``.

        The text starts with ``.<method>("<url>")`` (method lower-cased) and is
        followed, in a fixed order, by one call for each optional key that is
        present in ``request``. String values of ``data`` and ``json`` are wrapped
        in double quotes; every other value is interpolated as-is.
        """
        http_method = request["method"].lower()
        pieces = [f'.{http_method}("{request["url"]}")']

        # (request key, chained method, argument prefix, quote string values)
        rules = (
            ("params", "with_params", "**", False),
            ("headers", "with_headers", "**", False),
            ("cookies", "with_cookies", "**", False),
            ("data", "with_data", "", True),
            ("json", "with_json", "", True),
            ("timeout", "set_timeout", "", False),
            ("verify", "set_verify", "", False),
            ("allow_redirects", "set_allow_redirects", "", False),
            ("upload", "upload", "**", False),
        )
        for key, chained_method, prefix, quote_text in rules:
            if key not in request:
                continue

            value = request[key]
            if quote_text and isinstance(value, Text):
                value = f'"{value}"'
            pieces.append(f".{chained_method}({prefix}{value})")

        return "".join(pieces)
    
    
    def make_teststep_chain_style(teststep: Dict) -> Text:
        """Render a teststep dict as ``Step(...)`` source text.

        A step with a truthy ``request`` becomes ``RunRequest("<name>")``, one
        with a truthy ``testcase`` becomes ``RunTestCase("<name>")``. The optional
        parts are appended in this order: variables, setup hooks, the request
        chain or ``.call(<testcase>)``, teardown hooks, extract, export, validate.

        Raises:
            exceptions.TestCaseFormatError: the step has neither ``request`` nor
                ``testcase``, or a hook is neither a string nor a one-item dict.
        """

        def _hook_calls(kind):
            # kind is "setup" or "teardown"; both hook lists share one format
            calls = []
            for hook in teststep[f"{kind}_hooks"]:
                if isinstance(hook, Text):
                    calls.append(f'.{kind}_hook("{hook}")')
                elif isinstance(hook, Dict) and len(hook) == 1:
                    # {assign_var_name: hook_content}
                    ((assign_var_name, hook_content),) = hook.items()
                    calls.append(
                        f'.{kind}_hook("{hook_content}", "{assign_var_name}")'
                    )
                else:
                    raise exceptions.TestCaseFormatError(
                        f"Invalid {kind} hook: {hook}"
                    )
            return "".join(calls)

        if teststep.get("request"):
            # brand-new request
            parts = [f'RunRequest("{teststep["name"]}")']
        elif teststep.get("testcase"):
            # reference to another testcase
            parts = [f'RunTestCase("{teststep["name"]}")']
        else:
            raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

        if "variables" in teststep:
            parts.append(f'.with_variables(**{teststep["variables"]})')

        if "setup_hooks" in teststep:
            parts.append(_hook_calls("setup"))

        if teststep.get("request"):
            parts.append(make_request_chain_style(teststep["request"]))
        elif teststep.get("testcase"):
            parts.append(f'.call({teststep["testcase"]})')

        if "teardown_hooks" in teststep:
            parts.append(_hook_calls("teardown"))

        if "extract" in teststep:
            # request step
            parts.append(".extract()")
            parts.extend(
                f""".with_jmespath('{extract_path}', '{extract_name}')"""
                for extract_name, extract_path in teststep["extract"].items()
            )

        if "export" in teststep:
            # reference testcase step
            parts.append(f'.export(*{teststep["export"]})')

        if "validate" in teststep:
            parts.append(".validate()")

            for raw_validator in teststep["validate"]:
                validator = uniform_validator(raw_validator)
                comparator = validator["assert"]

                check = validator["check"]
                # e.g. body."user-agent" => 'body."user-agent"'
                check_literal = f"'{check}'" if '"' in check else f'"{check}"'

                expect = validator["expect"]
                if isinstance(expect, Text):
                    expect = f'"{expect}"'

                message = validator["message"]
                if message:
                    parts.append(
                        f".assert_{comparator}({check_literal}, {expect}, '{message}')"
                    )
                else:
                    parts.append(f".assert_{comparator}({check_literal}, {expect})")

        return f'Step({"".join(parts)})'
    
    # Generate the pytest file for a single testcase
    def make_testcase(testcase: Dict, dir_path: Text = None) -> Text:
        """Convert a valid testcase dict to a pytest file and return the file path.

        Args:
            testcase: testcase content; ``testcase["config"]["path"]`` and
                ``testcase["teststeps"]`` are read. The dict is modified in place:
                config ``path``/``variables`` are rewritten and, for steps that
                reference another testcase, ``testcase`` is replaced by the
                referenced class name and ``export`` may be extended.
            dir_path: optional target directory; when given, the generated file
                is placed there (keeping its base file name) instead of next to
                the source testcase.

        Returns:
            Absolute path of the generated (or previously generated) pytest file.

        Raises:
            exceptions.TestCaseFormatError: content of a referenced testcase file
                is not a dict.

        Side effects: writes the pytest file, creates ``__init__.py`` on demand
        and records the file in ``pytest_files_made_cache_mapping``.
        """
        # ensure compatibility with testcase format v2
        testcase = ensure_testcase_v3(testcase)
    
        # validate testcase format
        load_testcase(testcase)
    
        testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
        logger.info(f"start to make testcase: {testcase_abs_path}")
    
        testcase_python_abs_path, testcase_cls_name = convert_testcase_path(
            testcase_abs_path
        )
        if dir_path:
            # keep the generated file name, but place it under dir_path
            testcase_python_abs_path = os.path.join(
                dir_path, os.path.basename(testcase_python_abs_path)
            )
    
        global pytest_files_made_cache_mapping
        # already generated earlier in this run: reuse it
        if testcase_python_abs_path in pytest_files_made_cache_mapping:
            return testcase_python_abs_path
    
        config = testcase["config"]
        config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
        config["variables"] = convert_variables(
            config.get("variables", {}), testcase_abs_path
        )
    
        # prepare reference testcase
        imports_list = []
        teststeps = testcase["teststeps"]
        for teststep in teststeps:
            # only steps that reference another testcase need extra work
            if not teststep.get("testcase"):
                continue
    
            # make ref testcase pytest file
            ref_testcase_path = __ensure_absolute(teststep["testcase"])
            test_content = load_test_file(ref_testcase_path)
    
            if not isinstance(test_content, Dict):
                raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")
    
            # api in v2 format, convert to v3 testcase
            if "request" in test_content and "name" in test_content:
                test_content = ensure_testcase_v3_api(test_content)
    
            test_content.setdefault("config", {})["path"] = ref_testcase_path
            # recursive call: the referenced testcase gets its own pytest file
            ref_testcase_python_abs_path = make_testcase(test_content)
    
            # override testcase export
            ref_testcase_export: List = test_content["config"].get("export", [])
            if ref_testcase_export:
                step_export: List = teststep.setdefault("export", [])
                step_export.extend(ref_testcase_export)
                # de-duplicate; list(set(...)) does not preserve the original order
                teststep["export"] = list(set(step_export))
    
            # prepare ref testcase class name
            # (the recursive call above has stored it in the cache mapping)
            ref_testcase_cls_name = pytest_files_made_cache_mapping[
                ref_testcase_python_abs_path
            ]
            teststep["testcase"] = ref_testcase_cls_name
    
            # prepare import ref testcase
            ref_testcase_python_relative_path = convert_relative_project_root_dir(
                ref_testcase_python_abs_path
            )
            ref_module_name, _ = os.path.splitext(ref_testcase_python_relative_path)
            # relative file path => dotted module path
            ref_module_name = ref_module_name.replace(os.sep, ".")
            import_expr = f"from {ref_module_name} import TestCase{ref_testcase_cls_name} as {ref_testcase_cls_name}"
            if import_expr not in imports_list:
                imports_list.append(import_expr)
    
        testcase_path = convert_relative_project_root_dir(testcase_abs_path)
        # current file compared to ProjectRootDir
        diff_levels = len(testcase_path.split(os.sep))
    
        data = {
            "version": __version__,
            "testcase_path": testcase_path,
            "diff_levels": diff_levels,
            "class_name": f"TestCase{testcase_cls_name}",
            "imports_list": imports_list,
            "config_chain_style": make_config_chain_style(config),
            "parameters": config.get("parameters"),
            "teststeps_chain_style": [
                make_teststep_chain_style(step) for step in teststeps
            ],
        }
        # render the template with the collected data
        content = __TEMPLATE__.render(data)
    
        # ensure new file's directory exists
        dir_path = os.path.dirname(testcase_python_abs_path)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        # write the rendered content to the pytest file
        with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
            f.write(content)
    
        # register the file only after it has been written successfully
        pytest_files_made_cache_mapping[testcase_python_abs_path] = testcase_cls_name
        __ensure_testcase_module(testcase_python_abs_path)
    
        logger.info(f"generated testcase: {testcase_python_abs_path}")
    
        return testcase_python_abs_path
    
    # Generate the pytest folder for a testsuite
    def make_testsuite(testsuite: Dict) -> NoReturn:
        """Convert a valid testsuite dict to a pytest folder holding its testcases.

        The folder is named after the testsuite file
        (demo_testsuite.yml => demo_testsuite_yml). Each entry of
        ``testsuite["testcases"]`` is loaded, has its config overridden by the
        testsuite-level settings, is generated into that folder with
        ``make_testcase`` and is added to ``pytest_files_run_set``.
        """
        # validate testsuite format
        load_testsuite(testsuite)

        suite_config = testsuite["config"]
        suite_file = suite_config["path"]
        suite_variables = convert_variables(
            suite_config.get("variables", {}), suite_file
        )

        logger.info(f"start to make testsuite: {suite_file}")

        # directory named after the testsuite file; its testcases go under it
        valid_suite_file = ensure_file_abs_path_valid(suite_file)
        suite_stem, suite_suffix = os.path.splitext(valid_suite_file)
        output_dir = f"{suite_stem}_{suite_suffix.lstrip('.')}"

        for entry in testsuite["testcases"]:
            # load the referenced testcase content
            case_file = __ensure_absolute(entry["testcase"])
            case_content = load_test_file(case_file)
            case_content.setdefault("config", {})
            case_config = case_content["config"]
            case_config["path"] = case_file

            # the name given in the testsuite replaces the testcase's own name
            case_config["name"] = entry["name"]

            # testsuite config base_url is tried first, then the entry's base_url
            base_url = suite_config.get("base_url") or entry.get("base_url")
            if base_url:
                case_config["base_url"] = base_url

            if "verify" in suite_config:
                case_config["verify"] = suite_config["verify"]

            # testsuite testcase variables > testsuite config variables
            entry_variables = convert_variables(entry.get("variables", {}), case_file)
            entry_variables = merge_variables(entry_variables, suite_variables)
            # testsuite testcase variables > testcase config variables
            case_config["variables"] = convert_variables(
                case_config.get("variables", {}), case_file
            )
            case_config["variables"].update(entry_variables)

            if "weight" in entry:
                case_config["weight"] = entry["weight"]

            pytest_files_run_set.add(make_testcase(case_content, output_dir))
    
    # Make pytest files for one path and record them in the module-level caches
    def __make(tests_path: Text) -> None:
        """ make testcase(s) with testcase/testsuite/folder absolute path
            generated pytest file path will be cached in pytest_files_made_cache_mapping

        Files ending with ``_test.py`` are added to ``pytest_files_run_set``
        without conversion. Files that cannot be loaded or that have an invalid
        format are skipped with a warning.

        Args:
            tests_path: should be in absolute path

        Raises:
            exceptions.TestcaseNotFound: tests_path is neither a directory nor a file.

        """
        logger.info(f"make path: {tests_path}")
        test_files = []
        if os.path.isdir(tests_path):
            test_files.extend(load_folder_files(tests_path))
        elif os.path.isfile(tests_path):
            test_files.append(tests_path)
        else:
            raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")

        for test_file in test_files:
            if test_file.lower().endswith("_test.py"):
                # already a pytest file, run it as is
                pytest_files_run_set.add(test_file)
                continue

            try:
                test_content = load_test_file(test_file)
            except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
                logger.warning(
                    f"Invalid test file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

            if not isinstance(test_content, Dict):
                logger.warning(
                    f"Invalid test file: {test_file}\n"
                    f"reason: test content not in dict format."
                )
                continue

            # api in v2 format, convert to v3 testcase
            if "request" in test_content and "name" in test_content:
                test_content = ensure_testcase_v3_api(test_content)

            if "config" not in test_content:
                logger.warning(
                    f"Invalid testcase/testsuite file: {test_file}\n"
                    f"reason: missing config part."
                )
                continue
            elif not isinstance(test_content["config"], Dict):
                logger.warning(
                    f"Invalid testcase/testsuite file: {test_file}\n"
                    f"reason: config should be dict type, got {test_content['config']}"
                )
                continue

            # ensure path absolute
            test_content.setdefault("config", {})["path"] = test_file

            # testcase
            if "teststeps" in test_content:
                try:
                    testcase_pytest_path = make_testcase(test_content)
                    pytest_files_run_set.add(testcase_pytest_path)
                except exceptions.TestCaseFormatError as ex:
                    logger.warning(
                        f"Invalid testcase file: {test_file}\n{type(ex).__name__}: {ex}"
                    )
                    continue

            # testsuite
            elif "testcases" in test_content:
                try:
                    make_testsuite(test_content)
                except exceptions.TestSuiteFormatError as ex:
                    logger.warning(
                        f"Invalid testsuite file: {test_file}\n{type(ex).__name__}: {ex}"
                    )
                    continue

            # invalid format
            else:
                logger.warning(
                    f"Invalid test file: {test_file}\n"
                    f"reason: file content is neither testcase nor testsuite"
                )
    
    
    def main_make(tests_paths: List[Text]) -> List[Text]:
        """Make pytest files for every given path and return the files to run.

        Relative paths are resolved against the current working directory. After
        all paths are processed, every file recorded in
        ``pytest_files_made_cache_mapping`` is formatted with black.

        Returns:
            The content of ``pytest_files_run_set`` as a list; an empty list when
            ``tests_paths`` is empty.

        Exits the process with status 1 when making a path raises
        ``exceptions.MyBaseError``.
        """
        if not tests_paths:
            return []

        for raw_path in tests_paths:
            normalized = ensure_path_sep(raw_path)
            # os.path.isabs() is True for an absolute path, which is used as is
            target = (
                normalized
                if os.path.isabs(normalized)
                else os.path.join(os.getcwd(), normalized)
            )

            try:
                __make(target)
            except exceptions.MyBaseError as ex:
                logger.error(ex)
                sys.exit(1)

        # format every generated pytest file
        format_pytest_with_black(*pytest_files_made_cache_mapping.keys())

        return list(pytest_files_run_set)
    
    
    def init_make_parser(subparsers):
        """Register the ``make`` sub-command and return its parser.

        Args:
            subparsers: the action object returned by
                ``ArgumentParser.add_subparsers()``.

        Returns:
            The parser of the ``make`` sub-command, which accepts zero or more
            ``testcase_path`` positional arguments.
        """
        make_parser = subparsers.add_parser(
            "make",
            help="Convert YAML/JSON testcases to pytest cases.",
        )
        path_argument = {
            "nargs": "*",
            "help": "Specify YAML/JSON testcase file/folder path",
        }
        make_parser.add_argument("testcase_path", **path_argument)
        return make_parser
    
    作者:zy7y
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    SAP S/4HANA extensibility扩展原理介绍
    SAP CRM系统订单模型的设计与实现
    使用nodejs代码在SAP C4C里创建Individual customer
    SAP Cloud for Customer Account和individual customer的区别
    Let the Balloon Rise map一个数组
    How Many Tables 简单并查集
    Heap Operations 优先队列
    Arpa’s obvious problem and Mehrdad’s terrible solution 思维
    Passing the Message 单调栈两次
    The Suspects 并查集
  • 原文地址:https://www.cnblogs.com/zy7y/p/15118707.html
Copyright © 2011-2022 走看看