zoukankan      html  css  js  c++  java
  • OpenStack源码分析 Neutron源码分析(一)-----------Restful API篇

    原文:https://blog.csdn.net/happyanger6/article/details/54586463

    首先,先分析WSGI应用的实现。

        由前面的文章http://blog.csdn.net/happyanger6/article/details/54518491可知,WSGI应用的构建过程主要就是通过paste库的loadapp加载,因此核心就是分析这个过程。我们从neutron-server的起始代码开始逐步分析。

    neutron-server的入口是:

    neutron/cmd/eventlet/server/__init__.py:main

    def main():
        server.boot_server(_main_neutron_server)

    boot_server在neutron/server/__init__.py中,它主要的功能就是解析命令行指定的配置文件,一般是"--config-file=/etc/neutron/neutron.conf",然后就执行_main_neutron_server。

    neutron/cmd/eventlet/server/__init__.py::_main_neutron_serve

    def _main_neutron_server():
        if cfg.CONF.web_framework == 'legacy':
            wsgi_eventlet.eventlet_wsgi_server()
        else:
            wsgi_pecan.pecan_wsgi_server()
    可以看到,接下来根据配置文件中配置的web框架方式,决定如何启动wsgi_server,传统的方式是通过eventlet,现在又新加入了pecan方式。默认情况下,还是使用的eventlet方式,因此接着分析eventlet_wsig_server。这并不响应我们分析WSGI应用的代码,因为这属于WSGI服务器的部分。

    neutron/server/wsgi_eventlet.py:

    def eventlet_wsgi_server():
        neutron_api = service.serve_wsgi(service.NeutronApiService)
        start_api_and_rpc_workers(neutron_api)
    这里也能看到,核心功能一部分是WSGI,另一部分就是rpc部分。这里将Netron提供的API功能封装成了NeutronApiService类。我们来看看serve_wsgi:

    neutron/service.py:

    def serve_wsgi(cls):
    
        try:
            service = cls.create()
            service.start()
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE('Unrecoverable error: please check log '
                                  'for details.'))
    
        return service
    很明显,这是用NeutronApiService的类方法"create"来创建实例,然后"start"启动服务。接着分析下NeutronApiService的代码:

    neutron/service.py:

    class NeutronApiService(WsgiService):
        """Class for neutron-api service."""
    
        @classmethod
        def create(cls, app_name='neutron'):
    
            # Setup logging early, supplying both the CLI options and the
            # configuration mapping from the config file
            # We only update the conf dict for the verbose and debug
            # flags. Everything else must be set up in the conf file...
            # Log the options used when starting if we're in debug mode...
    
            config.setup_logging()
            service = cls(app_name)
            return service
    可以看到NeutronApiService继承自"WsgiService",表明其是一个WSGI服务。然后类方法"create"构造了其实例并返回。
    class WsgiService(object):
        """Base class for WSGI based services.
    
        For each api you define, you must also define these flags:
        :<api>_listen: The address on which to listen
        :<api>_listen_port: The port on which to listen
    
        """
    
        def __init__(self, app_name):
            self.app_name = app_name
            self.wsgi_app = None
    
        def start(self):
            self.wsgi_app = _run_wsgi(self.app_name)
    
        def wait(self):
            self.wsgi_app.wait()
    构造过程很简单,只是简单的记录app_name,这里是"neutron",然后在start函数里真正加载WSGI APP,并运行服务,因此这才是我们分析的开始。
    def _run_wsgi(app_name):
        app = config.load_paste_app(app_name)
        if not app:
            LOG.error(_LE('No known API applications configured.'))
            return
        return run_wsgi_app(app)
    load_paste_app从函数名,也可以明白它的作用就是加载paste定义的WSGI应用。

    neutron/commom/config.py:

    def load_paste_app(app_name):
        """Builds and returns a WSGI app from a paste config file.
    
        :param app_name: Name of the application to load
        """
        loader = wsgi.Loader(cfg.CONF)
        app = loader.load_app(app_name)
        return app
    wsgi.Loader是从neutron.conf中读取deploy配置文件的路径,然后根据指定的配置文件来加载app,默认是"/etc/neutron/api-paste.ini"。然后通过deploy.loadapp来加载app,这个deploy就是PasteDeploy。
    
    
    oslo_service/wsgi.py:
    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.
    
        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound
    
        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception(_LE("Couldn't lookup app: %s"), name)
            raise PasteAppNotFound(name=name, path=self.config_path)

    分析到这里可知,后面app的加载过程就是PasteDeploy的加载过程,有了上篇http://blog.csdn.net/happyanger6/article/details/54518491文章中的基础,我们对着源码来理解:

    先来看下配置文件"/etc/neutron/api-paste.ini":

    [composite:neutron]
    use = egg:Paste#urlmap
    /: neutronversions
    /v2.0: neutronapi_v2_0

    [composite:neutronapi_v2_0]
    use = call:neutron.auth:pipeline_factory
    noauth = cors request_id catch_errors extensions neutronapiapp_v2_0
    keystone = cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

    [filter:request_id]
    paste.filter_factory = oslo_middleware:RequestId.factory

    [filter:catch_errors]
    paste.filter_factory = oslo_middleware:CatchErrors.factory

    [filter:cors]
    paste.filter_factory = oslo_middleware.cors:filter_factory
    oslo_config_project = neutron

    [filter:keystonecontext]
    paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

    [filter:authtoken]
    paste.filter_factory = keystonemiddleware.auth_token:filter_factory

    [filter:extensions]
    paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

    [app:neutronversions]
    paste.app_factory = neutron.api.versions:Versions.factory

    [app:neutronapiapp_v2_0]
    paste.app_factory = neutron.api.v2.router:APIRouter.factory

    首先是一个组合类型的section,这个section表明用Paste.urlmap来构造应用,因此会将对"/"的访问交给另外一个app[app:nuetronversion],而将对"/v2.0"的访问交给另外一个组合[composite:neutronapi_v2_0]生成的app。

    通过这2个就构造了所有的WSGI应用,其中对"/"的访问,而通过neutron.api,version:Versions.factory类方法来构造一个对象,然后将请求交于这个对象处理,

    具体而言就是交于对象的__call__方法。我们来看下是如何构造的:

    neutron/api/versinos.py:

    class Versions(object):
    
        @classmethod
        def factory(cls, global_config, **local_config):
            return cls(app=None)
    通过factory方法构造一个对象,这个对象就是一个WSGI应用。它就处理对"/"的方法,而根据WSGI规范,会调用这个对象的__call__方法:
    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        """Respond to a request for all Neutron API versions."""
        version_objs = [
            {
                "id": "v2.0",
                "status": "CURRENT",
            },
        ]
    
        if req.path != '/':
            if self.app:
                return req.get_response(self.app)
            language = req.best_match_language()
            msg = _('Unknown API version specified')
            msg = oslo_i18n.translate(msg, language)
            return webob.exc.HTTPNotFound(explanation=msg)
           ..............

    可以看到,通过@webob.dec.wsgify装饰器将__call__封装成符合WSGI规范的函数,这样"/"请求最终就是由"__call__"处理的。

    这个"/"还比较简单,复杂的是对"/v2.0"的访问,这是大部分API的接口,我们看到这个组合段的app是用一个函数来构造的:

    [composite:neutronapi_v2_0]
    use = call:neutron.auth:pipeline_factory

    use = call:...表示后面的是一个可调用对象,用它来构造最终的app.剩余的参数noauth,keystone等会作为参数传给pipeline_factory。

    neutron/auth.py:

    def pipeline_factory(loader, global_conf, **local_conf):
        """Create a paste pipeline based on the 'auth_strategy' config option."""
        pipeline = local_conf[cfg.CONF.auth_strategy]
        pipeline = pipeline.split()
        filters = [loader.get_filter(n) for n in pipeline[:-1]]
        app = loader.get_app(pipeline[-1])
        filters.reverse()
        for filter in filters:
            app = filter(app)
        return app
    先从配置文件neutron.conf中读取auth策略,默认是"auth_strategy = keystone",因此从api-paste.ini中取到的pipeline为"cors request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0"它们都定义在其它的"filter"或"app" section段中。

    首先,从pipeline中获取最后一个app,即为"neutronapiapp_v2_0",从中加载app,然后依次用各个filter处理构造的app,并最终返回最后构造出的WSGI APP.

    因此,我们按下面的顺序分析即可:

    通过app_factory工厂方法来构造app,然后通过不同的filter_factory方法构造不同的filter对象,并将app依次通过filter对象处理。
    [app:neutronapiapp_v2_0]
    paste.app_factory = neutron.api.v2.router:APIRouter.factory

    neutron/api/v2/router.py:

    class APIRouter(base_wsgi.Router):
    
        @classmethod
        def factory(cls, global_config, **local_config):
            return cls(**local_config)

    工厂方法构造了一个APIRouter对象作为app返回,因此分析其__init__方法:

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        plugin = manager.NeutronManager.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
    
        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)
        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            allow_pagination = cfg.CONF.allow_pagination
            allow_sorting = cfg.CONF.allow_sorting
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=allow_pagination,
                allow_sorting=allow_sorting)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)
    
        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            resource_registry.register_resource_by_name(resource)
    
        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])
    
        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
    这个属于核心API的构造,因此详细分析一下。
    mapper = routes_mapper.Mapper()

    首先,是声明一个routes.Mapper,这个上篇routes分析时讲过,用来构造URL和对应controller的映射,方便根据不同的URL路由给不同的controller处理。

    plugin = manager.NeutronManager.get_plugin()
    然后,先构造了一个NeutronManger的单例,这个对象构造的过程中会根据配置加载核心插件,一般就是"Ml2Plugin",然后会加载以下几个默认的服务插件:
    neutron/plugings/common/constants.py:
    
    DEFAULT_SERVICE_PLUGINS = {
        'auto_allocate': 'auto-allocated-topology',
        'tag': 'tag',
        'timestamp_core': 'timestamp_core',
        'network_ip_availability': 'network-ip-availability'
    }
    然后是加载扩展插件:
    extensions.PluginAwareExtensionManager.get_instance()
    扩展插件的加载会从neutron/extensions目录下加载所有插件。

    通过上面2步就加载完了核心插件,服务插件和扩展插件,然后就是构造不同URL的controller。

    for resource in RESOURCES:
        _map_resource(RESOURCES[resource], resource,
                      attributes.RESOURCE_ATTRIBUTE_MAP.get(
                          RESOURCES[resource], dict()))
    依次构造以下几个URL的controller."/networks","/subnets","/subnetpools","/ports"。
    RESOURCES = {'network': 'networks',
                 'subnet': 'subnets',
                 'subnetpool': 'subnetpools',
                 'port': 'ports'}
    这个构造过程是通过_map_resource函数完成的,构造时会从配置文件中获取一些允许进行的操作,如"allow_bulk"
    等。
    
    在构造具体的mapper时,会传递以下参数:
    
    col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                      member_actions=MEMBER_ACTIONS)
    COLLECTION_ACTIONS = ['index', 'create']
    MEMBER_ACTIONS = ['show', 'update', 'delete']

    这些就是可以对URL发起的操作类型,这些操作最终会根据访问的URL(/networks,ports)转换为create_network,update_port这些函数交给对应的controller处理。这些后面还会分析。

    具体的controller是通过base.create_resource生成的,来看下代码:

    neutron/api/v2/base.py:

    class Controller(object):
        LIST = 'list'
        SHOW = 'show'
        CREATE = 'create'
        UPDATE = 'update'
        DELETE = 'delete'
    ..........

    ..........

    def create_resource(collection, resource, plugin, params, allow_bulk=False,
                        member_actions=None, parent=None, allow_pagination=False,
                        allow_sorting=False):
        controller = Controller(plugin, collection, resource, params, allow_bulk,
                                member_actions=member_actions, parent=parent,
                                allow_pagination=allow_pagination,
                                allow_sorting=allow_sorting)
        return wsgi_resource.Resource(controller, FAULT_MAP)
    可以看到,所有的Controller都是这个文件中定义的Controller类的实例对象,然后还会再将其调用wsgi_resouce.Resource.

    neutron/api/v2/resouce.py:

    def Resource(controller, faults=None, deserializers=None, serializers=None,
                 action_status=None):
        """Represents an API entity resource and the associated serialization and
        deserialization logic
        """
        default_deserializers = {'application/json': wsgi.JSONDeserializer()}
        default_serializers = {'application/json': wsgi.JSONDictSerializer()}
        format_types = {'json': 'application/json'}
        action_status = action_status or dict(create=201, delete=204)
    
        default_deserializers.update(deserializers or {})
        default_serializers.update(serializers or {})
    
        deserializers = default_deserializers
        serializers = default_serializers
        faults = faults or {}
    
        @webob.dec.wsgify(RequestClass=Request)
        def resource(request):
            route_args = request.environ.get('wsgiorg.routing_args')
            if route_args:
                args = route_args[1].copy()
            else:
                args = {}
    
            # NOTE(jkoelker) by now the controller is already found, remove
            #                it from the args if it is in the matchdict
            args.pop('controller', None)
            fmt = args.pop('format', None)
            action = args.pop('action', None)
            content_type = format_types.get(fmt,
                                            request.best_match_content_type())
            language = request.best_match_language()
            deserializer = deserializers.get(content_type)
            serializer = serializers.get(content_type)
    
            try:
                if request.body:
                    args['body'] = deserializer.deserialize(request.body)['body']
    
                method = getattr(controller, action)
    
                result = method(request=request, **args)
            except (exceptions.NeutronException,
                    netaddr.AddrFormatError,
                    oslo_policy.PolicyNotAuthorized) as e:
                for fault in faults:
                    if isinstance(e, fault):
                        mapped_exc = faults[fault]
                        break
                else:
                    mapped_exc = webob.exc.HTTPInternalServerError
                if 400 <= mapped_exc.code < 500:
                    LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
                             {'action': action, 'exc': e})
                else:
                    LOG.exception(_LE('%s failed'), action)
                e = translate(e, language)
                body = serializer.serialize(
                    {'NeutronError': get_exception_data(e)})
                kwargs = {'body': body, 'content_type': content_type}
                raise mapped_exc(**kwargs)
            except webob.exc.HTTPException as e:
                type_, value, tb = sys.exc_info()
                if hasattr(e, 'code') and 400 <= e.code < 500:
                    LOG.info(_LI('%(action)s failed (client error): %(exc)s'),
                             {'action': action, 'exc': e})
                else:
                    LOG.exception(_LE('%s failed'), action)
                translate(e, language)
                value.body = serializer.serialize(
                    {'NeutronError': get_exception_data(e)})
                value.content_type = content_type
                six.reraise(type_, value, tb)
            except NotImplementedError as e:
                e = translate(e, language)
                # NOTE(armando-migliaccio): from a client standpoint
                # it makes sense to receive these errors, because
                # extensions may or may not be implemented by
                # the underlying plugin. So if something goes south,
                # because a plugin does not implement a feature,
                # returning 500 is definitely confusing.
                body = serializer.serialize(
                    {'NotImplementedError': get_exception_data(e)})
                kwargs = {'body': body, 'content_type': content_type}
                raise webob.exc.HTTPNotImplemented(**kwargs)
            except Exception:
                # NOTE(jkoelker) Everything else is 500
                LOG.exception(_LE('%s failed'), action)
                # Do not expose details of 500 error to clients.
                msg = _('Request Failed: internal server error while '
                        'processing your request.')
                msg = translate(msg, language)
                body = serializer.serialize(
                    {'NeutronError': get_exception_data(
                        webob.exc.HTTPInternalServerError(msg))})
                kwargs = {'body': body, 'content_type': content_type}
                raise webob.exc.HTTPInternalServerError(**kwargs)
    
            status = action_status.get(action, 200)
            body = serializer.serialize(result)
            # NOTE(jkoelker) Comply with RFC2616 section 9.7
            if status == 204:
                content_type = ''
                body = None
    
            return webob.Response(request=request, status=status,
                                  content_type=content_type,
                                  body=body)
        # NOTE(blogan): this is something that is needed for the transition to
        # pecan.  This will allow the pecan code to have a handle on the controller
        # for an extension so it can reuse the code instead of forcing every
        # extension to rewrite the code for use with pecan.
        setattr(resource, 'controller', controller)
        return resource
    可以看到,所有的请求都会先交于resouce函数处理,进行反序列化和请求参数的获取,最终再交给controller处理。
    action = args.pop('action', None)
    method = getattr(controller, action)
    
    result = method(request=request, **args)

    这样对于"/networks","/subnets","/subnetpools","/ports"都会最终交于controller对应的action函数,以create_network为例:

    def create(self, request, body=None, **kwargs):
        self._notifier.info(request.context,
                            self._resource + '.create.start',
                            body)
        return self._create(request, body, **kwargs)
    
    
    @db_api.retry_db_errors
    def _create(self, request, body, **kwargs):
           action = self._plugin_handlers[self.CREATE]
    _create中会从selc._plugin_handlers里取对应操作映射的action,这个映射是在controller的构造函数里创建的:
    
    self._plugin_handlers = {
        self.LIST: 'get%s_%s' % (parent_part, self._collection),
        self.SHOW: 'get%s_%s' % (parent_part, self._resource)
    }
    for action in [self.CREATE, self.UPDATE, self.DELETE]:
        self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
                                                     self._resource)
    self._resource为"network","port"这些RESOUCES,因此create对应的为create_network,create_port。
    在_create中最终调用do_create:
    obj_creator = getattr(self._plugin, action)
    try:
        if emulated:
            return self._emulate_bulk_create(obj_creator, request,
                                             body, parent_id)
        else:
            if self._collection in body:
                # This is weird but fixing it requires changes to the
                # plugin interface
                kwargs.update({self._collection: body})
            else:
                kwargs.update({self._resource: body})
            return obj_creator(request.context, **kwargs)
    
    
    可以看到会从self._plugin里获取对应的action,这个_plugin就是核心插件Ml2Plugin,因此所有的核心操作最终都
    会交给Ml2Plugin的对应create_network,create_port等方法执行。这样就明白了所有核心资源的创建删除等
    操作最终都会将给Ml2Plugin的对应方法处理。

    那么Ml2Plugin插件的处理过程又是如何呢?我们先来看下其构造函数:

    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
    可以看到它初始化了type_manager,mechanism_manager这2个管理器分别用来管理type和mechanism.其中不同的网络拓扑类型对应着Type Driver,而网络实现机制对应着Mechanism Driver。这两个管理器都是通过stevedor来管理的,这样就可以向查找标准库一样管理Type,Mechanism Driver了。

    其中Type插件的加载会以'neutron.ml2.type_drivers'作为命名空间,Mechanism插件的加载会以'neutron.ml2.mechanism_drivers"作为命名空间。

    这样实际上Ml2Plugin的不同操作会交给不同的type,mechanism插件处理,这样的架构十分灵活,比如:

    def create_network(self, context, network):
        result, mech_context = self._create_network_db(context, network)
        try:
            self.mechanism_manager.create_network_postcommit(mech_context)
    创建网络会交由mechanism_manager处理。

    这样就是APIRouter构造出的app的全部内容了,对于核心URL会交由resource->Controller-->Ml2Plugin--->Type,Mechanism层层处理。也很方便我们根据需要自己实现不同的Type,Mechanism Driver.

    然后就是将这个app交由不同的filter处理,我们继续看这些filter干了些什么。第一个filter是:

    [filter:extensions]
    paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

    neutron/api/extensions.py:

    def plugin_aware_extension_middleware_factory(global_config, **local_config):
        """Paste factory."""
        def _factory(app):
            ext_mgr = PluginAwareExtensionManager.get_instance()
            return ExtensionMiddleware(app, ext_mgr=ext_mgr)
        return _factory
    可以看到会用ExtensionMiddleware对象对app进行处理,这个处理和APIRouter的__init__函数处理类似,只不过这次是为扩展插件构造URL和Controller.这些扩展插件的Controller是ExtensionController。由于过程类似,就不再详细展开了,可以自行分析下。这样通过第一个filter就构造出了扩展插件的WSGI应用。

    第二个filter:

    [filter:authtoken]
    paste.filter_factory = keystonemiddleware.auth_token:filter_factory

    keystonemiddleware/auth_token/__init__.py:

    def filter_factory(global_conf, **local_conf):
        """Returns a WSGI filter app for use with paste.deploy."""
        conf = global_conf.copy()
        conf.update(local_conf)
    
        def auth_filter(app):
            return AuthProtocol(app, conf)
        return auth_filter

    可以看到对app封装了一个AuthProtocol对象。分析其代码不难看出其作用是对请求是否通过了认证进行检查,即是否携带合法token。这样后面的filter的作用也类似,就是对请求进行一些预处理,所有预处理都完成后再交由实际的Controller处理。

    这样我们就分析完了整个WSGI应用的构造和处理过程,不难得出下面的处理流程:

  • 相关阅读:
    Mybatis-plus学习笔记(一)
    Mysql基础(四)分组查询及连接查询
    Mysql 基础(三)排序查询及常用函数
    CyclicBarrier 使用详解
    countDownLatch
    pom所有依赖version红色但是不影响运行
    iText5实现Java生成PDF文件完整版
    【Maven】---Nexus私服配置Setting和Pom
    引用、指针、const、define、static、sizeof、左值右值
    事物隔离级别、MVCC以及数据库中常见锁介绍
  • 原文地址:https://www.cnblogs.com/liuhongru/p/10154705.html
Copyright © 2011-2022 走看看