zoukankan      html  css  js  c++  java
  • HttpRunner3源码阅读:5. 参数/函数调用及其值处理

    parser

    上一篇读的loader.py,里面提到的就是文件路径,文件转用例模型、套件模型、加载方法字典,变量写入环境,这篇parser.py主要内容是在解析用例当中引用变量、自定义方法
    变量和方法表达式和实际项目冲突的时候就需要改这个文件了

    可用资料

    https://docs.python.org/zh-cn/3/library/re.html?highlight=re#module-re

    导包

    import ast  # 内置库: 抽象语法树
    import builtins # 内建对象 该模块提供对Python的所有“内置”标识符的直接访问
    import re   # 内置库 正则表达式
    import os   # 内置库 系统
    from typing import Any, Set, Text, Callable, List, Dict, Union
    
    from loguru import logger
    from sentry_sdk import capture_exception
    
    from httprunner import loader, utils, exceptions  
    from httprunner.models import VariablesMapping, FunctionsMapping
    

    源码附注释

    # Compiled pattern matching the scheme prefix of an absolute HTTP(S) URL.
    # re.I makes the match case-insensitive.
    absolute_http_url_regexp = re.compile(r"^https?://", re.I)

    # "$$" is the escape sequence for a literal "$".
    # Both "$" must be backslash-escaped: an unescaped "$" is the end-of-string anchor.
    dolloar_regex_compile = re.compile(r"\$\$")
    # Variable notation: ${var} (name captured in group 1) or $var (name captured in group 2).
    variable_regex_compile = re.compile(r"\$\{(\w+)\}|\$(\w+)")
    # Function notation, e.g. ${func1($var_1, $var_3)}.
    # Group 1 is the function name, group 2 is the raw argument string.
    function_regex_compile = re.compile(r"\$\{(\w+)\(([\$\w\.\-/\s=,]*)\)\}")
    
    
    def parse_string_value(str_value: Text) -> Any:
        """Convert a string into the Python literal it spells, when it spells one.

        The text is handed to ``ast.literal_eval``. When that raises
        ``ValueError`` or ``SyntaxError`` the input is returned unchanged.

        Examples:
            "123" => 123
            "12.2" => 12.2
            "'123'" => "123"  (a quoted literal evaluates to a str)
            "abc" => "abc"
            "$var" => "$var"
        """
        try:
            literal = ast.literal_eval(str_value)
        except (ValueError, SyntaxError):
            # ValueError: not a literal, e.g. abc
            # SyntaxError: not parseable at all, e.g. $var, ${func}
            return str_value
        return literal
    
    
    def build_url(base_url, path):
        """Prepend ``base_url`` to ``path`` unless ``path`` is already an absolute URL.

        A ``path`` that matches ``absolute_http_url_regexp`` is returned untouched.
        Otherwise the two parts are joined with exactly one "/" between them.

        Example:
            base_url => http://www.baidu.com/
            path => /search/name=httprunner
            return => http://www.baidu.com/search/name=httprunner

        Raises:
            exceptions.ParamsError: ``path`` is not absolute and ``base_url`` is falsy.
        """
        # An absolute URL needs no base.
        if absolute_http_url_regexp.match(path):
            return path

        if not base_url:
            raise exceptions.ParamsError("base url missed!")

        # Trim the slashes that face each other so the join yields a single "/".
        prefix = base_url.rstrip("/")
        suffix = path.lstrip("/")
        return "{}/{}".format(prefix, suffix)
    
    # 正则表达式查变量
    def regex_findall_variables(raw_string: Text) -> List[Text]:
        """Collect the names of all variables referenced in a string.

        References are matched with ``variable_regex_compile``. An escape matched
        by ``dolloar_regex_compile`` is skipped and never reported as a variable.

        Args:
            raw_string (str): string content

        Returns:
            list: the referenced names in order of appearance, without the
                "$" or the braces; duplicates are kept.

        Examples:
            >>> regex_findall_variables("$variable")
            ["variable"]

            >>> regex_findall_variables("/blog/$postid")
            ["postid"]

            >>> regex_findall_variables("/$var1/$var2")
            ["var1", "var2"]

            >>> regex_findall_variables("abc")
            []

        """
        try:
            # Position of the first "$"; without one there is nothing to extract.
            cursor = raw_string.index("$", 0)
        except ValueError:
            return []

        names = []
        while cursor < len(raw_string):
            # Notation priority: $$ > $var

            # An escaped dollar is skipped entirely.
            escaped = dolloar_regex_compile.match(raw_string, cursor)
            if escaped:
                cursor = escaped.end()
                continue

            # ${var} fills group 1, $var fills group 2.
            reference = variable_regex_compile.match(raw_string, cursor)
            if reference:
                names.append(reference.group(1) or reference.group(2))
                cursor = reference.end()
                continue

            # Nothing matched at the cursor: jump to the next "$", if any.
            try:
                cursor = raw_string.index("$", cursor + 1)
            except ValueError:
                break

        return names
    
    # 正则查找方法 ${func()}
    def regex_findall_functions(content: Text) -> List[Text]:
        """Find every function call written as ${func(...)} in a string.

        The search is delegated to ``function_regex_compile.findall``. That
        pattern has two capture groups, so each match is reported as a
        ``(function_name, raw_params)`` tuple.

        Args:
            content (str): string content

        Returns:
            list: one entry per non-overlapping match, in order of appearance;
                an empty list when ``findall`` raises ``TypeError`` (the
                exception is forwarded to ``capture_exception`` first).

        Examples:
            >>> regex_findall_functions("${func(5)}")
            [("func", "5")]

            >>> regex_findall_functions("/api/1000?_t=${get_timestamp()}")
            [("get_timestamp", "")]

            >>> regex_findall_functions("age${name(1,2)}$info${name1()}${aoligei($name)}}")
            [("name", "1,2"), ("name1", ""), ("aoligei", "$name")]

        """
        try:
            matches = function_regex_compile.findall(content)
        except TypeError as ex:
            # Report the failure, then fall back to "no functions found".
            capture_exception(ex)
            matches = []
        return matches
    
    # 递归提取变量
    def extract_variables(content: Any) -> Set:
        """Recursively gather the variable names referenced anywhere in ``content``.

        Strings are scanned with ``regex_findall_variables``. Lists, sets and
        tuples are walked item by item. For dicts only the values are walked,
        not the keys. Any other type contributes nothing.

        Returns:
            set: the union of all names found.
        """
        if isinstance(content, str):
            return set(regex_findall_variables(content))

        if isinstance(content, dict):
            children = content.values()
        elif isinstance(content, (list, set, tuple)):
            children = content
        else:
            # e.g. None, int, float, bool
            return set()

        found = set()
        for child in children:
            found |= extract_variables(child)
        return found
    
    # 方法参数解析
    def parse_function_params(params: Text) -> Dict:
        """Parse a raw function argument string into positional and keyword arguments.

        The string is split on "," and every piece is stripped. A piece that
        contains "=" becomes a keyword argument, any other piece a positional
        one. Each value goes through ``parse_string_value``.

        Args:
            params (str): function params in string form, e.g. "1, 2, a=3"

        Returns:
            dict: function meta dict

                {
                    "args": [],
                    "kwargs": {}
                }

        Examples:
            >>> parse_function_params("")
            {'args': [], 'kwargs': {}}

            >>> parse_function_params("5")
            {'args': [5], 'kwargs': {}}

            >>> parse_function_params("1, 2")
            {'args': [1, 2], 'kwargs': {}}

            >>> parse_function_params("a=1, b=2")
            {'args': [], 'kwargs': {'a': 1, 'b': 2}}

            >>> parse_function_params("1, 2, a=3, b=4")
            {'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}

            >>> parse_function_params("query=a=b")
            {'args': [], 'kwargs': {'query': 'a=b'}}

        """
        function_meta = {"args": [], "kwargs": {}}

        params_str = params.strip()
        if params_str == "":
            return function_meta

        for arg in params_str.split(","):
            arg = arg.strip()
            if "=" in arg:
                # Split on the first "=" only, so a value that itself contains "="
                # no longer breaks the two-name unpacking with a ValueError.
                key, value = arg.split("=", 1)
                function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
            else:
                function_meta["args"].append(parse_string_value(arg))

        return function_meta
    
    # 从变量池获取参数
    def get_mapping_variable(
        variable_name: Text, variables_mapping: VariablesMapping
    ) -> Any:
        """Look up a variable's value in the variables mapping.

        Args:
            variable_name (str): variable name
            variables_mapping (dict): variables mapping

        Returns:
            the value stored under ``variable_name``.

        Raises:
            exceptions.VariableNotFound: the lookup raised ``KeyError``.

        """
        # TODO: get variable from debugtalk module and environ
        try:
            value = variables_mapping[variable_name]
        except KeyError:
            message = f"{variable_name} not found in {variables_mapping}"
            raise exceptions.VariableNotFound(message)
        return value
    
    # 从方法池里面取方法对象
    def get_mapping_function(
        function_name: Text, functions_mapping: FunctionsMapping
    ) -> Callable:
        """Resolve a function name to a callable.

        Lookup order:
            1. ``functions_mapping``
            2. the aliases "parameterize"/"P" and "environ"/"ENV"
            3. "multipart_encoder"/"multipart_content_type" from the uploader extension
            4. the functions returned by ``loader.load_builtin_functions()``
            5. Python's ``builtins`` module

        Args:
            function_name (str): function name
            functions_mapping (dict): functions mapping

        Returns:
            mapping function object.

        Raises:
            exceptions.FunctionNotFound: none of the lookups above knows the name.

        """
        # Functions from the mapping take precedence over everything else.
        if function_name in functions_mapping:
            return functions_mapping[function_name]

        # Parameterization from a CSV file.
        if function_name in ("parameterize", "P"):
            return loader.load_csv_file

        # OS environment lookup.
        if function_name in ("environ", "ENV"):
            return utils.get_os_environ

        # Upload helpers; the extension is imported only when one is requested.
        if function_name in ("multipart_encoder", "multipart_content_type"):
            from httprunner.ext import uploader

            return getattr(uploader, function_name)

        # HttpRunner's own builtin functions.
        try:
            return loader.load_builtin_functions()[function_name]
        except KeyError:
            pass

        # Python builtin functions.
        try:
            return getattr(builtins, function_name)
        except AttributeError:
            pass

        raise exceptions.FunctionNotFound(f"{function_name} is not found.")
    
    # 解析字符串 
    def parse_string(
        raw_string: Text,
        variables_mapping: VariablesMapping,
        functions_mapping: FunctionsMapping,
    ) -> Any:
        """Substitute the variable and function references found in a string.

        The string is scanned from its first "$". At each position the notations
        are tried in priority order: "$$" (escape for a literal "$"), then
        ${func(...)}, then ${var} / $var.

        Args:
            raw_string: raw string content to be parsed.
            variables_mapping: variables mapping.
            functions_mapping: functions mapping.

        Returns:
            The string with every reference replaced by ``str(value)``. When the
            whole string is exactly one function call or one variable reference,
            the evaluated value itself is returned, keeping its type.

        Raises:
            Whatever ``get_mapping_function`` / ``get_mapping_variable`` raise for
            unknown names. An exception raised by a called function is logged
            and re-raised.

        Examples:
            >>> raw_string = "abc${add_one($num)}def"
            >>> variables_mapping = {"num": 3}
            >>> functions_mapping = {"add_one": lambda x: x + 1}
            >>> parse_string(raw_string, variables_mapping, functions_mapping)
                "abc4def"

        """
        try:
            # Position of the first "$"; everything before it is copied verbatim.
            match_start_position = raw_string.index("$", 0)
            parsed_string = raw_string[0:match_start_position]
        except ValueError:
            # No "$" at all: nothing to substitute.
            return raw_string

        while match_start_position < len(raw_string):

            # Notice: notation priority
            # $$ > ${func($a, $b)} > $var

            # search $$
            # e.g. for "sdfsadf$${123}..." the first "$" is at index 7; the "$$"
            # match ends at index 9 and a single "$" is appended to the output.
            dollar_match = dolloar_regex_compile.match(raw_string, match_start_position)
            if dollar_match:
                match_start_position = dollar_match.end()
                parsed_string += "$"
                continue

            # search function like ${func($a, $b)}
            # e.g. "${func1($var_1, $var_3)}" => groups ("func1", "$var_1, $var_3")
            func_match = function_regex_compile.match(raw_string, match_start_position)
            if func_match:
                func_name = func_match.group(1)
                func = get_mapping_function(func_name, functions_mapping)

                # Raw argument text, split into positional and keyword arguments.
                func_params_str = func_match.group(2)
                function_meta = parse_function_params(func_params_str)
                args = function_meta["args"]
                kwargs = function_meta["kwargs"]
                # Arguments may reference variables themselves; resolve them first.
                parsed_args = parse_data(args, variables_mapping, functions_mapping)
                parsed_kwargs = parse_data(kwargs, variables_mapping, functions_mapping)

                try:
                    func_eval_value = func(*parsed_args, **parsed_kwargs)
                except Exception as ex:
                    logger.error(
                        f"call function error:\n"
                        f"func_name: {func_name}\n"
                        f"args: {parsed_args}\n"
                        f"kwargs: {parsed_kwargs}\n"
                        f"{type(ex).__name__}: {ex}"
                    )
                    raise

                func_raw_str = "${" + func_name + f"({func_params_str})" + "}"
                if func_raw_str == raw_string:
                    # raw_string is a function, e.g. "${add_one(3)}", return its eval value directly
                    return func_eval_value

                # raw_string contains one or many functions, e.g. "abc${add_one(3)}def"
                parsed_string += str(func_eval_value)
                # Resume scanning right after the closing "}".
                match_start_position = func_match.end()
                continue

            # search variable like ${var} or $var
            var_match = variable_regex_compile.match(raw_string, match_start_position)
            if var_match:
                var_name = var_match.group(1) or var_match.group(2)
                var_value = get_mapping_variable(var_name, variables_mapping)

                if f"${var_name}" == raw_string or "${" + var_name + "}" == raw_string:
                    # raw_string is a variable, $var or ${var}, return its value directly
                    return var_value

                # raw_string contains one or many variables, e.g. "abc${var}def"
                parsed_string += str(var_value)
                match_start_position = var_match.end()
                continue

            curr_position = match_start_position
            try:
                # find next $ location; the text in between is copied verbatim
                match_start_position = raw_string.index("$", curr_position + 1)
                remain_string = raw_string[curr_position:match_start_position]
            except ValueError:
                # No further "$": copy the tail and end the loop.
                remain_string = raw_string[curr_position:]
                match_start_position = len(raw_string)

            parsed_string += remain_string

        return parsed_string
    
    # 处理变量池 变量映射# 变量池换成具体数据
    def parse_data(
        raw_data: Any,
        variables_mapping: VariablesMapping = None,
        functions_mapping: FunctionsMapping = None,
    ) -> Any:
        """Parse raw data of any shape with an evaluated variables mapping.

        Notice: variables_mapping should not contain any variable or function.

        Strings are stripped of surrounding spaces and tabs and passed to
        ``parse_string``. Lists, sets and tuples are parsed item by item and
        come back as a list. Dicts have both keys and values parsed. Every
        other type is returned unchanged.
        """
        if isinstance(raw_data, str):
            # content in string format may contains variables and functions
            variables_mapping = variables_mapping or {}
            functions_mapping = functions_mapping or {}
            # only strip whitespaces and tabs, \n\r is left because they maybe used in changeset
            raw_data = raw_data.strip(" \t")
            return parse_string(raw_data, variables_mapping, functions_mapping)

        elif isinstance(raw_data, (list, set, tuple)):
            # The result is always a list, whatever the input container was.
            return [
                parse_data(item, variables_mapping, functions_mapping) for item in raw_data
            ]

        elif isinstance(raw_data, dict):
            parsed_data = {}
            for key, value in raw_data.items():
                parsed_key = parse_data(key, variables_mapping, functions_mapping)
                parsed_value = parse_data(value, variables_mapping, functions_mapping)
                parsed_data[parsed_key] = parsed_value

            return parsed_data

        else:
            # other types, e.g. None, int, float, bool
            return raw_data
    
    # 解析变量池
    def parse_variables_mapping(
        variables_mapping: VariablesMapping, functions_mapping: FunctionsMapping = None
    ) -> VariablesMapping:
        """Resolve every variable in the mapping to a concrete value.

        Variables may reference each other, so the mapping is resolved in
        repeated passes. A variable is evaluated as soon as everything it
        references has been resolved.

        Args:
            variables_mapping: variable name => raw value, which may reference
                other variables or call functions.
            functions_mapping: functions available to ${func(...)} calls.

        Returns:
            A new mapping with every variable evaluated.

        Raises:
            exceptions.VariableNotFound: a variable references itself, references
                a name missing from ``variables_mapping``, or belongs to a
                circular reference that can never be resolved.
        """
        parsed_variables: VariablesMapping = {}

        while len(parsed_variables) != len(variables_mapping):
            resolved_before_pass = len(parsed_variables)

            for var_name in variables_mapping:

                if var_name in parsed_variables:
                    continue

                var_value = variables_mapping[var_name]
                # Names referenced by this value, e.g. {"var1", "var2"}
                variables = extract_variables(var_value)

                # check if reference variable itself
                if var_name in variables:
                    # e.g.
                    # variables_mapping = {"token": "abc$token"}
                    # variables_mapping = {"key": ["$key", 2]}
                    raise exceptions.VariableNotFound(var_name)

                # check if reference variable not in variables_mapping
                not_defined_variables = [
                    v_name for v_name in variables if v_name not in variables_mapping
                ]
                if not_defined_variables:
                    # e.g. {"varA": "123$varB", "varB": "456$varC"}
                    # e.g. {"varC": "${sum_two($a, $b)}"}
                    raise exceptions.VariableNotFound(not_defined_variables)

                try:
                    parsed_value = parse_data(
                        var_value, parsed_variables, functions_mapping
                    )
                except exceptions.VariableNotFound:
                    # A dependency is not resolved yet; retry on a later pass.
                    continue

                parsed_variables[var_name] = parsed_value

            if len(parsed_variables) == resolved_before_pass:
                # A full pass resolved nothing, so the remaining variables depend
                # on each other, e.g. {"a": "$b", "b": "$a"}. Without this check
                # the loop would never terminate.
                unresolved_variables = [
                    v_name for v_name in variables_mapping if v_name not in parsed_variables
                ]
                raise exceptions.VariableNotFound(unresolved_variables)

        return parsed_variables
    
    
    # 笛卡尔积生成 + 解析参数
    def parse_parameters(parameters: Dict,) -> List[Dict]:
        """ parse parameters and generate cartesian product.

        A parameter name may join several names with "-" (e.g.
        "username-password"); each value item is then zipped with those names.
        Functions come from the project meta loaded for the current working
        directory.

        Args:
            parameters (Dict) parameters: parameter name and value mapping
                parameter value may be in three types:
                    (1) data list, e.g. ["iOS/10.1", "iOS/10.2", "iOS/10.3"]
                    (2) call built-in parameterize function, "${parameterize(account.csv)}"
                    (3) call custom function in debugtalk.py, "${gen_app_version()}"

        Returns:
            list: cartesian product list

        Raises:
            exceptions.ParamsError: a parameter value is neither a list nor a
                string, a string does not evaluate to a list, or an item does
                not fit the parameter names.

        Examples:
            >>> parameters = {
                "user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"],
                "username-password": "${parameterize(account.csv)}",
                "app_version": "${gen_app_version()}",
            }
            >>> parse_parameters(parameters)

        """
        parsed_parameters_list: List[List[Dict]] = []

        # load project_meta functions
        project_meta = loader.load_project_meta(os.getcwd())
        functions_mapping = project_meta.functions

        for parameter_name, parameter_content in parameters.items():
            parameter_name_list = parameter_name.split("-")
            parameter_content_list: List[Dict] = []

            if isinstance(parameter_content, list):
                # (1) data list
                # e.g. {"app_version": ["2.8.5", "2.8.6"]}
                #       => [{"app_version": "2.8.5"}, {"app_version": "2.8.6"}]
                # e.g. {"username-password": [["user1", "111111"], ["user2", "222222"]]}
                #       => [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
                for parameter_item in parameter_content:
                    if not isinstance(parameter_item, (list, tuple)):
                        # "2.8.5" => ["2.8.5"]
                        parameter_item = [parameter_item]

                    # ["app_version"], ["2.8.5"] => {"app_version": "2.8.5"}
                    # ["username", "password"], ["user1", "111111"] => {"username": "user1", "password": "111111"}
                    parameter_content_dict = dict(zip(parameter_name_list, parameter_item))
                    parameter_content_list.append(parameter_content_dict)

            elif isinstance(parameter_content, str):
                # (2) & (3)
                parsed_parameter_content: List = parse_data(
                    parameter_content, {}, functions_mapping
                )
                if not isinstance(parsed_parameter_content, list):
                    raise exceptions.ParamsError(
                        f"parameters content should be in List type, got {parsed_parameter_content} for {parameter_content}"
                    )

                for parameter_item in parsed_parameter_content:
                    if isinstance(parameter_item, dict):
                        # get subset by parameter name
                        # {"app_version": "${gen_app_version()}"}
                        # gen_app_version() => [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
                        # {"username-password": "${get_account()}"}
                        # get_account() => [
                        #       {"username": "user1", "password": "111111"},
                        #       {"username": "user2", "password": "222222"}
                        # ]
                        parameter_dict: Dict = {
                            key: parameter_item[key] for key in parameter_name_list
                        }
                    elif isinstance(parameter_item, (list, tuple)):
                        if len(parameter_name_list) == len(parameter_item):
                            # {"username-password": "${get_account()}"}
                            # get_account() => [("user1", "111111"), ("user2", "222222")]
                            parameter_dict = dict(zip(parameter_name_list, parameter_item))
                        else:
                            raise exceptions.ParamsError(
                                f"parameter names length are not equal to value length.\n"
                                f"parameter names: {parameter_name_list}\n"
                                f"parameter values: {parameter_item}"
                            )
                    elif len(parameter_name_list) == 1:
                        # {"user_agent": "${get_user_agent()}"}
                        # get_user_agent() => ["iOS/10.1", "iOS/10.2"]
                        # each item becomes its own dict:
                        # {"user_agent": "iOS/10.1"}, then {"user_agent": "iOS/10.2"}
                        parameter_dict = {parameter_name_list[0]: parameter_item}
                    else:
                        raise exceptions.ParamsError(
                            f"Invalid parameter names and values:\n"
                            f"parameter names: {parameter_name_list}\n"
                            f"parameter values: {parameter_item}"
                        )

                    parameter_content_list.append(parameter_dict)

            else:
                raise exceptions.ParamsError(
                    f"parameter content should be List or Text(variables or functions call), got {parameter_content}"
                )

            parsed_parameters_list.append(parameter_content_list)

        return utils.gen_cartesian_product(*parsed_parameters_list)
    
    

    最后

    这一节很多地方还没理解为什么要这么处理,但确实被调用自定义函数的实现方式给亮到了

    • parse_variables_mapping
    • parse_parameters
      留下的问题,等后面整体运行时遇到了再回来看吧...
    作者:zy7y
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    jquery 学习笔记
    session
    六、线程中断机制
    二、CompletableFuture(一)基础概念
    四、常见的锁
    五、synchronized细节
    三、CompletableFuture(二)常见用法
    七、等待唤醒的三种方式
    序列化 和 反序列化
    Trigger
  • 原文地址:https://www.cnblogs.com/zy7y/p/15104337.html
Copyright © 2011-2022 走看看