zoukankan      html  css  js  c++  java
  • AB逻辑

    # AB机新逻辑
    
    """
    1. 查看是否存在有效的远程灾备计划
    2. 先给计划创建一个空的任务。
    3. 到A端中填充任务,构造成真正可以执行的任务。(在这个过程需要到A端释放以前的以前锁定的资源,锁定现在的资源)(原子性)
    这个过程容易失败(失败后我们怎么处理较好呢??)
       1. 需要释放上一次任务的资源停止之前的任务并等待结束
       1. 重试到一定次数。
       2. 重试完成后继续失败该怎么办??无限重试?(要不无限重试吧,这样也就不用释放资源了,但是之前到A端锁定的资源该怎么办呢????对了,
       可以直接用特定的标志去释放,成功过去了就会释放,也就不用每一次失败都需要去释放了,并且不能保证一定能释放成功)这儿就这么处理
    4. 执行远程容灾任务,该怎么去执行呢??首先需要明确远程容灾任务是要做什么吧?
       1. 远程容灾任务要做什么?:
          1. 是不是我在前面每一个任务对应的磁盘级别的任务的参数都准备好,这个时候之前去执行就好了
    
    """
    import threading
    import time
    import json
    import copy
    import uuid
    from apiv1 import models
    from box_dashboard import xlogging
    from django.db import transaction
    
    _logger = xlogging.getLogger(__name__)
    
    
    class RemoteScheduleThreading(threading.Thread):
        def __init__(self):
            super(RemoteScheduleThreading, self).__init__(name='RemoteScheduleThreading')
            self.is_running = list()
    
        def run(self):
            while True:
                try:
                    self.work()
                except Exception as e:
                    _logger.debug('RemoteScheduleThreading Exception, because of {}'.format(e))
    
        def work(self):
            for remote_schedule in models.RemoteBackupSchedule.objects.filter(enabled=True, deleted=False):
                if not self.check_remote_schedule_can_exc(remote_schedule):
                    continue
                self.create_remote_task_empty(remote_schedule)
    
        def check_remote_schedule_can_exc(self, remote_schedule):
            """
            检测远程灾备计划此时是否可以执行
            :param remote_schedule:
            :return:
    
            1. 需要判断计划是否被禁用或者删除
            2. 判断计划类型,间隔类型查看是否到达执行时间,持续类型无条件继续执行,也就是不需要休眠
            3. 需要判断是否已经存在执行中的任务(通过计划去找任务)
            """
            # type: models.RemoteBackupSchedule, models.RemoteBackupTask
            if not remote_schedule.enabled or remote_schedule.deleted:
                return False
            now = time.time()
            if remote_schedule.get_remote_schedule_type == 'interval':
                if now < remote_schedule.next_run_date:
                    return False
            if self.is_have_running_task(remote_schedule):
                return False
            return True
    
        def create_remote_task_empty(self, remote_schedule):
            """
            创建远程容灾空任务,这个过程只在B端这边组装数据
            :param remote_schedule:
            :return:
    
            创建远程空任务我们需要什么??
            1. 需要上一次任务的信息。因为我们需要根据上一次任务的信息找到本次需要同步快照,那么找到本次需要同步的快照,需要什么去找呢??
               1. 需要上一次同步的主机快照,
               2. 需要上一次同步的时间点,
               3. 需要同步的主机
               4. 需要上一次任务的uuid(需要释放资源)
            """
            # type: models.RemoteBackupTask, models.RemoteBackupSchedule
            prev_info = self.fetch_prev_sync_info(remote_schedule)
            remote_host_ident = remote_schedule.host.ident
            remote_task = models.RemoteBackupTask.objects.create(schedule=remote_schedule)
            current_info = self.generate_current_task_info(remote_task)
            exc_config = {'prev_info': prev_info, 'remote_host_ident': remote_host_ident, 'current_info': current_info}
            remote_task.ext_config = json.dumps(exc_config)
            remote_task.save(update_fields=['ext_config', ])
    
        @staticmethod
        def generate_current_task_info(remote_task):
            """
            生成本次任务的一些信息(task_uuid)
            :param remote_task:
            :return:
            """
            task_uuid = uuid.uuid4().hex
            lock_type = 'remote_schedule_{}'.format(remote_task.schedule.id)
            return {'task_uuid': task_uuid, 'remote_task_id': remote_task.id, 'lock_type': lock_type}
    
        @staticmethod
        def fetch_prev_sync_info(remote_schedule):
            """
            获取上一次同步的信息
            :param remote_schedule:
            :return:
            """
            ext_config_dict = json.loads(remote_schedule.ext_config)
            prev_info = ext_config_dict.get('current_info', dict())
            return prev_info
    
        @staticmethod
        def is_have_running_task(remote_schedule):
            """
            检测计划是否存在执行中的任务
            :param remote_schedule:
            :return:
            """
            remote_task = remote_schedule.remote_backup_tasks.filter(finish_datetime__isnull=True)
            if remote_task:
                return True
            return False
    
    
    class RemoteBackupTaskMonitorThreading(threading.Thread):
        def __init__(self):
            super(RemoteBackupTaskMonitorThreading, self).__init__(name='RemoteBackupTaskMonitorThreading')
            self.running_list = list()
    
        def run(self):
            while True:
                try:
                    self.task_work()
                except Exception as e:
                    _logger.debug('RemoteBackupTaskMonitorThreading Exception, because of {}'.format(e))
    
        def task_work(self):
            for remote_backup_task in models.RemoteBackupTask.objects.filter(finish_datetime__isnull=True):
                self.clear_finish_task()
                if self.is_running(remote_backup_task):
                    continue
                remote_task_thread = RemoteBackupTaskThreading(remote_backup_task)
                remote_task_thread.setDaemon(True)
                remote_task_thread.start()
                self.running_list.append(remote_task_thread)
    
        def is_running(self, remote_backup_task):
            for running_task_thread in self.running_list:
                if running_task_thread.remote_backup_task == remote_backup_task:
                    if running_task_thread.is_alive():
                        return True
            return False
    
        def clear_finish_task(self):
            _running_task = copy.copy(self.running_list)
            for running_task_thread in _running_task:
                if running_task_thread.is_alive():
                    continue
                self.running_list.remove(running_task_thread)
    
    
    class RemoteBackupTaskThreading(threading.Thread):
        def __init__(self, remote_backup_task):
            super(RemoteBackupTaskThreading, self).__init__(name='RemoteBackupTaskThreading')
            self.remote_backup_task = remote_backup_task
            self.running_sub_task_thread = list()
    
        def run(self):
            with transaction.atomic():
                self.stop_all_sub_task_and_wait(self.remote_backup_task)
                update_type, sync_info = self.query_need_sync_info_from_A()  # 到A端去找需要同步的信息
                self.check_prev_and_release_other_to_A()
                self.update_local_database_info(update_type, sync_info)
            # 此时子任务已经创建完毕,需要放到线程中去执行
            self.exc_remote_sub_task()
            self.finish_current_task()
    
        def exc_remote_sub_task(self):
            """
            通过remote_task找到对应的子任务,然后拿到队列中去执行?
            :return:
            """
            # type: models.RemoteBackupTask
            for remote_sub_task in self.remote_backup_task.remote_backup_sub_tasks.all():
                remote_sub_task_thread = RemoteSubTaskThreading(remote_sub_task)
                remote_sub_task_thread.setDaemon(True)
                remote_sub_task_thread.start()
                self.running_sub_task_thread.append(remote_sub_task_thread)
    
            for remote_sub_task_thread in self.running_sub_task_thread:
                remote_sub_task_thread.join()
    
        def finish_current_task(self):
            """
            结束本次任务,更新任务,计划等信息
            :return:
            """
    
        def check_prev_and_release_other_to_A(self):
            """
            到A端去释放资源和检测上一次的资源是否被意外释放
            :return:
            1. 需要上一次任务的信息和这次任务的信息
            """
            ext_config_dict = json.loads(self.remote_backup_task.ext_config)
            prev_info = ext_config_dict.get('prev_info', dict())
            current_info = ext_config_dict['current_info']
            return {'prev_info': prev_info, current_info: current_info}
    
        def update_local_database_info(self, update_type, sync_info):
            """
            通过A端查到的同步信息来更新本地的信息
            :param update_type:
            :param sync_info:
            :return:
            """
            self.update_local_hostsnapshot(update_type, sync_info)
            self.update_local_disksnapshot(update_type, sync_info)
            self.update_local_snapshot_storage(update_type, sync_info)
            self.update_current_info_to_schedule_and_task(sync_info)
            self.create_remote_sub_task(sync_info)
    
        def update_local_hostsnapshot(self, update_type, sync_info):
            """
            更新本地的主机快照级别的信息,可能是新建,可能是更新
            :param update_type:
            :param sync_info:
            :return:
            """
    
        def update_local_disksnapshot(self, update_type, sync_info):
            """
             更新本地的磁盘快照级别的信息,可能是新建,可能是更新
            :param update_type:
            :param sync_info:
            :return:
            """
    
        def update_local_snapshot_storage(self, update_type, sync_info):
            """
            更新本地的快照存储级别的信息,可能是新建,可能是更新
            :param update_type:
            :param sync_info:
            :return:
            """
    
        def update_current_info_to_schedule_and_task(self, sync_info):
            """
            需要更新当前任务的一些信息
            :param sync_info:
            :return:
            如 hostsnapshot,timestmap
            """
    
        def create_remote_sub_task(self, sync_info):
            """
            创建远程灾备子任务(一个磁盘一个任务)
            :param sync_info:
            :return:
            1. 子任务执行需要(上一次任务的快照点,上一次同步到的timestamp,本次的快照点,本次的timestamp,本次任务对应的task_uuid)
            因此我们从remote_task拿到上一次的信息prev_info和current_info, r
            还需要生成一个task_uuid保存到子任务数据库记录中
            """
    
        def stop_all_sub_task_and_wait(self, remote_task):  #
            """
            停止计划上一次任务对应的所有子任务,并等待结束,但是这个过程需要到A端去停止,是不是就有机会失败?
            :param remote_task:
            :return:
            去A端停止子任务,我需要什么?其实这个时候只需要每一个子任务的task_uuid。
            通过当前的任务找到上一次的RemotebackupTask, 然后找到其对应的所有子任务,最后将子任务的所有的task_uuid从数据库读取出来到A端停止并等待任务结束
    
            """
            # type: models.RemoteBackupSubTask
    
        def query_need_sync_info_from_A(self):
            """
            到A端去找需要同步的信息
            :return:
            1. 需要知道上一次任务的信息(上一次的主机快照和时间点)
            2. 需要知道远程主机
            3. 需要知道本次同步的类型(间隔还是持续)
            """
            # type: models.RemoteBackupTask
            ext_config_dict = json.loads(self.remote_backup_task.ext_config)
            prev_info = ext_config_dict.get('prev_info', dict())
            current_task_uuid = ext_config_dict['current_info']['task_uuid']
            current_lock_type = ext_config_dict['current_info']['lock_type']
            remote_host_ident = ext_config_dict['remote_host_ident']
            sync_type = self.get_sync_type()
            return {'sync_type': sync_type, 'prev_info': prev_info, 'remote_host_ident': remote_host_ident,
                    'task_uuid': current_task_uuid, 'lock_type': current_lock_type}
    
        def get_sync_type(self):
            """
            根据任务找到计划,就可以知道同步的类型
            :return:
            """
            remote_schedule = self.remote_backup_task.schedule
            return remote_schedule.get_remote_schedule_type
    
    
    class RemoteSubTaskThreading(threading.Thread):
        def __init__(self, remote_sub_task):
            super(RemoteSubTaskThreading, self).__init__(name='RemoteSubTaskThreading')
            self.remote_sub_task = remote_sub_task
    
        def run(self):
            self.check_schedule_valid()
            self.start_remote_logic()
            self.start_local_rs_module()
            self.check_remote_logic()
            self.stop_and_wait_task()
            self.finish_sub_task()
    
        def check_schedule_valid(self):
            """
            检测该任务对应的计划是否还有效
            :return:
            """
    
        def start_remote_logic(self):
            """
            开始远端的发送数据的任务, 把之前的和现在的任务的主机快照点和时刻,task_uuid传输到A端开始执行任务
            :return:
            """
    
        def start_local_rs_module(self):
            """
            开始本地的接收数据的任务,
            :return:
            """
    
        def check_remote_logic(self):
            """
            检测远端任务的状态,查看传输任务是否结束 task_uuid
            :return:
            """
            task = {'status': False}
            while not task['status']:
                time.sleep(5)
    
        def stop_and_wait_task(self):
            """
            停止A端的传输任务并等待停止成功,不成功就一直循环,直到停止成功,通过task_uuid
            :return:
            """
            while True:
                try:
                    stop_and_wait_task(task_uuid)  # 到A端停止任务
                    return
                except Exception as e:
                    time.sleep(5)
                    _logger.info('stop_and_wait_task exception, because of {}'.format(e))
                    continue
    
        def finish_sub_task(self):
            """
            结束子任务,更新子任务数据库信息
            :return:
            """
    
    
    # A端
    
    def query_need_sync_info(host_ident, prev_info, sync_type, lock_uuid, lock_type):
        """
        找到本次需要同步的快照
        :param host_ident:
        :param prev_info:
        :param sync_type:
        :param lock_uuid
        :return: 需要返回快照点之间的关系?和本次同步的信息,而至于是否为cdp类型可以在同步信息中得到
        """
        prev_hostsnapshot = prev_info.get('hostsnapshot')
        host_obj = models.Host.objects.filter(ident=host_ident)
        if not host_obj:
            return 'update', {}
        if prev_hostsnapshot is None:
            # 代表是第一次来,
            # 不管什么同步类型,都直接通过主机找到最后的一个主机快照点?需要区分A端主机快照的类型吗?
            pass
        else:
            if sync_type == 'interval':
                # 间隔备份,
                # 1. 通过主机找到最新的快照点
                # 2. 和前一个任务所同步的信息作比较,(hostsnapshot 和 timestamp)一样,就返回空的同步信息回去如果比以前同步的新,
                #    就需要使用lock_uuid锁住当前的主机快照点
                pass
            else:
                # 持续备份
                # 获取上一次同步到的快照点,通过上一次同步的快照点,找到离他最近的一个快照点,然后分别打开两个快照点,
                # 1. 对于qcow类型算出他们的位图?然后同步,return create, 同步信息
                # 2. 对于cdp类型需要找到两个快照点的关系
                #    1. 追加 (在同一个链上,也在同一个主机快照上) return update ,同步信息
                #    2. 增量 (在同一个链上,但不在同一个主机快照上) return create ,同步信息
                #    3. 无关 (不在同一个链上) return create ,同步信息
                pass
            pass
    
    
    def check_prev_and_release_other(prev_info, current_info):
        """
        检测上一次任务和释放资源
        :param prev_info:
        :param current_info:
        :return:
        1. 通过lock_type找到所有的主机快照锁
        2. 通过prev的task_uuid是否从主机快照锁找出数据,如果可以找到,就说明还被锁着的,没有找到呢?没有上一次任务不管
        3. 释放除current_info和prev_info中通过task_uuid生成的锁
        """
        lock_type = current_info['lock_type']
        _ = prev_info
        _ = lock_type
    
    坚持不一定成功,但放弃必定失败。
  • 相关阅读:
    魔术球问题
    【模板】网络最大流
    [SCOI2010]股票交易
    [SCOI2009]生日礼物
    [HAOI2007]修筑绿化带
    [HAOI2007]理想的正方形
    [USACO12MAR]花盆Flowerpot
    滑动窗口
    斐波那契公约数
    [SDOI2008]仪仗队
  • 原文地址:https://www.cnblogs.com/bao9687426/p/13184277.html
Copyright © 2011-2022 走看看