# AB机新逻辑
"""
1. 查看是否存在有效的远程灾备计划
2. 先给计划创建一个空的任务。
3. 到A端中填充任务,构造成真正可以执行的任务。(在这个过程需要到A端释放以前的以前锁定的资源,锁定现在的资源)(原子性)
这个过程容易失败(失败后我们怎么处理较好呢??)
1. 需要释放上一次任务的资源停止之前的任务并等待结束
1. 重试到一定次数。
2. 重试完成后继续失败该怎么办??无限重试?(要不无限重试吧,这样也就不用释放资源了,但是之前到A端锁定的资源该怎么办呢????对了,
可以直接用特定的标志去释放,成功过去了就会释放,也就不用每一次失败都需要去释放了,并且不能保证一定能释放成功)这儿就这么处理
4. 执行远程容灾任务,该怎么去执行呢??首先需要明确远程容灾任务是要做什么吧?
1. 远程容灾任务要做什么?:
1. 是不是我在前面每一个任务对应的磁盘级别的任务的参数都准备好,这个时候之前去执行就好了
"""
import threading
import time
import json
import copy
import uuid
from apiv1 import models
from box_dashboard import xlogging
from django.db import transaction
_logger = xlogging.getLogger(__name__)
class RemoteScheduleThreading(threading.Thread):
def __init__(self):
super(RemoteScheduleThreading, self).__init__(name='RemoteScheduleThreading')
self.is_running = list()
def run(self):
while True:
try:
self.work()
except Exception as e:
_logger.debug('RemoteScheduleThreading Exception, because of {}'.format(e))
def work(self):
for remote_schedule in models.RemoteBackupSchedule.objects.filter(enabled=True, deleted=False):
if not self.check_remote_schedule_can_exc(remote_schedule):
continue
self.create_remote_task_empty(remote_schedule)
def check_remote_schedule_can_exc(self, remote_schedule):
"""
检测远程灾备计划此时是否可以执行
:param remote_schedule:
:return:
1. 需要判断计划是否被禁用或者删除
2. 判断计划类型,间隔类型查看是否到达执行时间,持续类型无条件继续执行,也就是不需要休眠
3. 需要判断是否已经存在执行中的任务(通过计划去找任务)
"""
# type: models.RemoteBackupSchedule, models.RemoteBackupTask
if not remote_schedule.enabled or remote_schedule.deleted:
return False
now = time.time()
if remote_schedule.get_remote_schedule_type == 'interval':
if now < remote_schedule.next_run_date:
return False
if self.is_have_running_task(remote_schedule):
return False
return True
def create_remote_task_empty(self, remote_schedule):
"""
创建远程容灾空任务,这个过程只在B端这边组装数据
:param remote_schedule:
:return:
创建远程空任务我们需要什么??
1. 需要上一次任务的信息。因为我们需要根据上一次任务的信息找到本次需要同步快照,那么找到本次需要同步的快照,需要什么去找呢??
1. 需要上一次同步的主机快照,
2. 需要上一次同步的时间点,
3. 需要同步的主机
4. 需要上一次任务的uuid(需要释放资源)
"""
# type: models.RemoteBackupTask, models.RemoteBackupSchedule
prev_info = self.fetch_prev_sync_info(remote_schedule)
remote_host_ident = remote_schedule.host.ident
remote_task = models.RemoteBackupTask.objects.create(schedule=remote_schedule)
current_info = self.generate_current_task_info(remote_task)
exc_config = {'prev_info': prev_info, 'remote_host_ident': remote_host_ident, 'current_info': current_info}
remote_task.ext_config = json.dumps(exc_config)
remote_task.save(update_fields=['ext_config', ])
@staticmethod
def generate_current_task_info(remote_task):
"""
生成本次任务的一些信息(task_uuid)
:param remote_task:
:return:
"""
task_uuid = uuid.uuid4().hex
lock_type = 'remote_schedule_{}'.format(remote_task.schedule.id)
return {'task_uuid': task_uuid, 'remote_task_id': remote_task.id, 'lock_type': lock_type}
@staticmethod
def fetch_prev_sync_info(remote_schedule):
"""
获取上一次同步的信息
:param remote_schedule:
:return:
"""
ext_config_dict = json.loads(remote_schedule.ext_config)
prev_info = ext_config_dict.get('current_info', dict())
return prev_info
@staticmethod
def is_have_running_task(remote_schedule):
"""
检测计划是否存在执行中的任务
:param remote_schedule:
:return:
"""
remote_task = remote_schedule.remote_backup_tasks.filter(finish_datetime__isnull=True)
if remote_task:
return True
return False
class RemoteBackupTaskMonitorThreading(threading.Thread):
def __init__(self):
super(RemoteBackupTaskMonitorThreading, self).__init__(name='RemoteBackupTaskMonitorThreading')
self.running_list = list()
def run(self):
while True:
try:
self.task_work()
except Exception as e:
_logger.debug('RemoteBackupTaskMonitorThreading Exception, because of {}'.format(e))
def task_work(self):
for remote_backup_task in models.RemoteBackupTask.objects.filter(finish_datetime__isnull=True):
self.clear_finish_task()
if self.is_running(remote_backup_task):
continue
remote_task_thread = RemoteBackupTaskThreading(remote_backup_task)
remote_task_thread.setDaemon(True)
remote_task_thread.start()
self.running_list.append(remote_task_thread)
def is_running(self, remote_backup_task):
for running_task_thread in self.running_list:
if running_task_thread.remote_backup_task == remote_backup_task:
if running_task_thread.is_alive():
return True
return False
def clear_finish_task(self):
_running_task = copy.copy(self.running_list)
for running_task_thread in _running_task:
if running_task_thread.is_alive():
continue
self.running_list.remove(running_task_thread)
class RemoteBackupTaskThreading(threading.Thread):
def __init__(self, remote_backup_task):
super(RemoteBackupTaskThreading, self).__init__(name='RemoteBackupTaskThreading')
self.remote_backup_task = remote_backup_task
self.running_sub_task_thread = list()
def run(self):
with transaction.atomic():
self.stop_all_sub_task_and_wait(self.remote_backup_task)
update_type, sync_info = self.query_need_sync_info_from_A() # 到A端去找需要同步的信息
self.check_prev_and_release_other_to_A()
self.update_local_database_info(update_type, sync_info)
# 此时子任务已经创建完毕,需要放到线程中去执行
self.exc_remote_sub_task()
self.finish_current_task()
def exc_remote_sub_task(self):
"""
通过remote_task找到对应的子任务,然后拿到队列中去执行?
:return:
"""
# type: models.RemoteBackupTask
for remote_sub_task in self.remote_backup_task.remote_backup_sub_tasks.all():
remote_sub_task_thread = RemoteSubTaskThreading(remote_sub_task)
remote_sub_task_thread.setDaemon(True)
remote_sub_task_thread.start()
self.running_sub_task_thread.append(remote_sub_task_thread)
for remote_sub_task_thread in self.running_sub_task_thread:
remote_sub_task_thread.join()
def finish_current_task(self):
"""
结束本次任务,更新任务,计划等信息
:return:
"""
def check_prev_and_release_other_to_A(self):
"""
到A端去释放资源和检测上一次的资源是否被意外释放
:return:
1. 需要上一次任务的信息和这次任务的信息
"""
ext_config_dict = json.loads(self.remote_backup_task.ext_config)
prev_info = ext_config_dict.get('prev_info', dict())
current_info = ext_config_dict['current_info']
return {'prev_info': prev_info, current_info: current_info}
def update_local_database_info(self, update_type, sync_info):
"""
通过A端查到的同步信息来更新本地的信息
:param update_type:
:param sync_info:
:return:
"""
self.update_local_hostsnapshot(update_type, sync_info)
self.update_local_disksnapshot(update_type, sync_info)
self.update_local_snapshot_storage(update_type, sync_info)
self.update_current_info_to_schedule_and_task(sync_info)
self.create_remote_sub_task(sync_info)
def update_local_hostsnapshot(self, update_type, sync_info):
"""
更新本地的主机快照级别的信息,可能是新建,可能是更新
:param update_type:
:param sync_info:
:return:
"""
def update_local_disksnapshot(self, update_type, sync_info):
"""
更新本地的磁盘快照级别的信息,可能是新建,可能是更新
:param update_type:
:param sync_info:
:return:
"""
def update_local_snapshot_storage(self, update_type, sync_info):
"""
更新本地的快照存储级别的信息,可能是新建,可能是更新
:param update_type:
:param sync_info:
:return:
"""
def update_current_info_to_schedule_and_task(self, sync_info):
"""
需要更新当前任务的一些信息
:param sync_info:
:return:
如 hostsnapshot,timestmap
"""
def create_remote_sub_task(self, sync_info):
"""
创建远程灾备子任务(一个磁盘一个任务)
:param sync_info:
:return:
1. 子任务执行需要(上一次任务的快照点,上一次同步到的timestamp,本次的快照点,本次的timestamp,本次任务对应的task_uuid)
因此我们从remote_task拿到上一次的信息prev_info和current_info, r
还需要生成一个task_uuid保存到子任务数据库记录中
"""
def stop_all_sub_task_and_wait(self, remote_task): #
"""
停止计划上一次任务对应的所有子任务,并等待结束,但是这个过程需要到A端去停止,是不是就有机会失败?
:param remote_task:
:return:
去A端停止子任务,我需要什么?其实这个时候只需要每一个子任务的task_uuid。
通过当前的任务找到上一次的RemotebackupTask, 然后找到其对应的所有子任务,最后将子任务的所有的task_uuid从数据库读取出来到A端停止并等待任务结束
"""
# type: models.RemoteBackupSubTask
def query_need_sync_info_from_A(self):
"""
到A端去找需要同步的信息
:return:
1. 需要知道上一次任务的信息(上一次的主机快照和时间点)
2. 需要知道远程主机
3. 需要知道本次同步的类型(间隔还是持续)
"""
# type: models.RemoteBackupTask
ext_config_dict = json.loads(self.remote_backup_task.ext_config)
prev_info = ext_config_dict.get('prev_info', dict())
current_task_uuid = ext_config_dict['current_info']['task_uuid']
current_lock_type = ext_config_dict['current_info']['lock_type']
remote_host_ident = ext_config_dict['remote_host_ident']
sync_type = self.get_sync_type()
return {'sync_type': sync_type, 'prev_info': prev_info, 'remote_host_ident': remote_host_ident,
'task_uuid': current_task_uuid, 'lock_type': current_lock_type}
def get_sync_type(self):
"""
根据任务找到计划,就可以知道同步的类型
:return:
"""
remote_schedule = self.remote_backup_task.schedule
return remote_schedule.get_remote_schedule_type
class RemoteSubTaskThreading(threading.Thread):
def __init__(self, remote_sub_task):
super(RemoteSubTaskThreading, self).__init__(name='RemoteSubTaskThreading')
self.remote_sub_task = remote_sub_task
def run(self):
self.check_schedule_valid()
self.start_remote_logic()
self.start_local_rs_module()
self.check_remote_logic()
self.stop_and_wait_task()
self.finish_sub_task()
def check_schedule_valid(self):
"""
检测该任务对应的计划是否还有效
:return:
"""
def start_remote_logic(self):
"""
开始远端的发送数据的任务, 把之前的和现在的任务的主机快照点和时刻,task_uuid传输到A端开始执行任务
:return:
"""
def start_local_rs_module(self):
"""
开始本地的接收数据的任务,
:return:
"""
def check_remote_logic(self):
"""
检测远端任务的状态,查看传输任务是否结束 task_uuid
:return:
"""
task = {'status': False}
while not task['status']:
time.sleep(5)
def stop_and_wait_task(self):
"""
停止A端的传输任务并等待停止成功,不成功就一直循环,直到停止成功,通过task_uuid
:return:
"""
while True:
try:
stop_and_wait_task(task_uuid) # 到A端停止任务
return
except Exception as e:
time.sleep(5)
_logger.info('stop_and_wait_task exception, because of {}'.format(e))
continue
def finish_sub_task(self):
"""
结束子任务,更新子任务数据库信息
:return:
"""
# A端
def query_need_sync_info(host_ident, prev_info, sync_type, lock_uuid, lock_type):
"""
找到本次需要同步的快照
:param host_ident:
:param prev_info:
:param sync_type:
:param lock_uuid
:return: 需要返回快照点之间的关系?和本次同步的信息,而至于是否为cdp类型可以在同步信息中得到
"""
prev_hostsnapshot = prev_info.get('hostsnapshot')
host_obj = models.Host.objects.filter(ident=host_ident)
if not host_obj:
return 'update', {}
if prev_hostsnapshot is None:
# 代表是第一次来,
# 不管什么同步类型,都直接通过主机找到最后的一个主机快照点?需要区分A端主机快照的类型吗?
pass
else:
if sync_type == 'interval':
# 间隔备份,
# 1. 通过主机找到最新的快照点
# 2. 和前一个任务所同步的信息作比较,(hostsnapshot 和 timestamp)一样,就返回空的同步信息回去如果比以前同步的新,
# 就需要使用lock_uuid锁住当前的主机快照点
pass
else:
# 持续备份
# 获取上一次同步到的快照点,通过上一次同步的快照点,找到离他最近的一个快照点,然后分别打开两个快照点,
# 1. 对于qcow类型算出他们的位图?然后同步,return create, 同步信息
# 2. 对于cdp类型需要找到两个快照点的关系
# 1. 追加 (在同一个链上,也在同一个主机快照上) return update ,同步信息
# 2. 增量 (在同一个链上,但不在同一个主机快照上) return create ,同步信息
# 3. 无关 (不在同一个链上) return create ,同步信息
pass
pass
def check_prev_and_release_other(prev_info, current_info):
"""
检测上一次任务和释放资源
:param prev_info:
:param current_info:
:return:
1. 通过lock_type找到所有的主机快照锁
2. 通过prev的task_uuid是否从主机快照锁找出数据,如果可以找到,就说明还被锁着的,没有找到呢?没有上一次任务不管
3. 释放除current_info和prev_info中通过task_uuid生成的锁
"""
lock_type = current_info['lock_type']
_ = prev_info
_ = lock_type