  • Analysis of the Cinder Volume Service Startup Flow and Its Periodic Tasks

    1. The program entry point of the cinder-volume service

    #!/usr/bin/python2
    # PBR Generated from u'console_scripts'
    import sys
    from cinder.cmd.volume import main
    if __name__ == "__main__":
        sys.exit(main())
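
    The stub above is generated by PBR from the console_scripts entry points declared in cinder's setup.cfg; the relevant stanza looks roughly like this (abridged, other scripts omitted):

    [entry_points]
    console_scripts =
        cinder-volume = cinder.cmd.volume:main
        cinder-scheduler = cinder.cmd.scheduler:main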
    

    2. The main() implementation in cinder/cmd/volume.py

    def main():
        objects.register_all()   # imports all modules under cinder/objects
        gmr_opts.set_defaults(CONF)  # oslo_reports: generates error reports, e.g. for memory leaks
        CONF(sys.argv[1:], project='cinder',
             version=version.version_string())
        logging.setup(CONF, "cinder")
        python_logging.captureWarnings(True)
        # oslo_privsep: sets the execution privileges of the service
        priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
        # monkey-patching: replaces libraries such as socket and thread;
        # import behavior is unchanged, only the functions that operations
        # resolve to are redirected
        utils.monkey_patch()
        gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
        launcher = service.get_launcher()  # Step 1: obtain a launcher (detailed in section 4)
        LOG = logging.getLogger(__name__)
        service_started = False

        if CONF.enabled_backends:  # the storage backends enabled in cinder.conf
            for backend in filter(None, CONF.enabled_backends):  # iterate over the enabled_backends values from the config file
                CONF.register_opt(host_opt, group=backend)
                backend_host = getattr(CONF, backend).backend_host
                # Build the cinder-volume host name; this is the name shown
                # by the cinder service-list command.
                host = "%s@%s" % (backend_host or CONF.host, backend)
                # We also want to set cluster to None on empty strings, and we
                # ignore leading and trailing spaces.
                cluster = CONF.cluster and CONF.cluster.strip()
                cluster = (cluster or None) and '%s@%s' % (cluster, backend)
                try:
                    server = service.Service.create(host=host,  # create one service object per backend
                                                    service_name=backend,
                                                    binary='cinder-volume',
                                                    coordination=True,
                                                    cluster=cluster)
                except Exception:
                    msg = _('Volume service %s failed to start.') % host
                    LOG.exception(msg)
                else:  # taken when no exception occurred
                    # Dispose of the whole DB connection pool here before
                    # starting another process.  Otherwise we run into cases where
                    # child processes share DB connections which results in errors.
                    session.dispose_engine()
                    launcher.launch_service(server)  # Step 2: hand the service to the launcher's launch_service
                    service_started = True
        else:
            LOG.error(_LE('Configuration for cinder-volume does not specify '
                          '"enabled_backends". Using DEFAULT section to configure '
                          'drivers is not supported since Ocata.'))
    
        if not service_started:
            msg = _('No volume service(s) started successfully, terminating.')
            LOG.error(msg)
            sys.exit(1)
    
        launcher.wait()
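
    To make the host-name construction concrete, here is a minimal, hypothetical cinder.conf with two backends (the backend names lvm1 and ceph1 are illustrative):

    [DEFAULT]
    enabled_backends = lvm1,ceph1
    host = node1

    [lvm1]
    volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver
    volume_backend_name = LVM

    [ceph1]
    volume_driver = cinder.volume.drivers.rbd.RBDDriver
    volume_backend_name = CEPH

    With this configuration, main() creates two Service objects whose host names are node1@lvm1 and node1@ceph1; these are the names shown in the Host column of cinder service-list. If backend_host is set in a backend section, it takes precedence over CONF.host.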
    

    3. A closer look at service.Service.create()

    A classmethod of cinder/service.py: class Service(service.Service):
        @classmethod
        def create(cls, host=None, binary=None, topic=None, manager=None,
                   report_interval=None, periodic_interval=None,
                   periodic_fuzzy_delay=None, service_name=None,
                   coordination=False, cluster=None):
            """Instantiates class and passes back application object.
    
            :param host: defaults to CONF.host
            :param binary: defaults to basename of executable
            :param topic: defaults to bin_name - 'cinder-' part
            :param manager: defaults to CONF.<topic>_manager
            :param report_interval: defaults to CONF.report_interval
            :param periodic_interval: defaults to CONF.periodic_interval
            :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
            :param cluster: Defaults to None, as only some services will have it
    
            """
            if not host:
                host = CONF.host
            if not binary:
                binary = os.path.basename(inspect.stack()[-1][1])
            if not topic:
                topic = binary
            if not manager:
                subtopic = topic.rpartition('cinder-')[2]
                manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval  # read the report interval from the config file
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval  # read the polling interval from the config file
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,  # instantiate the service object
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination,
                          cluster=cluster)
    
            return service_obj
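
    The defaulting chain above can be traced by hand for the cinder-volume binary; a small standalone sketch (the values follow directly from the code above):

    binary = 'cinder-volume'
    topic = binary                             # 'cinder-volume'
    subtopic = topic.rpartition('cinder-')[2]  # 'volume'
    opt_name = '%s_manager' % subtopic         # 'volume_manager'
    print(subtopic, opt_name)
    # CONF.volume_manager defaults to 'cinder.volume.manager.VolumeManager',
    # so the resulting service is driven by VolumeManager.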
    

    4. A closer look at launcher = service.get_launcher()

    cinder/cmd/volume.py
    from cinder import service
    launcher = service.get_launcher()
    
    cinder/service.py
    from oslo_service import service
    def get_launcher():
        # Note(lpetrut): ProcessLauncher uses green pipes which fail on Windows
        # due to missing support of non-blocking I/O pipes. For this reason, the
        # service must be spawned differently on Windows, using the ServiceLauncher
        # class instead.
        if os.name == 'nt':
            return Launcher()  # Windows
        else:
            return process_launcher()  # Linux and other POSIX systems
    
    def process_launcher():
        return service.ProcessLauncher(CONF)	
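
    For orientation, here is a minimal standalone sketch of the same pattern: a do-nothing ServiceBase subclass launched through ProcessLauncher (NoopService is made up for illustration):

    from oslo_config import cfg
    from oslo_service import service

    class NoopService(service.ServiceBase):
        """Does nothing; only satisfies the ServiceBase interface."""
        def start(self):
            pass

        def stop(self):
            pass

        def wait(self):
            pass

        def reset(self):
            pass

    if __name__ == '__main__':
        launcher = service.ProcessLauncher(cfg.CONF)
        launcher.launch_service(NoopService(), workers=1)  # forks one child
        launcher.wait()  # the parent blocks here, supervising the child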
    
    oslo_service/service.py
    class ProcessLauncher(object):
        """Launch a service with a given number of workers."""
    
        def __init__(self, conf, wait_interval=0.01, restart_method='reload'):
            """Constructor.
            :param conf: an instance of ConfigOpts
            :param wait_interval: The interval to sleep for between checks
                                  of child process exit.
            :param restart_method: If 'reload', calls reload_config_files on
                SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. Other
                values produce a ValueError.
            """
            self.conf = conf
            conf.register_opts(_options.service_opts)
            self.children = {}
            self.sigcaught = None
            self.running = True
            self.wait_interval = wait_interval
            self.launcher = None
            rfd, self.writepipe = os.pipe()
            self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
            self.signal_handler = SignalHandler()
            self.handle_signal()
            self.restart_method = restart_method
            if restart_method not in _LAUNCHER_RESTART_METHODS:
                raise ValueError(_("Invalid restart_method: %s") % restart_method)	
    
        def launch_service(self, service, workers=1):  # defaults to one worker, i.e. one process
            """Launch a service with a given number of workers.

            :param service: a service to launch, must be an instance of
                   :class:`oslo_service.service.ServiceBase`
            :param workers: a number of processes in which a service
                   will be running
            """
            _check_service_base(service)
            wrap = ServiceWrapper(service, workers)
    
            # Hide existing objects from the garbage collector, so that most
            # existing pages will remain in shared memory rather than being
            # duplicated between subprocesses in the GC mark-and-sweep. (Requires
            # Python 3.7 or later.)
            if hasattr(gc, 'freeze'):
                gc.freeze()
    
            LOG.info('Starting %d workers', wrap.workers)
            while self.running and len(wrap.children) < wrap.workers:
                self._start_child(wrap)
    
        def _start_child(self, wrap):
            if len(wrap.forktimes) > wrap.workers:
                # Limit ourselves to one process a second (over the period of
                # number of workers * 1 second). This will allow workers to
                # start up quickly but ensure we don't fork off children that
                # die instantly too quickly.
                if time.time() - wrap.forktimes[0] < wrap.workers:
                    LOG.info('Forking too fast, sleeping')
                    time.sleep(1)
    
                wrap.forktimes.pop(0)
    
            wrap.forktimes.append(time.time())
    
            pid = os.fork()
            # fork() gives the child a copy of the parent's data; from this
            # point the program continues in two processes. In the child,
            # fork() returns 0; in the parent, it returns the child's PID.
            if pid == 0:
                self.launcher = self._child_process(wrap.service)  # the forked child takes over the service
                while True:
                    self._child_process_handle_signal()
                    status, signo = self._child_wait_for_exit_or_signal(
                        self.launcher)
                    if not _is_sighup_and_daemon(signo):
                        self.launcher.wait()
                        break
                    self.launcher.restart()  # on SIGHUP: reload config and restart this child's services
    
                os._exit(status)
    
            LOG.debug('Started child %d', pid)
    
            wrap.children.add(pid)
            self.children[pid] = wrap
    
            return pid
    From this we can conclude that the cinder-volume service started for each backend runs as an independent child process; each child hosts its own Launcher instance.
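
    The return-value convention of os.fork() used above can be checked in isolation with this minimal sketch (independent of oslo.service):

    import os

    pid = os.fork()
    if pid == 0:
        # Child branch: fork() returned 0.
        print('child, pid =', os.getpid())
        os._exit(0)  # exit immediately, skipping normal interpreter cleanup
    else:
        # Parent branch: fork() returned the child's PID.
        print('parent, spawned child', pid)
        os.waitpid(pid, 0)  # reap the child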
    oslo_service/service.py:class Launcher(object):
        def restart(self):
            """Reload config files and restart service.
            :returns: The return value from reload_config_files or
              mutate_config_files, according to the restart_method.
            """
            if self.restart_method == 'reload':
                self.conf.reload_config_files()
            elif self.restart_method == 'mutate':
                self.conf.mutate_config_files()
            self.services.restart()
    oslo_service/service.py:class Services(object)
        def restart(self):
            """Reset services and start them in new threads."""
            self.stop()
            self.done = event.Event()
            for restart_service in self.services:
                restart_service.reset()
                self.tg.add_thread(self.run_service, restart_service, self.done)
    
        @staticmethod
        def run_service(service, done):
            """Service start wrapper.
            :param service: service to run
            :param done: event to wait on until a shutdown is triggered
            :returns: None
            """
            try:
            service.start()  # calls the service's start(); this resolves to cinder/service.py: class Service.start(), since that class inherits from oslo_service's Service
            except Exception:
                LOG.exception('Error starting thread.')
                raise SystemExit(1)
            else:
                done.wait()		
    			
    cinder/service.py: the start() method of class Service:
    def start(self):
            version_string = version.version_string()
            LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
                     {'topic': self.topic, 'version_string': version_string})
            self.model_disconnected = False
    
            if self.coordination:
                coordination.COORDINATOR.start()
    
        # initialize the host and its driver (init_host is analyzed below)
        self.manager.init_host(added_to_cluster=self.added_to_cluster,
                               service_id=Service.service_id)
    
            LOG.debug("Creating RPC server for service %s", self.topic)
    
            ctxt = context.get_admin_context()
            endpoints = [self.manager]
            endpoints.extend(self.manager.additional_endpoints)
            obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
            LOG.debug("Pinning object versions for RPC server serializer to %s",
                      obj_version_cap)
            serializer = objects_base.CinderObjectSerializer(obj_version_cap)
    
            target = messaging.Target(topic=self.topic, server=self.host)
            self.rpcserver = rpc.get_server(target, endpoints, serializer)
            self.rpcserver.start()
    
            # NOTE(dulek): Kids, don't do that at home. We're relying here on
            # oslo.messaging implementation details to keep backward compatibility
            # with pre-Ocata services. This will not matter once we drop
            # compatibility with them.
            if self.topic == constants.VOLUME_TOPIC:
                target = messaging.Target(
                    topic='%(topic)s.%(host)s' % {'topic': self.topic,
                                                  'host': self.host},
                    server=vol_utils.extract_host(self.host, 'host'))
                self.backend_rpcserver = rpc.get_server(target, endpoints,
                                                        serializer)
                self.backend_rpcserver.start()
    
            # TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
            if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
                LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
                             '%(version)s)'),
                         {'topic': self.topic, 'version': version_string,
                          'cluster': self.cluster})
                target = messaging.Target(
                    topic='%s.%s' % (self.topic, self.cluster),
                    server=vol_utils.extract_host(self.cluster, 'host'))
                serializer = objects_base.CinderObjectSerializer(obj_version_cap)
                self.cluster_rpcserver = rpc.get_server(target, endpoints,
                                                        serializer)
                self.cluster_rpcserver.start()
    
            self.manager.init_host_with_rpc()
    
        if self.report_interval:
            # start the timer that periodically reports service state
            # (the scheduler uses these reports to track service liveness)
            pulse = loopingcall.FixedIntervalLoopingCall(
                self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)
    
            if self.periodic_interval:
                if self.periodic_fuzzy_delay:
                    initial_delay = random.randint(0, self.periodic_fuzzy_delay)
                else:
                    initial_delay = None
    
            # start the timer that runs the manager's periodic tasks (polling)
            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
                periodic.start(interval=self.periodic_interval,
                               initial_delay=initial_delay)
                self.timers.append(periodic)
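
    Both timers are built on the same oslo.service primitive. A minimal standalone sketch of FixedIntervalLoopingCall (the heartbeat function is a made-up stand-in for report_state / periodic_tasks):

    from oslo_service import loopingcall

    def heartbeat():
        print('tick')  # stand-in for Service.report_state

    pulse = loopingcall.FixedIntervalLoopingCall(heartbeat)
    pulse.start(interval=10, initial_delay=10)  # mirrors the report_interval usage
    pulse.wait()  # blocks until the looping call is stopped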
    
    cinder/volume/manager.py: VolumeManager
    Analysis of the init_host() process
        def init_host(self, added_to_cluster=None, **kwargs):
            """Perform any required initialization."""
            ctxt = context.get_admin_context()
            if not self.driver.supported:
                utils.log_unsupported_driver_warning(self.driver)
    
                if not self.configuration.enable_unsupported_driver:
                    LOG.error(_LE("Unsupported drivers are disabled."
                                  " You can re-enable by adding "
                                  "enable_unsupported_driver=True to the "
                                  "driver section in cinder.conf"),
                              resource={'type': 'driver',
                                        'id': self.__class__.__name__})
                    return
    
            # If we have just added this host to a cluster we have to include all
            # our resources in that cluster.
            if added_to_cluster:
                self._include_resources_in_cluster(ctxt)
    
            LOG.info(_LI("Starting volume driver %(driver_name)s (%(version)s)"),
                     {'driver_name': self.driver.__class__.__name__,
                      'version': self.driver.get_version()})
        try:
            # The storage initialization work is placed in do_setup(). If
            # do_setup() fails, the service itself still starts, and the other
            # backends of a multi-backend deployment are not affected.
            self.driver.do_setup(ctxt)
            self.driver.check_for_setup_error()
            except Exception:
                LOG.exception(_LE("Failed to initialize driver."),
                              resource={'type': 'driver',
                                        'id': self.__class__.__name__})
                # we don't want to continue since we failed
                # to initialize the driver correctly.
                return
    
            # Initialize backend capabilities list
            self.driver.init_capabilities()
    
            volumes = self._get_my_volumes(ctxt)
            snapshots = self._get_my_snapshots(ctxt)
            self._sync_provider_info(ctxt, volumes, snapshots)
            # FIXME volume count for exporting is wrong
    
            self.stats['pools'] = {}
            self.stats.update({'allocated_capacity_gb': 0})
    
            try:
                for volume in volumes:
                    # available volume should also be counted into allocated
                    if volume['status'] in ['in-use', 'available']:
                        # calculate allocated capacity for driver
                        self._count_allocated_capacity(ctxt, volume)
    
                        try:
                            if volume['status'] in ['in-use']:
                                self.driver.ensure_export(ctxt, volume)
                        except Exception:
                            LOG.exception(_LE("Failed to re-export volume, "
                                              "setting to ERROR."),
                                          resource=volume)
                            volume.conditional_update({'status': 'error'},
                                                      {'status': 'in-use'})
                # All other cleanups are processed by parent class CleanableManager
    
            except Exception:
                LOG.exception(_LE("Error during re-export on driver init."),
                              resource=volume)
                return
    
            self.driver.set_throttle()
    
            # at this point the driver is considered initialized.
            # NOTE(jdg): Careful though because that doesn't mean
            # that an entry exists in the service table
        self.driver.set_initialized()  # sets the driver's initialized attribute to True, marking that the driver started and finished initializing successfully
    
            # Keep the image tmp file clean when init host.
            backend_name = vol_utils.extract_host(self.service_topic_queue)
            image_utils.cleanup_temporary_file(backend_name)
    
            # collect and publish service capabilities
            self.publish_service_capabilities(ctxt)
            LOG.info(_LI("Driver initialization completed successfully."),
                     resource={'type': 'driver',
                               'id': self.driver.__class__.__name__})
    
            # Make sure to call CleanableManager to do the cleanup
            super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster,
                                                 **kwargs)
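
    The do_setup / check_for_setup_error pair invoked near the top of init_host belongs to the driver interface. A minimal, hypothetical driver sketch (ExampleDriver and its _client attribute are made up; real drivers live under cinder/volume/drivers/):

    from cinder.volume import driver

    class ExampleDriver(driver.VolumeDriver):
        def do_setup(self, context):
            # One-time backend initialization: open management sessions,
            # discover pools, etc. Exceptions raised here are caught by
            # init_host(), which logs them and returns, leaving the driver
            # uninitialized (it will then reject volume requests).
            self._client = object()  # placeholder for a real management-API client

        def check_for_setup_error(self):
            # Raise if the backend is unusable after do_setup().
            if self._client is None:
                raise RuntimeError('backend connection not established')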
    

      

     

  • Original article: https://www.cnblogs.com/potato-chip/p/12299220.html