zoukankan      html  css  js  c++  java
  • flask中路由的本质源码分析

    flask中url的本质:

    把url和视图函数封装到一个Rule对象里面,并且把这个对象添加到url_map中
    Rule={"url":'/index','method':'index'}
    url_map = [{"url":'/index','method':'index'},{"url":'/index','method':'index'}]

    第一步:

    app = Flask(__name__)

    第二步:实例化一个对象,执行构造方法

    # When ``self.has_static_folder`` is truthy, register a URL rule of the
    # form ``<static_url_path>/<path:filename>`` under the endpoint name
    # ``static`` with ``self.send_static_file`` as its view function.
    if self.has_static_folder:
         self.add_url_rule(self.static_url_path + '/<path:filename>',
                endpoint='static',
                view_func=self.send_static_file)

    第三步:

    @setupmethod
        def add_url_rule(self, rule, endpoint=None, view_func=None, **options):  
    # Wrap the URL pattern in a ``self.url_rule_class`` object and add that
    # object to ``self.url_map``.
    # NOTE(review): ``methods`` and ``provide_automatic_options`` are not
    # defined anywhere in this excerpt -- presumably they are computed in lines
    # omitted from the original function; confirm against the Flask source.
            rule = self.url_rule_class(rule, methods=methods, **options)
            rule.provide_automatic_options = provide_automatic_options
    
            self.url_map.add(rule)

    第四步:

       url_rule_class = Rule
    
    @implements_to_string
    class Rule(RuleFactory):
        """A single URL pattern plus the endpoint, methods and defaults tied to it.

        A rule has to be bound to a map (see :meth:`bind`) before it can match
        or build URLs, because the regular expression and the build trace are
        only created by :meth:`compile`, which requires ``self.map``.
        """

        def __init__(self, string, defaults=None, subdomain=None, methods=None,
                     build_only=False, endpoint=None, strict_slashes=None,
                     redirect_to=None, alias=False, host=None):
            """Create an unbound rule.

            :param string: the URL pattern; must start with ``/``.  A pattern
                that does not end with ``/`` is treated as a leaf.
            :param defaults: optional mapping of default values; its keys are
                the initial content of ``self.arguments``.
            :param subdomain: domain part used by :meth:`compile` when the map
                is not host matching; falls back to the map default in
                :meth:`bind` when ``None``.
            :param methods: iterable of HTTP method names or ``None``.  Names
                are upper-cased and ``HEAD`` is added whenever ``GET`` is
                present.
            :param build_only: if true, no regex is compiled and :meth:`match`
                never matches.
            :param endpoint: stored as ``self.endpoint``.
            :param strict_slashes: falls back to the map setting in
                :meth:`bind` when ``None``.
            :param redirect_to: stored as ``self.redirect_to``; not used
                inside this class.
            :param alias: if true, :meth:`match` raises
                ``RequestAliasRedirect`` when the map has
                ``redirect_defaults`` enabled.
            :param host: domain part used by :meth:`compile` when the map is
                host matching.
            :raises ValueError: if ``string`` has no leading slash.
            :raises TypeError: if ``methods`` is a plain ``str``.
            """
            if not string.startswith('/'):
                raise ValueError('urls must start with a leading slash')
            self.rule = string
            self.is_leaf = not string.endswith('/')

            self.map = None
            self.strict_slashes = strict_slashes
            self.subdomain = subdomain
            self.host = host
            self.defaults = defaults
            self.build_only = build_only
            self.alias = alias
            if methods is None:
                self.methods = None
            else:
                # A bare string would otherwise be split into single letters.
                if isinstance(methods, str):
                    raise TypeError('param `methods` should be `Iterable[str]`, not `str`')
                self.methods = set(x.upper() for x in methods)
                if 'HEAD' not in self.methods and 'GET' in self.methods:
                    self.methods.add('HEAD')
            self.endpoint = endpoint
            self.redirect_to = redirect_to

            if defaults:
                self.arguments = set(map(str, defaults))
            else:
                self.arguments = set()
            # All compile-time state starts out as None until compile() runs.
            self._trace = self._converters = self._regex = None
            self._static_weights = self._argument_weights = None

        def empty(self):
            """
            Return an unbound copy of this rule.

            This can be useful if want to reuse an already bound URL for another
            map.  See ``get_empty_kwargs`` to override what keyword arguments are
            provided to the new copy.
            """
            return type(self)(self.rule, **self.get_empty_kwargs())

        def get_empty_kwargs(self):
            """
            Provides kwargs for instantiating empty copy with empty()

            Use this method to provide custom keyword arguments to the subclass of
            ``Rule`` when calling ``some_rule.empty()``.  Helpful when the subclass
            has custom keyword arguments that are needed at instantiation.

            Must return a ``dict`` that will be provided as kwargs to the new
            instance of ``Rule``, following the initial ``self.rule`` value which
            is always provided as the first, required positional argument.
            """
            defaults = None
            if self.defaults:
                # Copy so the new rule does not share the mapping with this one.
                defaults = dict(self.defaults)
            return dict(defaults=defaults, subdomain=self.subdomain,
                        methods=self.methods, build_only=self.build_only,
                        endpoint=self.endpoint, strict_slashes=self.strict_slashes,
                        redirect_to=self.redirect_to, alias=self.alias,
                        host=self.host)

        def get_rules(self, map):
            """Yield this rule itself; the ``map`` argument is not used."""
            yield self

        def refresh(self):
            """Rebinds and refreshes the URL.  Call this if you modified the
            rule in place.

            :internal:
            """
            self.bind(self.map, rebind=True)

        def bind(self, map, rebind=False):
            """Bind the url to a map and create a regular expression based on
            the information from the rule itself and the defaults from the map.

            :raises RuntimeError: if the rule is already bound and ``rebind``
                is false.

            :internal:
            """
            if self.map is not None and not rebind:
                raise RuntimeError('url rule %r already bound to map %r' %
                                   (self, self.map))
            self.map = map
            if self.strict_slashes is None:
                self.strict_slashes = map.strict_slashes
            if self.subdomain is None:
                self.subdomain = map.default_subdomain
            self.compile()

        def get_converter(self, variable_name, converter_name, args, kwargs):
            """Looks up the converter for the given parameter.

            ``variable_name`` is accepted but not used by this implementation.

            :raises LookupError: if ``converter_name`` is not registered in
                ``self.map.converters``.

            .. versionadded:: 0.9
            """
            if converter_name not in self.map.converters:
                raise LookupError('the converter %r does not exist' % converter_name)
            return self.map.converters[converter_name](self.map, *args, **kwargs)

        def compile(self):
            """Compiles the regular expression and stores it.

            Also rebuilds ``_trace``, ``_converters`` and the two weight lists.
            The domain part and the path are separated by a literal ``|``.
            For ``build_only`` rules no regular expression is compiled.
            """
            assert self.map is not None, 'rule not bound'

            if self.map.host_matching:
                domain_rule = self.host or ''
            else:
                domain_rule = self.subdomain or ''

            self._trace = []
            self._converters = {}
            self._static_weights = []
            self._argument_weights = []
            regex_parts = []

            def _build_regex(rule):
                # ``index`` is the position of the parsed part within ``rule``.
                for index, (converter, arguments, variable) in enumerate(
                        parse_rule(rule)):
                    if converter is None:
                        # Static text: match it literally.
                        regex_parts.append(re.escape(variable))
                        self._trace.append((False, variable))
                        for part in variable.split('/'):
                            if part:
                                self._static_weights.append((index, -len(part)))
                    else:
                        if arguments:
                            c_args, c_kwargs = parse_converter_args(arguments)
                        else:
                            c_args = ()
                            c_kwargs = {}
                        convobj = self.get_converter(
                            variable, converter, c_args, c_kwargs)
                        regex_parts.append('(?P<%s>%s)' % (variable, convobj.regex))
                        self._converters[variable] = convobj
                        self._trace.append((True, variable))
                        self._argument_weights.append(convobj.weight)
                        self.arguments.add(str(variable))

            _build_regex(domain_rule)
            # Raw string: a regex-escaped pipe without an invalid ``\|`` escape.
            regex_parts.append(r'\|')
            self._trace.append((False, '|'))
            _build_regex(self.rule if self.is_leaf else self.rule.rstrip('/'))
            if not self.is_leaf:
                self._trace.append((False, '/'))

            if self.build_only:
                return
            # Branch rules and non-strict rules accept an optional trailing
            # slash, captured as ``__suffix__`` so match() can inspect it.
            if not self.is_leaf or not self.strict_slashes:
                suffix = '(?<!/)(?P<__suffix__>/?)'
            else:
                suffix = ''
            regex = r'^%s%s$' % (u''.join(regex_parts), suffix)
            self._regex = re.compile(regex, re.UNICODE)

        def match(self, path, method=None):
            """Check if the rule matches a given path. Path is a string in the
            form ``"subdomain|/path"`` and is assembled by the map.  If
            the map is doing host matching the subdomain part will be the host
            instead.

            If the rule matches a dict with the converted values is returned,
            otherwise the return value is `None`.

            :raises RequestSlash: strict slashes are on, the rule is not a
                leaf, the trailing slash is missing and the method is
                acceptable.
            :raises RequestAliasRedirect: the rule is an alias and the map has
                ``redirect_defaults`` enabled.

            :internal:
            """
            if self.build_only:
                return None
            m = self._regex.search(path)
            if m is None:
                return None

            groups = m.groupdict()
            # we have a folder like part of the url without a trailing
            # slash and strict slashes enabled. raise an exception that
            # tells the map to redirect to the same url but with a
            # trailing slash
            if (self.strict_slashes and not self.is_leaf and
                    not groups.pop('__suffix__') and
                    (method is None or self.methods is None or
                     method in self.methods)):
                raise RequestSlash()
            # if we are not in strict slashes mode we have to remove
            # a __suffix__
            elif not self.strict_slashes:
                del groups['__suffix__']

            result = {}
            for name, value in iteritems(groups):
                try:
                    value = self._converters[name].to_python(value)
                except ValidationError:
                    # A converter rejected the value: treat as "no match".
                    return None
                result[str(name)] = value
            if self.defaults:
                result.update(self.defaults)

            if self.alias and self.map.redirect_defaults:
                raise RequestAliasRedirect(result)

            return result

        def build(self, values, append_unknown=True):
            """Assembles the relative url for that rule and the subdomain.
            If building doesn't work for some reasons `None` is returned.

            :param values: mapping with a value for every dynamic part.
            :param append_unknown: if true, values that are not rule arguments
                are appended as a query string.
            :return: a ``(domain_part, url)`` tuple, or ``None`` when a
                converter raises ``ValidationError``.

            :internal:
            """
            tmp = []
            add = tmp.append
            processed = set(self.arguments)
            for is_dynamic, data in self._trace:
                if is_dynamic:
                    try:
                        add(self._converters[data].to_url(values[data]))
                    except ValidationError:
                        return None
                    processed.add(data)
                else:
                    add(url_quote(to_bytes(data, self.map.charset), safe='/:|+'))
            # The trace always contains the '|' separator added by compile().
            domain_part, url = (u''.join(tmp)).split(u'|', 1)

            if append_unknown:
                query_vars = MultiDict(values)
                for key in processed:
                    if key in query_vars:
                        del query_vars[key]

                if query_vars:
                    url += u'?' + url_encode(query_vars, charset=self.map.charset,
                                             sort=self.map.sort_parameters,
                                             key=self.map.sort_key)

            return domain_part, url

        def provides_defaults_for(self, rule):
            """Check if this rule has defaults for a given rule.

            The result is the value of the ``and`` chain, so it is truthy or
            falsy rather than strictly ``True``/``False``.

            :internal:
            """
            return (not self.build_only and self.defaults and
                    self.endpoint == rule.endpoint and self != rule and
                    self.arguments == rule.arguments)

        def suitable_for(self, values, method=None):
            """Check if the dict of values has enough data for url generation.

            :internal:
            """
            # if a method was given explicitly and that method is not supported
            # by this rule, this rule is not suitable.
            if (method is not None and self.methods is not None
                    and method not in self.methods):
                return False

            defaults = self.defaults or ()

            # all arguments required must be either in the defaults dict or
            # the value dictionary otherwise it's not suitable
            for key in self.arguments:
                if key not in defaults and key not in values:
                    return False

            # in case defaults are given we ensure that either the value was
            # skipped or the value is the same as the default value.
            if defaults:
                for key, value in iteritems(defaults):
                    if key in values and value != values[key]:
                        return False

            return True

        def match_compare_key(self):
            """Return the tuple used to compare rules for matching.

            The tuple holds, in order: whether the rule has arguments, the
            negated number of static weights, the static weights, the negated
            number of argument weights and the argument weights.  The weight
            lists are only filled in by :meth:`compile`.

            NOTE(review): presumably the map sorts its rules by this key;
            that code is not part of this class.

            :internal:
            """
            return (bool(self.arguments), -len(self._static_weights),
                    self._static_weights, -len(self._argument_weights),
                    self._argument_weights)

        def build_compare_key(self):
            """The build compare key for sorting.

            :internal:
            """
            return (1 if self.alias else 0, -len(self.arguments),
                    -len(self.defaults or ()))

        def __eq__(self, other):
            """Two rules are equal if they share a class and a build trace."""
            return (self.__class__ is other.__class__ and
                    self._trace == other._trace)

        # Rules define __eq__ and are mutable, so they are made unhashable.
        __hash__ = None

        def __ne__(self, other):
            """Inverse of :meth:`__eq__`."""
            return not self.__eq__(other)

        def __str__(self):
            """Return the raw rule string."""
            return self.rule

        @native_string_result
        def __repr__(self):
            """Return ``<ClassName 'pattern' (METHODS) -> endpoint>``, or an
            ``(unbound)`` marker when the rule has no map yet.
            """
            if self.map is None:
                return u'<%s (unbound)>' % self.__class__.__name__
            tmp = []
            for is_dynamic, data in self._trace:
                if is_dynamic:
                    tmp.append(u'<%s>' % data)
                else:
                    tmp.append(data)
            if self.methods is not None:
                methods = u' (%s)' % u', '.join(self.methods)
            else:
                methods = u''
            return u'<%s %s%s -> %s>' % (
                self.__class__.__name__,
                # Drop the leading '|' separator and a leading ``u`` prefix
                # from the repr of the joined pattern.
                repr((u''.join(tmp)).lstrip(u'|')).lstrip(u'u'),
                methods,
                self.endpoint
            )




  • 相关阅读:
    Linux IO接口 监控 (iostat)
    linux 防火墙 命令
    _CommandPtr 添加参数 0xC0000005: Access violation writing location 0xcccccccc 错误
    Visual Studio自动关闭
    Linux vsftpd 安装 配置
    linux 挂载外部存储设备 (mount)
    myeclipse 9.0 激活 for win7 redhat mac 亲测
    英文操作系统 Myeclipse Console 乱码问题
    Linux 基本操作命令
    linux 查看系统相关 命令
  • 原文地址:https://www.cnblogs.com/haiyan123/p/8549757.html
Copyright © 2011-2022 走看看