# AB机新逻辑 """ 1. 查看是否存在有效的远程灾备计划 2. 先给计划创建一个空的任务。 3. 到A端中填充任务,构造成真正可以执行的任务。(在这个过程需要到A端释放以前的以前锁定的资源,锁定现在的资源)(原子性) 这个过程容易失败(失败后我们怎么处理较好呢??) 1. 需要释放上一次任务的资源停止之前的任务并等待结束 1. 重试到一定次数。 2. 重试完成后继续失败该怎么办??无限重试?(要不无限重试吧,这样也就不用释放资源了,但是之前到A端锁定的资源该怎么办呢????对了, 可以直接用特定的标志去释放,成功过去了就会释放,也就不用每一次失败都需要去释放了,并且不能保证一定能释放成功)这儿就这么处理 4. 执行远程容灾任务,该怎么去执行呢??首先需要明确远程容灾任务是要做什么吧? 1. 远程容灾任务要做什么?: 1. 是不是我在前面每一个任务对应的磁盘级别的任务的参数都准备好,这个时候之前去执行就好了 """ import threading import time import json import copy import uuid from apiv1 import models from box_dashboard import xlogging from django.db import transaction _logger = xlogging.getLogger(__name__) class RemoteScheduleThreading(threading.Thread): def __init__(self): super(RemoteScheduleThreading, self).__init__(name='RemoteScheduleThreading') self.is_running = list() def run(self): while True: try: self.work() except Exception as e: _logger.debug('RemoteScheduleThreading Exception, because of {}'.format(e)) def work(self): for remote_schedule in models.RemoteBackupSchedule.objects.filter(enabled=True, deleted=False): if not self.check_remote_schedule_can_exc(remote_schedule): continue self.create_remote_task_empty(remote_schedule) def check_remote_schedule_can_exc(self, remote_schedule): """ 检测远程灾备计划此时是否可以执行 :param remote_schedule: :return: 1. 需要判断计划是否被禁用或者删除 2. 判断计划类型,间隔类型查看是否到达执行时间,持续类型无条件继续执行,也就是不需要休眠 3. 需要判断是否已经存在执行中的任务(通过计划去找任务) """ # type: models.RemoteBackupSchedule, models.RemoteBackupTask if not remote_schedule.enabled or remote_schedule.deleted: return False now = time.time() if remote_schedule.get_remote_schedule_type == 'interval': if now < remote_schedule.next_run_date: return False if self.is_have_running_task(remote_schedule): return False return True def create_remote_task_empty(self, remote_schedule): """ 创建远程容灾空任务,这个过程只在B端这边组装数据 :param remote_schedule: :return: 创建远程空任务我们需要什么?? 1. 需要上一次任务的信息。因为我们需要根据上一次任务的信息找到本次需要同步快照,那么找到本次需要同步的快照,需要什么去找呢?? 1. 需要上一次同步的主机快照, 2. 需要上一次同步的时间点, 3. 需要同步的主机 4. 需要上一次任务的uuid(需要释放资源) """ # type: models.RemoteBackupTask, models.RemoteBackupSchedule prev_info = self.fetch_prev_sync_info(remote_schedule) remote_host_ident = remote_schedule.host.ident remote_task = models.RemoteBackupTask.objects.create(schedule=remote_schedule) current_info = self.generate_current_task_info(remote_task) exc_config = {'prev_info': prev_info, 'remote_host_ident': remote_host_ident, 'current_info': current_info} remote_task.ext_config = json.dumps(exc_config) remote_task.save(update_fields=['ext_config', ]) @staticmethod def generate_current_task_info(remote_task): """ 生成本次任务的一些信息(task_uuid) :param remote_task: :return: """ task_uuid = uuid.uuid4().hex lock_type = 'remote_schedule_{}'.format(remote_task.schedule.id) return {'task_uuid': task_uuid, 'remote_task_id': remote_task.id, 'lock_type': lock_type} @staticmethod def fetch_prev_sync_info(remote_schedule): """ 获取上一次同步的信息 :param remote_schedule: :return: """ ext_config_dict = json.loads(remote_schedule.ext_config) prev_info = ext_config_dict.get('current_info', dict()) return prev_info @staticmethod def is_have_running_task(remote_schedule): """ 检测计划是否存在执行中的任务 :param remote_schedule: :return: """ remote_task = remote_schedule.remote_backup_tasks.filter(finish_datetime__isnull=True) if remote_task: return True return False class RemoteBackupTaskMonitorThreading(threading.Thread): def __init__(self): super(RemoteBackupTaskMonitorThreading, self).__init__(name='RemoteBackupTaskMonitorThreading') self.running_list = list() def run(self): while True: try: self.task_work() except Exception as e: _logger.debug('RemoteBackupTaskMonitorThreading Exception, because of {}'.format(e)) def task_work(self): for remote_backup_task in models.RemoteBackupTask.objects.filter(finish_datetime__isnull=True): self.clear_finish_task() if self.is_running(remote_backup_task): continue remote_task_thread = RemoteBackupTaskThreading(remote_backup_task) remote_task_thread.setDaemon(True) remote_task_thread.start() self.running_list.append(remote_task_thread) def is_running(self, remote_backup_task): for running_task_thread in self.running_list: if running_task_thread.remote_backup_task == remote_backup_task: if running_task_thread.is_alive(): return True return False def clear_finish_task(self): _running_task = copy.copy(self.running_list) for running_task_thread in _running_task: if running_task_thread.is_alive(): continue self.running_list.remove(running_task_thread) class RemoteBackupTaskThreading(threading.Thread): def __init__(self, remote_backup_task): super(RemoteBackupTaskThreading, self).__init__(name='RemoteBackupTaskThreading') self.remote_backup_task = remote_backup_task self.running_sub_task_thread = list() def run(self): with transaction.atomic(): self.stop_all_sub_task_and_wait(self.remote_backup_task) update_type, sync_info = self.query_need_sync_info_from_A() # 到A端去找需要同步的信息 self.check_prev_and_release_other_to_A() self.update_local_database_info(update_type, sync_info) # 此时子任务已经创建完毕,需要放到线程中去执行 self.exc_remote_sub_task() self.finish_current_task() def exc_remote_sub_task(self): """ 通过remote_task找到对应的子任务,然后拿到队列中去执行? :return: """ # type: models.RemoteBackupTask for remote_sub_task in self.remote_backup_task.remote_backup_sub_tasks.all(): remote_sub_task_thread = RemoteSubTaskThreading(remote_sub_task) remote_sub_task_thread.setDaemon(True) remote_sub_task_thread.start() self.running_sub_task_thread.append(remote_sub_task_thread) for remote_sub_task_thread in self.running_sub_task_thread: remote_sub_task_thread.join() def finish_current_task(self): """ 结束本次任务,更新任务,计划等信息 :return: """ def check_prev_and_release_other_to_A(self): """ 到A端去释放资源和检测上一次的资源是否被意外释放 :return: 1. 需要上一次任务的信息和这次任务的信息 """ ext_config_dict = json.loads(self.remote_backup_task.ext_config) prev_info = ext_config_dict.get('prev_info', dict()) current_info = ext_config_dict['current_info'] return {'prev_info': prev_info, current_info: current_info} def update_local_database_info(self, update_type, sync_info): """ 通过A端查到的同步信息来更新本地的信息 :param update_type: :param sync_info: :return: """ self.update_local_hostsnapshot(update_type, sync_info) self.update_local_disksnapshot(update_type, sync_info) self.update_local_snapshot_storage(update_type, sync_info) self.update_current_info_to_schedule_and_task(sync_info) self.create_remote_sub_task(sync_info) def update_local_hostsnapshot(self, update_type, sync_info): """ 更新本地的主机快照级别的信息,可能是新建,可能是更新 :param update_type: :param sync_info: :return: """ def update_local_disksnapshot(self, update_type, sync_info): """ 更新本地的磁盘快照级别的信息,可能是新建,可能是更新 :param update_type: :param sync_info: :return: """ def update_local_snapshot_storage(self, update_type, sync_info): """ 更新本地的快照存储级别的信息,可能是新建,可能是更新 :param update_type: :param sync_info: :return: """ def update_current_info_to_schedule_and_task(self, sync_info): """ 需要更新当前任务的一些信息 :param sync_info: :return: 如 hostsnapshot,timestmap """ def create_remote_sub_task(self, sync_info): """ 创建远程灾备子任务(一个磁盘一个任务) :param sync_info: :return: 1. 子任务执行需要(上一次任务的快照点,上一次同步到的timestamp,本次的快照点,本次的timestamp,本次任务对应的task_uuid) 因此我们从remote_task拿到上一次的信息prev_info和current_info, r 还需要生成一个task_uuid保存到子任务数据库记录中 """ def stop_all_sub_task_and_wait(self, remote_task): # """ 停止计划上一次任务对应的所有子任务,并等待结束,但是这个过程需要到A端去停止,是不是就有机会失败? :param remote_task: :return: 去A端停止子任务,我需要什么?其实这个时候只需要每一个子任务的task_uuid。 通过当前的任务找到上一次的RemotebackupTask, 然后找到其对应的所有子任务,最后将子任务的所有的task_uuid从数据库读取出来到A端停止并等待任务结束 """ # type: models.RemoteBackupSubTask def query_need_sync_info_from_A(self): """ 到A端去找需要同步的信息 :return: 1. 需要知道上一次任务的信息(上一次的主机快照和时间点) 2. 需要知道远程主机 3. 需要知道本次同步的类型(间隔还是持续) """ # type: models.RemoteBackupTask ext_config_dict = json.loads(self.remote_backup_task.ext_config) prev_info = ext_config_dict.get('prev_info', dict()) current_task_uuid = ext_config_dict['current_info']['task_uuid'] current_lock_type = ext_config_dict['current_info']['lock_type'] remote_host_ident = ext_config_dict['remote_host_ident'] sync_type = self.get_sync_type() return {'sync_type': sync_type, 'prev_info': prev_info, 'remote_host_ident': remote_host_ident, 'task_uuid': current_task_uuid, 'lock_type': current_lock_type} def get_sync_type(self): """ 根据任务找到计划,就可以知道同步的类型 :return: """ remote_schedule = self.remote_backup_task.schedule return remote_schedule.get_remote_schedule_type class RemoteSubTaskThreading(threading.Thread): def __init__(self, remote_sub_task): super(RemoteSubTaskThreading, self).__init__(name='RemoteSubTaskThreading') self.remote_sub_task = remote_sub_task def run(self): self.check_schedule_valid() self.start_remote_logic() self.start_local_rs_module() self.check_remote_logic() self.stop_and_wait_task() self.finish_sub_task() def check_schedule_valid(self): """ 检测该任务对应的计划是否还有效 :return: """ def start_remote_logic(self): """ 开始远端的发送数据的任务, 把之前的和现在的任务的主机快照点和时刻,task_uuid传输到A端开始执行任务 :return: """ def start_local_rs_module(self): """ 开始本地的接收数据的任务, :return: """ def check_remote_logic(self): """ 检测远端任务的状态,查看传输任务是否结束 task_uuid :return: """ task = {'status': False} while not task['status']: time.sleep(5) def stop_and_wait_task(self): """ 停止A端的传输任务并等待停止成功,不成功就一直循环,直到停止成功,通过task_uuid :return: """ while True: try: stop_and_wait_task(task_uuid) # 到A端停止任务 return except Exception as e: time.sleep(5) _logger.info('stop_and_wait_task exception, because of {}'.format(e)) continue def finish_sub_task(self): """ 结束子任务,更新子任务数据库信息 :return: """ # A端 def query_need_sync_info(host_ident, prev_info, sync_type, lock_uuid, lock_type): """ 找到本次需要同步的快照 :param host_ident: :param prev_info: :param sync_type: :param lock_uuid :return: 需要返回快照点之间的关系?和本次同步的信息,而至于是否为cdp类型可以在同步信息中得到 """ prev_hostsnapshot = prev_info.get('hostsnapshot') host_obj = models.Host.objects.filter(ident=host_ident) if not host_obj: return 'update', {} if prev_hostsnapshot is None: # 代表是第一次来, # 不管什么同步类型,都直接通过主机找到最后的一个主机快照点?需要区分A端主机快照的类型吗? pass else: if sync_type == 'interval': # 间隔备份, # 1. 通过主机找到最新的快照点 # 2. 和前一个任务所同步的信息作比较,(hostsnapshot 和 timestamp)一样,就返回空的同步信息回去如果比以前同步的新, # 就需要使用lock_uuid锁住当前的主机快照点 pass else: # 持续备份 # 获取上一次同步到的快照点,通过上一次同步的快照点,找到离他最近的一个快照点,然后分别打开两个快照点, # 1. 对于qcow类型算出他们的位图?然后同步,return create, 同步信息 # 2. 对于cdp类型需要找到两个快照点的关系 # 1. 追加 (在同一个链上,也在同一个主机快照上) return update ,同步信息 # 2. 增量 (在同一个链上,但不在同一个主机快照上) return create ,同步信息 # 3. 无关 (不在同一个链上) return create ,同步信息 pass pass def check_prev_and_release_other(prev_info, current_info): """ 检测上一次任务和释放资源 :param prev_info: :param current_info: :return: 1. 通过lock_type找到所有的主机快照锁 2. 通过prev的task_uuid是否从主机快照锁找出数据,如果可以找到,就说明还被锁着的,没有找到呢?没有上一次任务不管 3. 释放除current_info和prev_info中通过task_uuid生成的锁 """ lock_type = current_info['lock_type'] _ = prev_info _ = lock_type