zoukankan      html  css  js  c++  java
  • nova-conductor与AMQP(二)

    源码版本:H版

    一、首先看服务的启动脚本

    /usr/bin/nova-conductor

    import sys
    from nova.cmd.conductor import main
    if __name__ == "__main__":
        sys.exit(main())

    nova/cmd/conductor.py

    def main():
        ...
        server = service.Service.create(binary='nova-conductor',
                                        topic=CONF.conductor.topic,
                                        manager=CONF.conductor.manager)
        service.serve(server, workers=CONF.conductor.workers)
        service.wait()

    二、分析RPC服务的创建

    nova/service.py

    Service类:
    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_enable=None,
               periodic_fuzzy_delay=None, periodic_interval_max=None,
               db_allowed=True):
        ...
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max,
                          db_allowed=db_allowed)
        return service_obj

    三、分析服务的运行

    nova/service.py

    def serve(server, workers=None):
        global _launcher
        if _launcher:
            raise RuntimeError(_('serve() can only be called once'))
        """ service为nova.openstack.common.service"""
        _launcher = service.launch(server, workers=workers)

    nova/openstack/common/service.py

    def launch(service, workers=None):
        if workers:
            launcher = ProcessLauncher()
            launcher.launch_service(service, workers=workers)
        else:
            launcher = ServiceLauncher()
            launcher.launch_service(service)
        return launcher

      参考nova-api的服务启动过程(http://www.cnblogs.com/littlebugfish/p/4022907.html),即启动协程(使用eventlet)运行Service,主要是start函数。注意,如果在配置文件中指明多个workers的话,将有多个进程监听消息队列,取得消息的进程负责处理。接着看start函数的代码,如下:

    nova/service.py

    Service类:
    def start(self):
        ...
        self.manager.pre_start_hook()
        if self.backdoor_port is not None:
            self.manager.backdoor_port = self.backdoor_port
        """创建AMQP连接,方便后面创建Consumer"""
        self.conn = rpc.create_connection(new=True)
        LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)
        rpc_dispatcher = self.manager.create_rpc_dispatcher(self.backdoor_port)
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
        node_topic = '%s.%s' % (self.topic, self.host)
        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
        self.conn.consume_in_thread()
        self.manager.post_start_hook()
    
        LOG.debug(_("Join ServiceGroup membership for this service %s")
                  % self.topic)
        self.servicegroup_api.join(self.host, self.topic, self)
    
        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None
            self.tg.add_dynamic_timer(self.periodic_tasks,
                                     initial_delay=initial_delay,
                                     periodic_interval_max=
                                        self.periodic_interval_max)

      self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)。这里的self.conn主要指nova.openstack.common.rpc.impl_qpid.Connection,(此处设AMQP后端实现为qpid,这个可以在nova.conf中进行配置)所以调用的create_consumer代码如下:

    def create_consumer(self, topic, proxy, fanout=False):
        proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))
        self.proxy_callbacks.append(proxy_cb)
    
        if fanout:
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
        else:
            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
    
        self._register_consumer(consumer)
    
        return consumer

      该函数会根据具体的后端AMQP实现,向AMQP服务器发送队列和exchange创建(第一次会创建,后面会复用)和绑定请求,这样就可以将Consumer和具体的队列绑定并进行监听。self.conn.consume_in_thread函数就是负责启动Consumer线程,其使用evelent.spawn创建一个协程一直运行等待消息,在有消息到来时会创建新的协程运行远程调用的函数。当队列有消息到来时,调用proxy_cb进行处理,即ProxyCallback对象的__call__函数,代码如下:

    nova/openstack/common/rpc/amqp.py

    ProxyCallback类:
    def __call__(self, message_data):
        if hasattr(local.store, 'context'):
            del local.store.context
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
        self.msg_id_cache.check_duplicate_message(message_data)
        ctxt = unpack_context(self.conf, message_data)
        """解析消息"""
        method = message_data.get('method')
        args = message_data.get('args', {})
        version = message_data.get('version')
        namespace = message_data.get('namespace')
        if not method:
            LOG.warn(_('no method for message: %s') % message_data)
            ctxt.reply(_('No method for message: %s') % message_data,
                       connection_pool=self.connection_pool)
            return
        """处理消息"""
        self.pool.spawn_n(self._process_data, ctxt, version, method,
                          namespace, args)
    
    
    
    def _process_data(self, ctxt, version, method, namespace, args):
            ctxt.update_store()
        try:
            rval = self.proxy.dispatch(ctxt, version, method, namespace,
                                       **args)
        ...

       可以看到消息的处理主要是通过新开的协程(使用eventlet)来执行self._process_data函数,而self._process_data函数中主要调用了self.proxy.dispatch函数,那么这个dispatch函数的内容究竟是什么呢?

    1、首先找到self.proxy

      根据对self.proxy的层层追踪,其为上面ProxyCallback构造时传入,即start函数中的 rpc_dispatcher = self.manager.create_rpc_dispatcher(self.backdoor_port),这里的self.manager在创建该Service的时候就已经设定为ConductorManager。ConductorManager的create_rpc_dispatcher函数代码如下:

    def create_rpc_dispatcher(self, *args, **kwargs):
        #self.compute_task_mgr = ComputeTaskManager()
        kwargs['additional_apis'] = [self.compute_task_mgr]
        return super(ConductorManager, self).create_rpc_dispatcher(*args,
                **kwargs)

      根据类的继承关系:

    nova/manager.py

    Manager类:
    def create_rpc_dispatcher(self, backdoor_port=None, additional_apis=None):
        apis = []
        if additional_apis:
            apis.extend(additional_apis)
        base_rpc = baserpc.BaseRPCAPI(self.service_name, backdoor_port)
        apis.extend([self, base_rpc])
        serializer = objects_base.NovaObjectSerializer()
        return rpc_dispatcher.RpcDispatcher(apis, serializer)

    nova/openstack/common/rpc/dispatcher.py

    RpcDispatcher类
    def __init__(self, callbacks, serializer=None):
        #callbacks为一个list,包含ComputeTaskManager,ConductorManager,BaseRPCAPI
        self.callbacks = callbacks
        if serializer is None:
            serializer = rpc_serializer.NoOpSerializer()
        self.serializer = serializer
        super(RpcDispatcher, self).__init__()

      所以最后的self.proxy为RpcDispatcher对象。要注意的是RpcDispatcher对象的callbacks属性包含了一系列的Manager类,它们将被用来处理消息中指定的method。参考下图:

    2、分析self.proxy.dispatch函数

      即RpcDispatcher类的dispatch函数,代码如下:

    def dispatch(self, ctxt, version, method, namespace, **kwargs):
        if not version:
            version = '1.0'
        had_compatible = False
        for proxyobj in self.callbacks:
            try:
                cb_namespace = proxyobj.RPC_API_NAMESPACE
            except AttributeError:
                cb_namespace = None
            if namespace != cb_namespace:
                continue   
            ...         
            if not hasattr(proxyobj, method):
                continue
            if is_compatible:
                kwargs = self._deserialize_args(ctxt, kwargs)
                result = getattr(proxyobj, method)(ctxt, **kwargs)
                return self.serializer.serialize_entity(ctxt, result)
        ...

      该函数会根据namespace匹配Manager,然后分析Manager类中的函数是否和消息中的函数匹配,如果匹配即调用Manager类中对应的函数进行处理。经过观察,其实对外提供rpc服务的组件的rpcapi.py和manager.py文件中的类是对应的,它们通过namespace进行匹配查找,在命名上具有相同的前缀,所以如果要跟踪rpcapi.py中函数的后续实现只需查看该组件的manager.py文件里对应的Manager类的对应函数就可以了。

    参考文章:

    http://bingotree.cn/?p=242

    http://blog.csdn.net/gaoxingnengjisuan/article/details/12231633

     

     

  • 相关阅读:
    最长不重复子串
    add two nums
    logistic 回归
    threesum
    KNN算法思想与实现
    Python的易错点
    ccf 目录格式转换
    Azure 带宽
    Office 365 如何使用powershell查询邮件追踪
    Azure AD Connect 手动同步
  • 原文地址:https://www.cnblogs.com/littlebugfish/p/4058054.html
Copyright © 2011-2022 走看看