zoukankan      html  css  js  c++  java
  • cinder migrate基础内容-源码分析

    一、cinder-api服务入口

    D:\code-program\cinder-code\juno\api\contrib\admin_actions.py

    from cinder import volume
    class VolumeAdminController(AdminController):
        """AdminController for Volumes."""

        @wsgi.action('os-migrate_volume')
        def _migrate_volume(self, req, id, body):
            """Migrate a volume to the specified host.

            Expects ``body['os-migrate_volume']`` to carry a mandatory
            ``host`` and an optional ``force_host_copy`` (a bool, or a string
            that strictly parses as a bool; defaults to False).

            :raises HTTPNotFound: the volume id cannot be resolved
            :raises HTTPBadRequest: ``host`` is missing, or
                ``force_host_copy`` is neither a bool nor a bool-like string
            :returns: an empty 202 response
            """
            context = req.environ['cinder.context']
            self.authorize(context, 'migrate_volume')
            try:
                # Look up the volume object by its id.
                volume = self._get(context, id)
            except exception.NotFound:
                raise exc.HTTPNotFound()
            # Parameters of this action as sent in the request body.
            params = body['os-migrate_volume']
            try:
                # Destination host the volume should be migrated to.
                host = params['host']
            except KeyError:
                raise exc.HTTPBadRequest(explanation=_("Must specify 'host'"))
            # force_host_copy defaults to False when the caller omits it.
            force_host_copy = params.get('force_host_copy', False)
            if isinstance(force_host_copy, basestring):
                try:
                    # Convert the string form into a real bool; strict mode
                    # rejects anything that is not a recognised bool string.
                    force_host_copy = strutils.bool_from_string(force_host_copy,
                                                                strict=True)
                except ValueError:
                    raise exc.HTTPBadRequest(
                        explanation=_("Bad value for 'force_host_copy'"))
            elif not isinstance(force_host_copy, bool):
                raise exc.HTTPBadRequest(
                    explanation=_("'force_host_copy' not string or bool"))
            # Step 1: hand over to the volume API layer.
            self.volume_api.migrate_volume(context, volume, host, force_host_copy)
            return webob.Response(status_int=202)
    

    对步骤一进行详解
    D:\code-program\cinder-code\juno\volume\api.py

    class API(base.Base):
        """API for interacting with the volume manager."""

        @wrap_check_policy
        def migrate_volume(self, context, volume, host, force_host_copy):
            """Migrate the volume to the specified host.

            Validates the volume and the destination, marks the volume's
            ``migration_status`` as ``starting`` and then casts the request
            to the scheduler.

            :param context: request context
            :param volume: volume record (dict-like) to migrate
            :param host: destination host; when it carries no ``#pool``
                suffix a default pool is appended for the two known backends
            :param force_host_copy: forwarded unchanged to the scheduler
            :raises InvalidVolume: bad status, migration already in progress,
                snapshots exist, volume is replicated or in a consistency
                group, or the host string is malformed
            :raises InvalidHost: no enabled, running service matches the
                destination, or destination equals the current host
            """

            # Only volumes that are "available" or "in-use" can be migrated.
            if volume['status'] not in ['available', 'in-use']:
                msg = _('Volume status must be available/in-use.')
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            # Make sure volume is not part of a migration
            if volume['migration_status'] is not None:
                msg = _("Volume is already part of an active migration")
                raise exception.InvalidVolume(reason=msg)

            # We only handle volumes without snapshots for now
            snaps = self.db.snapshot_get_all_for_volume(context, volume['id'])
            if snaps:
                msg = _("volume must not have snapshots")
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            # We only handle non-replicated volumes for now
            rep_status = volume['replication_status']
            if rep_status is not None and rep_status != 'disabled':
                msg = _("Volume must not be replicated.")
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            cg_id = volume.get('consistencygroup_id', None)
            if cg_id:
                msg = _("Volume must not be part of a consistency group.")
                LOG.error(msg)
                raise exception.InvalidVolume(reason=msg)

            # Make the Juno volume migration API accept Icehouse-style hosts
            # (no "#pool" suffix) by appending a default pool name:
            #   lvm backend     -> LVM_iSCSI
            #   gluster backend -> GlusterFS
            if "#" not in host:
                host_parts = host.split("@")
                # A host without "@backend" is malformed; fall through to the
                # InvalidVolume below instead of failing with an IndexError.
                backend_driver = host_parts[1] if len(host_parts) > 1 else None
                if backend_driver == "lvmdriver":
                    host = host + "#LVM_iSCSI"
                elif backend_driver == "GLUSTERFS_DRIVER1":
                    host = host + "#GlusterFS"
                else:
                    msg = _("Volume host is bad format.")
                    raise exception.InvalidVolume(reason=msg)

            # Make sure the host is in the list of available hosts: there must
            # be an enabled volume service that is up and whose host matches
            # the backend part of the destination.
            elevated = context.elevated()
            topic = CONF.volume_topic
            services = self.db.service_get_all_by_topic(elevated,
                                                        topic,
                                                        disabled=False)
            svc_host = volume_utils.extract_host(host, 'backend')
            found = any(utils.service_is_up(service) and
                        service['host'] == svc_host
                        for service in services)
            if not found:
                msg = (_('No available service named %s') % host)
                LOG.error(msg)
                raise exception.InvalidHost(reason=msg)

            # Make sure the destination host is different than the current one
            if host == volume['host']:
                msg = _('Destination host must be different than current host')
                LOG.error(msg)
                raise exception.InvalidHost(reason=msg)

            # All checks passed: flag the volume as entering migration.
            self.update(context, volume, {'migration_status': 'starting'})

            # Call the scheduler to ensure that the host exists and that it can
            # accept the volume
            volume_type = {}
            volume_type_id = volume['volume_type_id']
            if volume_type_id:
                volume_type = volume_types.get_volume_type(context, volume_type_id)
            request_spec = {'volume_properties': volume,
                            'volume_type': volume_type,
                            'volume_id': volume['id']}
            # Step 1.1: RPC to the scheduler.
            self.scheduler_rpcapi.migrate_volume_to_host(context,
                                                         CONF.volume_topic,
                                                         volume['id'],
                                                         host,
                                                         force_host_copy,
                                                         request_spec)
    

    对步骤1.1详解

    D:\code-program\cinder-code\juno\scheduler\rpcapi.py,给rabbitmq中,exchange为cinder-volume的发送rpc.cast请求

    class SchedulerAPI(object):
        """Client side of the scheduler rpc API."""

        def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
                                   force_host_copy=False, request_spec=None,
                                   filter_properties=None):
            """Cast ``migrate_volume_to_host`` to the scheduler.

            The request spec is converted to primitives before it is sent.
            The call is a cast (fire-and-forget) pinned to RPC version 1.3.

            :returns: whatever the prepared client's ``cast`` returns
            """
            cctxt = self.client.prepare(version='1.3')
            request_spec_p = jsonutils.to_primitive(request_spec)
            return cctxt.cast(ctxt, 'migrate_volume_to_host',
                              topic=topic,
                              volume_id=volume_id,
                              host=host,
                              force_host_copy=force_host_copy,
                              request_spec=request_spec_p,
                              filter_properties=filter_properties)
    

     二、cinder-scheduler 接受rpc请求,对迁移的主机进行判断 

    D:\code-program\cinder-code\juno\scheduler\manager.py
    from cinder.volume import rpcapi as volume_rpcapi
    # Default scheduler driver to use (string value)
    #scheduler_driver=cinder.scheduler.filter_scheduler.FilterScheduler
    class SchedulerManager(manager.Manager):
        """Chooses a host to create volumes."""

        def migrate_volume_to_host(self, context, topic, volume_id, host,
                                   force_host_copy, request_spec,
                                   filter_properties=None):
            """Ensure that the host exists and can accept the volume.

            On success the migration is forwarded to the volume service via
            the volume RPC API. If the scheduler driver rejects the host, the
            volume's ``migration_status`` is reset to None; NoValidHost is
            swallowed after that reset, any other exception is re-raised.
            """

            def _migrate_volume_set_error(self, context, ex, request_spec):
                # Clear migration_status and emit the failure notification.
                volume_state = {'volume_state': {'migration_status': None}}
                self._set_volume_state_and_notify('migrate_volume_to_host',
                                                  volume_state,
                                                  context, ex, request_spec)

            try:
                # Step 2: self.driver is the configured scheduler driver
                # (the scheduler_driver option).
                tgt_host = self.driver.host_passes_filters(context, host,
                                                           request_spec,
                                                           filter_properties)
            except exception.NoValidHost as ex:
                _migrate_volume_set_error(self, context, ex, request_spec)
            except Exception as ex:
                with excutils.save_and_reraise_exception():
                    _migrate_volume_set_error(self, context, ex, request_spec)
            else:
                # Load the volume record from the database.
                volume_ref = db.volume_get(context, volume_id)
                # Step 3: ask the volume service to migrate the volume to
                # tgt_host, the destination host state returned above.
                volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
                                                         tgt_host,
                                                         force_host_copy)
    
    对步骤二进行详解					  
    D:\code-program\cinder-code\juno\scheduler\filter_scheduler.py
    class FilterScheduler(driver.Scheduler):
        """Scheduler that can be used for filtering and weighing."""

        def host_passes_filters(self, context, host, request_spec,
                                filter_properties):
            """Check if the specified host passes the filters.

            :returns: the host state object of the candidate whose ``host``
                equals the requested host
            :raises NoValidHost: the requested host is not among the
                candidates
            """
            # Candidates that remain for this request after filtering and
            # weighing.
            weighed_hosts = self._get_weighted_candidates(context, request_spec,
                                                          filter_properties)
            # Look for the host named in the request among the candidates.
            for weighed_host in weighed_hosts:
                host_state = weighed_host.obj
                if host_state.host == host:
                    return host_state

            msg = (_('Cannot place volume %(id)s on %(host)s')
                   % {'id': request_spec['volume_id'], 'host': host})
            raise exception.NoValidHost(reason=msg)
    		
    对步骤三进行详解
    给cinder-volume发送rpc.cast请求,进行卷的迁移
    D:\code-program\cinder-code\juno\volume\rpcapi.py
    class VolumeAPI(object):
        """Client side of the volume rpc API."""

        def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
            """Cast ``migrate_volume`` to the host that currently owns the volume.

            :param volume: volume record; its ``host`` selects the RPC server
            :param dest_host: destination host state; only its ``host`` and
                ``capabilities`` attributes are sent
            :param force_host_copy: forwarded unchanged
            """
            # Host the volume currently lives on (the migration source).
            src_host = utils.extract_host(volume['host'])
            # Address the cast to that specific server, pinned to version 1.8.
            cctxt = self.client.prepare(server=src_host, version='1.8')
            # Primitive description of the migration destination.
            host_p = {'host': dest_host.host,
                      'capabilities': dest_host.capabilities}
            cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
                       host=host_p, force_host_copy=force_host_copy)
    

     三、卷所在的主机的cinder-volume服务接受rpc请求 

    # Driver to use for volume creation (string value)
    #volume_driver=cinder.volume.drivers.lvm.LVMISCSIDriver
    D:\code-program\cinder-code\juno\volume\manager.py
    class VolumeManager(manager.SchedulerDependentManager):
        """Manages attachable block storage devices."""
        RPC_API_VERSION = '1.19'
        target = messaging.Target(version=RPC_API_VERSION)

        def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
                           new_type_id=None):
            """Migrate the volume to the specified host (called on source host).

            First lets the backend driver try an optimized migration (only
            when ``force_host_copy`` is false and no retype is requested).
            If the volume was not moved that way, falls back to the generic
            host-side copy. On any failure ``migration_status`` is reset to
            None, the export is re-created, and the exception is re-raised.

            :param host: dict describing the destination; ``host['host']``
                is stored as the volume's new host on success
            :param new_type_id: target volume type id when migrating as part
                of a retype, else None
            """
            try:
                # NOTE(flaper87): Verify the driver is enabled
                # before going forward. The exception will be caught
                # and the migration status updated.
                utils.require_driver_initialized(self.driver)
            except exception.DriverNotInitialized:
                with excutils.save_and_reraise_exception():
                    self.db.volume_update(ctxt, volume_id,
                                          {'migration_status': 'error'})

            # Load the volume record by id.
            volume_ref = self.db.volume_get(ctxt, volume_id)
            model_update = None
            moved = False

            # During a retype the original status must be restored afterwards.
            status_update = None
            if volume_ref['status'] == 'retyping':
                status_update = {'status': self._get_original_status(volume_ref)}

            # Mark the volume as migrating.
            self.db.volume_update(ctxt, volume_ref['id'],
                                  {'migration_status': 'migrating'})
            # Default path (force_host_copy is false, no retype): let the
            # backend driver's own migrate_volume do the work.
            if not force_host_copy and new_type_id is None:
                try:
                    LOG.debug("volume %s: calling driver migrate_volume",
                              volume_ref['id'])
                    # Step 4: driver-assisted migration.
                    moved, model_update = self.driver.migrate_volume(ctxt,
                                                                     volume_ref,
                                                                     host)
                    if moved:
                        updates = {'host': host['host'],
                                   'migration_status': None}
                        if status_update:
                            updates.update(status_update)
                        if model_update:
                            updates.update(model_update)
                        volume_ref = self.db.volume_update(ctxt,
                                                           volume_ref['id'],
                                                           updates)
                except Exception:
                    with excutils.save_and_reraise_exception():
                        updates = {'migration_status': None}
                        if status_update:
                            updates.update(status_update)
                        model_update = self.driver.create_export(ctxt, volume_ref)
                        if model_update:
                            updates.update(model_update)
                        self.db.volume_update(ctxt, volume_ref['id'], updates)
            # Generic fallback: reached when force_host_copy is true, when a
            # retype is requested, or when the driver did not move the volume.
            if not moved:
                try:
                    # Step 5: host-side generic migration.
                    self._migrate_volume_generic(ctxt, volume_ref, host,
                                                 new_type_id)
                except Exception:
                    with excutils.save_and_reraise_exception():
                        updates = {'migration_status': None}
                        if status_update:
                            updates.update(status_update)
                        model_update = self.driver.create_export(ctxt, volume_ref)
                        if model_update:
                            updates.update(model_update)
                        self.db.volume_update(ctxt, volume_ref['id'], updates)

     四、 force_host_copy取值不同,所走分支不同的详解

    默认情况下,force_host_copy为假,由后端存储驱动来完成卷的迁移工作;如果force_host_copy为真,那么由cinder-volume服务所在的主机完成卷的迁移工作

    force_host_copy为假的情况

    对步骤四进行详解
    ceph不支持卷迁移,以lvm卷为前提进行分析
    D:\code-program\cinder-code\juno\volume\drivers\lvm.py
    class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
        def migrate_volume(self, ctxt, volume, host, thin=False, mirror_count=0):
            """Optimize the migration if the destination is on the same server.

            If the specified host is another back-end on the same server, and
            the volume is not attached, we can do the migration locally without
            going through iSCSI.

            :param host: destination dict; ``host['capabilities']`` must hold
                a ``location_info`` string of five ':'-separated fields
                (type, hostname, volume group, lvm type, mirror count)
            :returns: ``(True, model_update)`` when the volume was moved
                locally, ``(False, None)`` when this driver cannot handle
                the request
            """

            false_ret = (False, None)
            if volume['status'] != 'available':
                return false_ret
            if 'location_info' not in host['capabilities']:
                return false_ret
            info = host['capabilities']['location_info']
            try:
                # ValueError covers both a wrong field count and a
                # non-integer mirror count.
                (dest_type, dest_hostname, dest_vg, lvm_type,
                 lvm_mirrors) = info.split(':')
                lvm_mirrors = int(lvm_mirrors)
            except ValueError:
                return false_ret
            if (dest_type != 'LVMVolumeDriver' or dest_hostname != self.hostname):
                return false_ret

            if dest_vg != self.vg.vg_name:
                # Destination is a different volume group on this server:
                # verify it exists, drop the old export and create the new LV.
                vg_list = volutils.get_all_volume_groups()
                if not any(vg['name'] == dest_vg for vg in vg_list):
                    message = (_("Destination Volume Group %s does not exist") %
                               dest_vg)
                    LOG.error(message)
                    return false_ret

                helper = utils.get_root_helper()
                dest_vg_ref = lvm.LVM(dest_vg, helper,
                                      lvm_type=lvm_type,
                                      executor=self._execute)
                self.remove_export(ctxt, volume)
                self._create_volume(volume['name'],
                                    self._sizestr(volume['size']),
                                    lvm_type,
                                    lvm_mirrors,
                                    dest_vg_ref)

            # Copy the data, then remove the source LV and export the copy.
            volutils.copy_volume(self.local_path(volume),
                                 self.local_path(volume, vg=dest_vg),
                                 volume['size'],
                                 self.configuration.volume_dd_blocksize,
                                 execute=self._execute)
            self._delete_volume(volume)
            model_update = self._create_export(ctxt, volume, vg=dest_vg)

            return (True, model_update)
    如果迁移卷的dest_vg,与该节点配置的vg相同,那么就在该vg上对源卷进行一个拷贝,删除源卷的动作
    如果迁移卷的dest_vg,与该节点配置的vg不相同,那么判断dest_vg有效的情况下,移除源卷的export,在dest_vg上创建一个新卷,拷贝数据到目标卷,删除源卷
    

    force_host_copy为真的情况,在这种情况下,是把目的卷,挂载到源卷所在的存储节点上,然后执行linux dd的命令,进行数据的拷贝,因此,拷贝数据的时间长短,完全取决于卷的大小,及存储节点物理主机的cpu及io性能,所以这种情况下,会导致一种情况是,因为数据量太大,数据dd拷贝的时间太长,导致迁移失败的问题

    对步骤五进行详解
    # Timeout for creating the volume to migrate to when
    # performing volume migration (seconds) (integer value)
    #migration_create_volume_timeout_secs=300
    D:\code-program\cinder-code\juno\volume\manager.py
    class VolumeManager(manager.SchedulerDependentManager):
        def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
            """Migrate by creating a new volume on ``host`` and copying data.

            Creates a DB record and a backing volume on the destination,
            waits (up to ``CONF.migration_create_volume_timeout_secs``) for
            it to become available, then either copies the data directly
            (detached volume) or asks Nova to swap the volume (attached).

            :raises VolumeMigrationFailed: the destination volume went to
                ``error`` or did not become available before the deadline
            """
            rpcapi = volume_rpcapi.VolumeAPI()

            # Create new volume on remote host: start from a copy of the
            # source volume's fields.
            new_vol_values = dict(volume.iteritems())
            del new_vol_values['id']
            del new_vol_values['_name_id']
            # We don't copy volume_type because the db sets that according to
            # volume_type_id, which we do copy
            del new_vol_values['volume_type']
            if new_type_id:
                new_vol_values['volume_type_id'] = new_type_id
            new_vol_values['host'] = host['host']
            new_vol_values['status'] = 'creating'
            new_vol_values['migration_status'] = 'target:%s' % volume['id']
            new_vol_values['attach_status'] = 'detached'
            # Insert the new volume row, then ask the destination host to
            # create the backing volume (no rescheduling on failure).
            new_volume = self.db.volume_create(ctxt, new_vol_values)
            rpcapi.create_volume(ctxt, new_volume, host['host'],
                                 None, None, allow_reschedule=False)

            # Wait for new_volume to become ready: poll its status until it
            # is available, fails, or the configured deadline passes.
            starttime = time.time()
            deadline = starttime + CONF.migration_create_volume_timeout_secs
            new_volume = self.db.volume_get(ctxt, new_volume['id'])
            tries = 0
            while new_volume['status'] != 'available':
                tries = tries + 1
                now = time.time()
                if new_volume['status'] == 'error':
                    msg = _("failed to create new_volume on destination host")
                    raise exception.VolumeMigrationFailed(reason=msg)
                elif now > deadline:
                    msg = _("timeout creating new_volume on destination host")
                    raise exception.VolumeMigrationFailed(reason=msg)
                else:
                    # Quadratic back-off between polls.
                    time.sleep(tries ** 2)
                new_volume = self.db.volume_get(ctxt, new_volume['id'])

            # Copy the source volume to the destination volume
            try:
                if (volume['instance_uuid'] is None and
                        volume['attached_host'] is None):
                    # Step 6: self.driver is the configured volume driver
                    # (the volume_driver option).
                    self.driver.copy_volume_data(ctxt, volume, new_volume,
                                                 remote='dest')
                    # The above call is synchronous so we complete the migration
                    self.migrate_volume_completion(ctxt, volume['id'],
                                                   new_volume['id'], error=False)
                else:
                    nova_api = compute.API()
                    # This is an async call to Nova, which will call the completion
                    # when it's done
                    nova_api.update_server_volume(ctxt, volume['instance_uuid'],
                                                  volume['id'], new_volume['id'])
            except Exception:
                with excutils.save_and_reraise_exception():
                    msg = _("Failed to copy volume %(vol1)s to %(vol2)s")
                    LOG.error(msg % {'vol1': volume['id'],
                                     'vol2': new_volume['id']})
                    volume = self.db.volume_get(ctxt, volume['id'])
                    # If we're in the completing phase don't delete the target
                    # because we may have already deleted the source!
                    if volume['migration_status'] == 'migrating':
                        rpcapi.delete_volume(ctxt, new_volume)
                    new_volume['migration_status'] = None
    对步骤六进行详解
    D:\code-program\cinder-code\juno\volume\driver.py
     self.driver的取值为volume_driver的值,该方法调用的是下面父类里面的方法
    核心功能是把目的节点上的卷,挂载到源卷的节点上,进行Linux dd方式的数据拷贝,在数据拷贝完成以后,卸载目的卷
    class VolumeDriver(object):

        def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
            """Copy data from src_vol to dest_vol.

            Attaches both volumes, copies ``src_vol['size']`` GB from the
            source device path to the destination device path, and always
            detaches both volumes afterwards (forced detach if the copy
            failed).

            :param remote: ``'src'``, ``'dest'``, ``'both'`` or None; selects
                which of the two volumes is attached with ``remote=True``
            """
            LOG.debug(('copy_data_between_volumes %(src)s -> %(dest)s.')
                      % {'src': src_vol['name'], 'dest': dest_vol['name']})

            properties = utils.brick_get_connector_properties()
            dest_remote = remote in ['dest', 'both']
            dest_orig_status = dest_vol['status']
            try:
                dest_attach_info = self._attach_volume(context,
                                                       dest_vol,
                                                       properties,
                                                       remote=dest_remote)
            except Exception:
                with excutils.save_and_reraise_exception():
                    msg = _("Failed to attach volume %(vol)s")
                    LOG.error(msg % {'vol': dest_vol['id']})
                    # Restore the status the destination had before attaching.
                    self.db.volume_update(context, dest_vol['id'],
                                          {'status': dest_orig_status})

            src_remote = remote in ['src', 'both']
            src_orig_status = src_vol['status']
            try:
                src_attach_info = self._attach_volume(context,
                                                      src_vol,
                                                      properties,
                                                      remote=src_remote)
            except Exception:
                with excutils.save_and_reraise_exception():
                    msg = _("Failed to attach volume %(vol)s")
                    LOG.error(msg % {'vol': src_vol['id']})
                    self.db.volume_update(context, src_vol['id'],
                                          {'status': src_orig_status})
                    # The destination is already attached; undo that.
                    self._detach_volume(context, dest_attach_info, dest_vol,
                                        properties, force=True, remote=dest_remote)

            copy_error = True
            mode = self.HOST_BASED
            key_values = {}
            try:
                size_in_mb = int(src_vol['size']) * 1024    # vol size is in GB
                src_device_path = src_attach_info['device']['path']
                dest_device_path = dest_attach_info['device']['path']
                # A device "path" that is not a string switches the recorded
                # migration info to FILE_BASED mode.
                if (not isinstance(src_device_path, six.string_types) or
                        not isinstance(dest_device_path, six.string_types)):
                    mode = self.FILE_BASED
                key_values = {src_vol['id']: mode,
                              src_vol['id'] + 'previous_progress': '0'}
                if mode == self.HOST_BASED:
                    key_values[src_vol['id'] + 'source'] = src_device_path
                    key_values[src_vol['id'] + 'dest'] = dest_device_path
                    key_values[src_vol['id'] + 'pid'] = None
                else:
                    key_values[src_vol['id'] + 'dest_handle'] = None
                self._add_migration_info_key(key_values)

                volume_utils.copy_volume(
                    src_device_path,
                    dest_device_path,
                    size_in_mb,
                    self.configuration.volume_dd_blocksize)
                copy_error = False
            except Exception:
                with excutils.save_and_reraise_exception():
                    msg = _("Failed to copy volume %(src)s to %(dest)s.")
                    LOG.error(msg % {'src': src_vol['id'], 'dest': dest_vol['id']})
            finally:
                # Detach both volumes; the nested finally guarantees the
                # source is detached even if detaching the destination raises.
                try:
                    self._detach_volume(context, dest_attach_info, dest_vol,
                                        properties, force=copy_error,
                                        remote=dest_remote)
                finally:
                    self._detach_volume(context, src_attach_info, src_vol,
                                        properties, force=copy_error,
                                        remote=src_remote)
    

      

     

  • 相关阅读:
    redhat yum替换成CentOS yum 并修改源
    C++11新特性实验
    常见的安装包制作程序installer
    如何在数据库中删除并添加唯一索引?
    springboot中的restTemplate工具类
    如何使用swagger(一)
    The POM for com.qingmu:entity:jar:1.0.0-SNAPSHOT is missing, no dependency information available
    java.lang.IllegalStateException: Found multiple @SpringBootConfiguration annotated classes
    在使用postman中配置返回html页面
    Springboot中设置返回数据的时间格式
  • 原文地址:https://www.cnblogs.com/potato-chip/p/12656875.html
Copyright © 2011-2022 走看看