zoukankan      html  css  js  c++  java
  • mysql历史数据自动归档

    数据库跑一段时间后,因为查询性能、磁盘容量,运维管理等方面的原因,需要将在线数据挪到历史库(不同的服务器)。如我们的在线订单只留3个月数据,3个月以前的就需要到历史库查了。

    自动归档常见的方式有pt-archiver,但我还是觉得自己写存储过程更靠谱。。。

    思路:

    在线库实例打开federated支持,创建数据库dborder(业务库), linkhis(归档用);
    历史库创建历史表dborderhis.myorder_tab_his;
    在linkhis库下创建federated表linkhis.myorder_tab_his,指向dborderhis.myorder_tab_his;
    在linkhis库下创建日志表archive_log,存储过程proc_archive,proc_archive_pkg,并通过JOB调度proc_archive_pkg;
    在线库的从库需要忽略linkhis的复制:replicate-ignore-db=linkhis,否则从库也会往这个历史库重复同步数据。

    日志记录表archive_log

    create table archive_log
    (
    id bigint auto_increment PRIMARY key,
    tab_name varchar(40),
    archive_date_begin datetime,
    archive_date_end datetime,
    create_time datetime default CURRENT_TIMESTAMP(),
    status int(1),
    insert_rows bigint(11),
    delete_rows bigint(11),
    remark varchar(1000)
    )

    存储过程:proc_archive

    CREATE PROCEDURE proc_archive(in i_table_source varchar(40), 
    in i_table_target varchar(40), 
    in i_fieldname varchar(40), 
    in i_keepdays int,
    in i_archdays int,
    in i_other_cond varchar(500))
    begin
    /*
    入参:
    i_table_source:原表,含dbname
    i_table_target:federated表
    i_fieldname:时间字段
    i_keepdays:保留天数
    i_archdays:每次归档多少天数据
    i_other_cond:数据额外条件(如status in (2,3)不能归档,需要保留),无额外条件则输入'1=1'
    归档日志表archive_log.status字段含义:
    0:成功, 1:现有数据在保留天数内, 2:目标表含有待归档时间范围的数据, 
    3:插入数据和删除数据记录数不同, 4:SQL执行异常,具体错误见remark
    注意:
    有额外条件时,如果历史数据被修改,从不符合归档条件变成符合归档条件,
    因历史表中归档时间段内已经有之前归档的数据(@v_his_num_before>0),程序会退出,需手动处理
    */
    declare EXIT HANDLER for SQLWARNING,NOT FOUND,SQLEXCEPTION 
    begin 
    GET DIAGNOSTICS CONDITION 1 @p1=RETURNED_SQLSTATE,@p2= MESSAGE_TEXT;
    ROLLBACK;
    insert into archive_log(tab_name,archive_date_begin,archive_date_end,status,insert_rows,delete_rows,remark)
    values(i_table_source,@v_arch_begin,@v_arch_end,4,@v_his_num_after,@v_del_num,concat('error ',@p1,' - ',@p2));
    end;
    /* 获取在线表的最小日期 */
    set @mystmt = concat("select str_to_date(date_format(min(",i_fieldname,"),'%Y%m%d'),'%Y%m%d') into @v_arch_begin from ",i_table_source,' where ',i_other_cond);
    prepare stmt from @mystmt;
    execute stmt;
    deallocate prepare stmt;
    
    set @v_arch_end = date_add(@v_arch_begin,interval i_archdays day);
    
    set @mystmt = concat("select count(*) into @v_his_num_before from ",i_table_target," where ",i_fieldname," >= ? and ",i_fieldname," < ?");
    prepare stmt from @mystmt;
    execute stmt using @v_arch_begin,@v_arch_end;
    deallocate prepare stmt;
    /* 如果在线表的数据低于keepday范围,退出 */
    if timestampdiff(day,@v_arch_begin,now()) <= i_keepdays then
    insert into archive_log(tab_name,archive_date_begin,archive_date_end,status,insert_rows,delete_rows,remark)
    values(i_table_source,@v_arch_begin,@v_arch_end,1,0,0,concat('error, all data in keey days, min ',i_fieldname,': ',@v_arch_begin));
    end if;
    /* 如果历史表所在的日期区间有数据,退出(需要手动排查原因) */
    if @v_his_num_before <> 0 then
    insert into archive_log(tab_name,archive_date_begin,archive_date_end,status,insert_rows,delete_rows,remark)
    values(i_table_source,@v_arch_begin,@v_arch_end,2,0,0,concat('error, data exists,row num:',@v_his_num_before));
    end if;
    
    if (timestampdiff(day,@v_arch_begin,now()) > i_keepdays and @v_his_num_before = 0) then 
    set @mystmt = concat("insert into ",i_table_target," select * from ",i_table_source," where ",i_fieldname," >= ? and ",i_fieldname," < ? and ",i_other_cond);
    prepare stmt from @mystmt;
    execute stmt using @v_arch_begin,@v_arch_end;
    deallocate prepare stmt;
    /* 因为federated引擎不支持事务,数据insert后再select下记录数,与下面的delete记录数对比,相同则提交delete操作 */
    set @mystmt = concat("select count(*) into @v_his_num_after from ",i_table_target," where ",i_fieldname," >= ? and ",i_fieldname," < ?");
    prepare stmt from @mystmt;
    execute stmt using @v_arch_begin,@v_arch_end;
    deallocate prepare stmt;
    
    start transaction;
    
    set @mystmt = concat("delete from ",i_table_source," where ",i_fieldname," >= ? and ",i_fieldname," < ? and ",i_other_cond);
    prepare stmt from @mystmt;
    execute stmt using @v_arch_begin,@v_arch_end;
    set @v_del_num = row_count();
    deallocate prepare stmt;
    
    if @v_del_num = @v_his_num_after then
    commit;
    insert into archive_log(tab_name,archive_date_begin,archive_date_end,status,insert_rows,delete_rows,remark)
    values(i_table_source,@v_arch_begin,@v_arch_end,0,@v_his_num_after,@v_del_num,'success');
    else
    rollback;
    insert into archive_log(tab_name,archive_date_begin,archive_date_end,status,insert_rows,delete_rows,remark)
    values(i_table_source,@v_arch_begin,@v_arch_end,3,@v_his_num_after,@v_del_num,'rollback, inserted rows num not equal to deleted rows num');
    end if;
    end if;
    
    end;
     

    存储过程proc_archive_pkg

    CREATE PROCEDURE `proc_archive_pkg`()
    begin
    call proc_archive(
                      'dborder.myorder_tab', -- tabel source
                      'myorder_tab_his',     -- table target
                      'create_time',         -- time field name
                      120,                   -- i_keepdays
                      1,                     -- i_archdays
                      '1=1'                  -- i_other_cond
                      );
     
    end


    归档日志表记录



     
  • 相关阅读:
    python
    HTTP和HTTPS协议,详解
    常用加密算法之非对称加密算法
    Docker容器数据卷-Volume详解
    使用nsenter进入docker容器后端报错 mesg: ttyname failed: No such file or directory
    Docker 查看容器 IP 地址
    Docker容器数据卷volumes-from
    Docker 进入容器的4种方法
    Jmeter之Bean shell使用(二)
    Jmeter之Bean shell使用(一)
  • 原文地址:https://www.cnblogs.com/lkj371/p/12750934.html
Copyright © 2011-2022 走看看