zoukankan      html  css  js  c++  java
  • TB级NFS数据平滑迁移方案设计与实现

    平台原来挂载的/mnt/ccdbfs不变,由于里面有很多软链接,因此如果迁移后把平台代码里面的存储路径改了的话,万一有没改的地方就不好处理了。

    故先设计双写,保证在复制老数据的时候, /mnt/ccdbfs 和/mnt/mfs同时写入新数据,保证数据一致性。等全部复制完后,观察没有diff了,再切换挂载,让 /mnt/ccdbfs 挂载到新搭建的 moosefs 主服务器。

    inotify文件系统监控机制。
    rsync是linux系统下的数据镜像备份工具。使用快速增量备份工具Sync可以远程同步,支持本地复制,或者与其他SSH、rsync主机同步。

    有用户组和权限双重问题,后面临时修改系统cp命令,完全切换玩后又切回去。

    下文前两种方案,是最开始调研的网上大多数人使用的方案修改出来的,但是实测后发现,对超大目录的同步非常无力,光inotify就要扫很久,并且扫完后,对新文件的检测也会出现问题。故最后是用最后一种方案,修改了老nfs服务的日志级别,从中捞取文件变化,引入延迟同步+diff无则同步策略。

    下文出现ccdbfs的地方代表老nfs服务,mfs代表新nfs服务。

    inotify+rsync

    对于文件里量不大的目录,复制起来快,且代码量少。

    但是对超大目录,扫码起来无力。且事件感知会出现重复。

    copy_ccdbfs_update_to_mfs.sh

    # rsync auto sync script with inotify
    # 2019/03/21
    
    # configs
    source_path=/mnt/ccdbfs/
    target_path=/mnt/mfs/
    rsync_bin=/usr/bin/rsync
    INOTIFY_EXCLUDE='aladata.*'
    
    
    cd $(dirname $0)
    
    RSYNC_EXCLUDE=$(pwd)/rsync_exclude.lst
    
    current_date=$(date +%Y-%m-%d_%H%M%S)
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)
    
    log_file=$(pwd)/runcopy.log.${current_date}
    inotifywait_bin=$(pwd)/inotifywait
    
    inotify_fun(){
        ${inotifywait_bin} -mrq --timefmt '%Y/%m/%d-%H:%M:%S' --format '%T %w %f' 
         --exclude ${INOTIFY_EXCLUDE} -e modify,create,move,attrib ${source_path} 
        | while read file
            do
                ${rsync_bin} -auvrtzopgP --exclude-from=${RSYNC_EXCLUDE} --progress --bwlimit=800000 ${source_path} ${target_path}
            done
    }
    
    #inotify log
    inotify_fun &> ${log_file} &
    
    

    inotify+自定义复制脚本

    inotify启一个光监控文件新增修改事件的脚本,导入到change_file.list。

    自定义复制脚本去读取并且逻辑控制后复制。
    inotify_ccdbfs_change.sh

    # find ccdbfs change with inotify, export to file
    # 2019/03/21
    
    # configs
    source_path=/mnt/ccdbfs/
    INOTIFY_EXCLUDE='.*(.swx|.swp)$|aladata.*'
    
    cd $(dirname $0)
    current_date=$(date +%Y-%m-%d_%H%M%S)
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)
    inotifywait_bin=$(pwd)/inotifywait
    
    ${inotifywait_bin} -mrq --timefmt '%Y/%m/%d-%H:%M:%S' --format '%w%f %e %T' 
        --exclude ${INOTIFY_EXCLUDE} -e create,close_write ${source_path} 
    | while read line
        do
            echo ${line} >> $(pwd)/change_file.list
        done &
    
    

    change_file.list

    /mnt/mfs/dirtest4 CREATE,ISDIR 2019/03/21-22:30:06
    /mnt/mfs/dirtest4/5 CREATE,ISDIR 2019/03/21-22:30:06
    /mnt/mfs/a.test CLOSE_WRITE,CLOSE 2019/03/21-22:30:17
    /mnt/mfs/c.test CLOSE_WRITE,CLOSE 2019/03/21-22:30:24
    /mnt/mfs/aa CREATE 2019/03/21-22:30:45
    /mnt/mfs/aa CLOSE_WRITE,CLOSE 2019/03/21-22:30:45
    

    inotify_ccdbfs_change_cp.py

    # -*- coding: utf-8 -*-
    # 持续读取inotify_ccdbfs_change.sh监控到的变化文件change_file.list,从ccdbfs同步到mfs
    # 2019/03/22
    import os
    import time
    
    
    SOURCE_ROOT = '/mnt/ccdbfs'
    TARGET_ROOT = '/mnt/mfs'
    
    def deal_ori_change_file_list():
        '''获取change_file.list文件得到变化文件list
           顺便追加change_file.list到change_file.bak
           然后清空change_file.list
        '''
        change_file_path = os.getcwd() + '/change_file.list'
        change_file_back_path = os.getcwd() + '/change_file.bak'
        with open(change_file_path) as f:
            row_list = f.read().splitlines()
        # 追加备份
        os.system('echo "" >> ' + change_file_back_path)  # 相当于换行
        os.system('cat ' + change_file_path + ' >> ' + change_file_back_path)
    
        # 清空原change_file.list
        os.system('cat /dev/null > ' + change_file_path)
    
        return row_list
    
    
    def uniq_row_list(row_list):
        '''对获取到的原始row_list拆分和保序去重
           返回 dir_list, file_list
        '''
        dir_list = []
        file_list = []
    
        for row in row_list:
            try:
                obj, effect, _ = row.split(' ')
                if 'ISDIR' in effect:
                    if obj not in dir_list:
                        dir_list.append(obj)
                else:
                    if obj not in file_list:
                        file_list.append(obj)
            except:
                print(str(time.time()) + 'uniq_row_list except')
    
        return dir_list, file_list
    
    
    def copy_change(dir_list, file_list):
        '''在mfs中复制ccdbfs中的变化
           先复制创建目录,再复制
        '''
        # 优先在mfs中创建没有的目录,再去复制文件
        for a_dir in dir_list:
            dst = a_dir.replace(SOURCE_ROOT, TARGET_ROOT)
            if os.path.exists(dst) == False:
                os.system('mkdir -p ' + dst)
        
        for a_file in file_list:
            dst_file = a_file.replace(SOURCE_ROOT, TARGET_ROOT)
            # 防止目标地址目录没创建
            (file_path,file_name) = os.path.split(dst_file)
            if os.path.exists(file_path) == False:
                os.system('mkdir -p ' + file_path)
            # 复制文件或者软链接
            os.system('cp -d ' + a_file + ' ' + dst_file)
    
    
    if __name__ == '__main__':
        last_time = 0
        while True:
            change_file_path = os.getcwd() + '/change_file.list'
            change_file_back_path = os.getcwd() + '/change_file.bak'
            # 获取change_file.list文件得到变化文件list
            with open(change_file_path) as f:
                row_list = f.read().splitlines()
    
            # 执行备份条件是 变化记录满100条 或 间隔5s且有变化记录
            # 所以当没满100条,且没满5s的时候,或者满了5s但是没有变化记录,不进行后面的备份
            if len(row_list) < 100:
                if time.time() - last_time < 5:
                    print('row_list < 100 and delta_time < 5')
                    time.sleep(1)
                    continue
                elif len(row_list) == 0:
                    print('delta_time > 5 but row_list == 0')
                    time.sleep(1)
                    continue
    
            # 满足条件,开始备份
            print('cp start')
            last_time = time.time()
            # 追加备份历史记录
            os.system('echo "" >> ' + change_file_back_path)  # 相当于换行
            os.system('cat ' + change_file_path + ' >> ' + change_file_back_path)
            # 清空原change_file.list
            os.system('cat /dev/null > ' + change_file_path)
    
            dir_list, file_list = uniq_row_list(row_list)
            print('dir_list: ', dir_list)
            print('file_list: ', file_list)
            copy_change(dir_list, file_list)
            time.sleep(1)
    
    

    增量双写,diff复制策略(实操方案)

    捞取老nfs日志+自定义双写脚本

    cp_ccdbfs_change.py 增量更新同步双写脚本。集群业务机器,有对老ccdbfs写入的都启一个。

    对于捞取到的文件变化日志进行双重筛选判断,因:

    • 测试发现,复制没问题,但是清理复制过的记录的时候,要去查询一下文件的修改时间,又产生了一条读的操作,还是一模一样的日志,又会再删除后新添加进全局map去,所以这样的话就是无限循环删不掉。
    • 打算修改方案,在复制后,不把那条记录删除,而是把存的log时间改为0,当轮询线程取出来的记录,时间为0,直接跳过,不去查询修改时间了。
    • 之后如果该文件再次修改,触发了日志后,会看如果时间为0,证明上次复制过了,但是现在又新修改了,那么重新刷成触发日志的新时间。
    • 顺便,如果是目录,时间存成1,也不再处理。
    • 内存应该撑得住,因为存储的map只是文件的信息而不是文件内容,并且,经常有进入目录或者查询修改时间产生的日志记录。
    • 上面这样的话,目录可以排除了,但是对于文件,还是会因为查询修改时间,产生日志记录,并且刷新log_time。所以还应该设置一个检查过的map记录(值为上一次复制或检查完成的时间),两个结合判断。
    # -*- coding: utf-8 -*-
    # 持续读取从ccdbfs日志捞取出来的变更文件,并同步到mfs对应路径下
    # 2019/03/26
    import os
    import time
    import sys
    import subprocess
    import threading
    
    CCDBFS_LOG_PATH = '/home/work/nfs_client/log/Client.log'
    SOURCE_ROOT = '/mnt/ccdbfs'
    TARGET_ROOT = '/mnt/mfs'
    CCDBFS_SAVE_ROOT = '/disk'
    CCDBFS_SEARCH_TOOLS_ROOT = '/home/work/copy_ccdbfs_to_mfs/output'
    
    EXCLUDE_LIST = ['/mnt/ccdbfs/aladata']
    
    DELAY_DEAL_TIME = 120
    
    MAP_DICT = {}  # 存储有变更的文件信息
    INODE_PATH_DICT = {}  # 提供存储和查询inode对应路径用
    FINISHED_DICT = {}  # 存储复制过的文件信息,值记录上次检查完成时间,值为1表示不再做检查
    
    def myprint(*args):
        ltime = time.localtime(time.time())
        now_time = time.strftime("%Y-%m-%d %H:%M:%S, ", ltime)        
        print(now_time, args)
    
    
    def copy_change(obj_path):
        '''在mfs中复制ccdbfs中的变化
        '''
        try:
            target_path = obj_path.replace(SOURCE_ROOT, TARGET_ROOT)
            # 先在mfs目标路径下判断路径是否存在,没有则创建
            (file_path,file_name) = os.path.split(target_path)
            if os.path.exists(file_path) == False:
                os.system('mkdir -p ' + file_path)
    
            # 复制文件或者软链接
            # -p 修改时间和访问权限也复制到新文件
            # -d 复制时保留链接
            # -u 源文件修改时间更新才复制
            os.system("""sh -c 'nohup cp -pdu "%s" "%s" &'""" % (obj_path, target_path))
        except:
            myprint('cp change error: ' + obj_path)
    
    def extract_dir_objname(log_buff):
        '''从日志语句中提取出父目录以及存储对象名
        '''
        try:
            tmplist = log_buff.split(' ')
            log_time = '2019-' + tmplist[1] + ' ' + tmplist[2]
            log_time_stamp = time.mktime(time.strptime(log_time, "%Y-%m-%d %H:%M:%S:"))
            # 去掉 parentDirInode:0xa0000000101b9c32, 的多余字符
            dir_inode = tmplist[8].replace('parentDirInode:', '').replace(',', '')
            # 只取出存储对象名字
            objname = tmplist[9].replace('
    ','').split(':')[1]
            return log_time_stamp, dir_inode, objname
        except:
            myprint('extract_dir_objname error, ' + log_buff)
            return None, None, None
    
    
    def tail_log_add_map():
        '''
        Desc: 持续捞取文件变更日志,并将满足条件的变更存入全局MAP_DICT
        '''
        # 持续捞取长这样的日志,必须包含 ]Lookup, 有逗号
        # DEBUG: 03-26 11:45:56:  Client * 15176 [ccdb:NFSClient.cpp:625:15176]Lookup, parentDirInode:0xa0000001198bee40, dentryName:filelinktest
        p = subprocess.Popen('tail -F ' + CCDBFS_LOG_PATH, shell=True, stdout=subprocess.PIPE)
        while True:
            buff = p.stdout.readline()
            # 克服直接在Popen中grep有问题的办法
            if ']Lookup,' not in buff:
                continue
            if buff == '' and p.poll() != None:
                break
            
            log_time_stamp, dir_inode, objname = extract_dir_objname(buff)
            if (log_time_stamp == None) or (dir_inode == None) or (objname == None):
                continue
    
            # 若不存在,添加之
            map_value = MAP_DICT.get(dir_inode + '+' + objname)
            if  map_value == None:
                MAP_DICT[dir_inode + '+' + objname] = log_time_stamp
            else:
                last_finished_time = FINISHED_DICT.get(dir_inode + '+' + objname)
                # 之前复制完成log_time变成了0,再次有修改才变换这里的log_time
                # 上次检测或者复制完成10s内,视为读操作,不修改log_time
                if last_finished_time == None:
                    # FINISHED_DICT无记录,表示要去检测
                    last_finished_time = 0
                if (map_value == 0) and (time.time() - last_finished_time > 10):
                        MAP_DICT[dir_inode + '+' + objname] = log_time_stamp
    
    
    def check_map_dict():
        '''
        Desc: 轮询遍历MAP_DICT将满足条件的文件进行复制或删除记录
        '''
        global MAP_DICT
        while True:
            for k, log_time in MAP_DICT.items():
                # myprint(k, log_time)
                # log_time为0表示上一次复制或检查完成,log_time为1表示是目录永远不再检查,不用操作
                # 在复制文件前会检查,如果目录不存在会创建
                if log_time == 0 or log_time == 1:
                    continue
                dir_inode, objname = k.split('+')
                # 先查全局INODE_PATH_DICT中存了路径没,没有再去查ccdbfs客户端
                dir_path = INODE_PATH_DICT.get(dir_inode)
                if dir_path == None:
                    try:
                        op = os.popen("cd %s && echo 'lookup %s' | ./bin/Cli -x" % (CCDBFS_SEARCH_TOOLS_ROOT, dir_inode))
                        ret = op.read()
                        # ccdbfs客户端存的路径是/disk开头,挂载路径是/mnt/ccdbfs
                        dir_path = ret.split(' ')[5].replace(CCDBFS_SAVE_ROOT, SOURCE_ROOT)
                        INODE_PATH_DICT[dir_inode] = dir_path
                        myprint('search ccdbfs client:%s->%s' % (dir_inode, dir_path))
                    except:
                        myprint('search dir path error: ' + k)
                        # 若获取不到路径,跳过这次复制
                        MAP_DICT[k] = 0
                        FINISHED_DICT[k] = time.time()
                        continue
    
                # 拼接文件路径
                obj_path = dir_path + objname
    
                # 若obj_path本身是目录,不用处理
                if os.path.isdir(obj_path) == True:
                    # 修改log_time为1,当做标记用
                    MAP_DICT[k] = 1
                    continue
    
                # 排除一些不需要监控的目录
                exclude_flag = False
                for exclude in EXCLUDE_LIST:
                    if exclude in dir_path:
                        exclude_flag = True
                        break
                if exclude_flag == True:
                    MAP_DICT[k] = 1  # 故意置1,永不检查
                    continue
    
                # 若是临时文件,不存在了,跳过这次复制
                if os.path.exists(obj_path) == False:
                    myprint(obj_path + ' is inexistent')
                    MAP_DICT[k] = 0
                    FINISHED_DICT[k] = time.time()
                    continue
    
                # 获取最新修改时间
                try:
                    modify_time = os.stat(obj_path).st_mtime
                except:
                    myprint('os.stat().st_mtime error: ' + obj_path)
                    continue
                now_time = time.time()
                # -5秒的判断,是防止极短时间写入后,log日志时间稍大于修改时间。
                if now_time - modify_time > DELAY_DEAL_TIME and modify_time - log_time >= -5:
                    # 复制同步,并修改键值表示复制过
                    myprint('cp and set MAP_DICT[k]=0, FINISHED_DICT=now_time, ' + obj_path)
                    copy_change(obj_path)
                    MAP_DICT[k] = 0
                    FINISHED_DICT[k] = now_time
                else:
                    # 也不能让MAP_DICT野蛮增长,对于读操作(日志时间大于修改时间5s以上)的log,要清除掉
                    if now_time - modify_time > DELAY_DEAL_TIME and log_time - modify_time > 5:
                        # 修改键值表示检查过
                        myprint('no change, set MAP_DICT[k]=0, FINISHED_DICT=now_time, ' + obj_path)
                        MAP_DICT[k] = 0
                        FINISHED_DICT[k] = now_time
                # time.sleep(2)
            time.sleep(60)
    
    if __name__ == '__main__':
        t1 = threading.Thread(target = tail_log_add_map)
        t2 = threading.Thread(target = check_map_dict)
    
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
    

    copy_inexistent_file.py 同步剩余全量文件,若无则同策略。另起一台机器连接新老nfs客户端即可启动之。

    # -*- coding: utf-8 -*-
    # 平台nfs未被inotify检测到的文件迁移
    # 2019/03/18
    import os
    import time
    
    DIRS_LIST = []  # 记录老文件夹下不断遍历添加进来的目录名,检测完后抛出,当队列使用
    
    # SOURCE_ROOT,TARGET_ROOT提供程序中替换目录路径用
    # 为防止其他文件也用到ccdbfs字眼,所以全部执行完后
    # 手动修改新mfs目录里ccdbfs软链接,而不再程序中特殊控制
    SOURCE_ROOT = '/mnt/ccdbfs'
    TARGET_ROOT = '/mnt/mfs'
    # SOURCE_ROOT = '/mnt/srcdir'
    # TARGET_ROOT = '/mnt/tardir'
    
    EXCLUDE_LIST = ['/mnt/ccdbfs/aladata']
    # EXCLUDE_LIST = ['/mnt/srcdir/aladata']
    
    def myprint(*args):
        ltime = time.localtime(time.time())
        now_time = time.strftime("%Y-%m-%d %H:%M:%S, ", ltime)        
        print(now_time, args)
    
    
    def dir_info(file_dir):
        for root, dirs, files in os.walk(file_dir):
            # myprint('root_dir:', root)  # 当前目录路径
            # myprint('sub_dirs:', dirs)  # 当前路径下所有子目录
            # myprint('files:', files)  # 当前路径下所有非目录子文件
            # 补全目录全路径
            full_path_dirs = [root + '/' + i for i in dirs]
            full_path_dirs_res = []
    
            # 过滤某些不拷贝的目录
            for i in full_path_dirs:
                exclude_flat = False
                for exclude in EXCLUDE_LIST:
                    if exclude in i:
                        exclude_flat = True
                        break
                if exclude_flat == False:
                    full_path_dirs_res.append(i)
    
            # 补全文件全路径
            full_path_files = [root + '/' + i for i in files]
            return full_path_dirs_res, full_path_files
    
    
    def doing_copy_file(dir_path):
        '''传入原ccdbfs目录下的某文件夹路径,复制其中文件
           并将其中文件夹推入全局DIRS_LIST
        '''
        dirs, files = dir_info(dir_path)
        # 发现目录,推入DIRS_LIST
        global DIRS_LIST
        DIRS_LIST += dirs
    
        for src_file in files:
            # mfs下属目录中不存在该文件,才复制
            dst_file = src_file.replace(SOURCE_ROOT, TARGET_ROOT)
            if os.path.exists(dst_file) == False:
                try:
                    # 防止目标地址目录没创建
                    (file_path,file_name) = os.path.split(dst_file)
                    if os.path.exists(file_path) == False:
                        os.system('mkdir -p ' + file_path)
                    # 正式复制该文件
                    myprint('cp -pdu "%s" "%s"' % (src_file, dst_file))
                    os.system('cp -pdu "%s" "%s"' % (src_file, dst_file))
                except:
                    myprint('copy file except: ' + dst_file)
    
    
    def make_soft_link(soft_file):
        '''将ccdbfs中的软链接复制到对应mfs目录相同层级下
        '''
        try:
            dst_file = soft_file.replace(SOURCE_ROOT, TARGET_ROOT)
            if os.path.exists(dst_file) == True:
                # 存在则不用重复复制
                myprint(dst_file + 'already exists')
                return
            # 防止目标地址目录没创建
            (file_path,file_name) = os.path.split(dst_file)
            if os.path.exists(file_path) == False:
                os.system('mkdir -p ' + file_path)
            # 正式复制该软链接
            myprint('cp -pdu "%s" "%s"' % (soft_file, dst_file))
            os.system('cp -pdu "%s" "%s"' % (soft_file, dst_file))
        except:
            myprint('make soft link except, old:' + soft_file)
    
    
    if __name__ == '__main__':
        # 先把源头根目录下的文件复制,以及其文件夹推入DIRS_LIST
        doing_copy_file(SOURCE_ROOT)
    
        # 一层一层目录遍历完,DIRS_LIST清空程序才结束
        while len(DIRS_LIST) > 0:
            current = DIRS_LIST.pop(0)
            # myprint('now doing->', current)
            if os.path.islink(current) == True:
                # 若是软链接,不进入目录,只获取软链接信息并在新mfs里创建同名软链接
                make_soft_link(current)
            else:
                # 在mfs下属文件夹中该目录不存在则创建
                dst = current.replace(SOURCE_ROOT, TARGET_ROOT)
                if os.path.exists(dst) == False:
                    os.system('mkdir -p ' + dst)
                doing_copy_file(current)
               
        print('done!')
    
  • 相关阅读:
    C# 从Excel 批量导入数据库
    SQL、Linq和Lambda表达式 的关系
    layer 中的 layer.alert layer.msg layer.confirm
    jquery 关于使用 append 追加 元素后 事件无法触发
    eBay 开发流程
    WCF学习笔记(2)——使用IIS承载WCF服务
    WCF学习笔记(1)——Hello WCF
    [老老实实学WCF] 第十篇 消息通信模式(下) 双工
    [老老实实学WCF] 第九篇 消息通信模式(上) 请求应答与单向
    [老老实实学WCF] 第八篇 实例化
  • 原文地址:https://www.cnblogs.com/xrszff/p/10960196.html
Copyright © 2011-2022 走看看