zoukankan      html  css  js  c++  java
  • 使用Oracle Logminer同步Demo

    使用Oracle Logminer同步Demo

    1 Demo介绍

    1.1 Demo设想

    前面介绍了Oracle LogMiner配置使用以及使用LogMiner进行解析日志文件性能,在这篇文章中将利用LogMiner进行数据同步,实现从源目标数据库到目标数据库之间的数据同步。由于LogMiner支持的版本是8.1及以上,所以进行数据同步的Oracle数据库版本也必须是8.1及以上。

    当然在本文中介绍的是LogMiner进行数据同步例子,也可以利用LogMiner进行数据审计、数据操作追踪等功能,由于这些从操作原理来说是一致,在本文不做讨论。

    1.2 框架图

    clip_image002[6]

    1.3 流程图

    clip_image002[4]

    l 配置阶段

    1、 控制端:指定源端、目标端数据库信息、LOGMINER同步时间等配置信息;

    l 获取源端同步数据

    2、 控制台:通过定时轮询的方式检测是否到达数据同步时间,如果是则进行数据同步,否则继续进行轮询;

    3、 源数据库:定时加载数据库归档日志文件到动态表v$logmnr_contents中;

    4、 源数据库:根据条件读取指定sql语句;

    l 目标端数据入库

    5、 源数据库:执行sql语句。

    2 代码分析

    2.1 目录及环境配置

    在该Demo项目中需要引入Oracle JDBC驱动包,具体项目分为四个类:

    1. Start.java:程序入口方法;

    2. SyncTask.java:数据同步Demo核心,生成字典文件和读取日志文件、目标数据库执行SQL语句等;

    3. DataBase.java:数据库操作基础类;

    4. Constants.java:源数据库、目标数据库配置、字典文件和归档文件路径。

    clip_image006[4]

    2.2 代码分析

    2.2.1 Constants.java

    在该类中设置了数据同步开始SCN号、源数据库配置、目标数据库配置以及字典文件/日志文件路径。需要注意的是在源数据库配置中有两个用户:一个是调用LogMiner用户,该用户需要拥有dbms_logmnr、dbms_logmnr_d两个过程权限,在该Demo中该用为为sync;另外一个为LogMiner读取该用户操作SQL语句,在该Demo中该用为为LOGMINER。

    复制代码
    package com.constants;
    
    /**
     * [Constants]|描述:Logminer配置参数
     * @作者: ***
     * @日期: 2013-1-15 下午01:53:57
     * @修改历史:
     */
    public class Constants {
        
        /** 上次数据同步最后SCN号 */
        public static String LAST_SCN = "0";
    
        /** 源数据库配置 */
        public static String DATABASE_DRIVER="oracle.jdbc.driver.OracleDriver";
        public static String SOURCE_DATABASE_URL="jdbc:oracle:thin:@127.0.0.1:1521:practice";
        public static String SOURCE_DATABASE_USERNAME="sync";
        public static String SOURCE_DATABASE_PASSWORD="sync";
        public static String SOURCE_CLIENT_USERNAME = "LOGMINER";
        
        /** 目标数据库配置 */
        public static String SOURCE_TARGET_URL="jdbc:oracle:thin:@127.0.0.1:1521:target";
        public static String SOURCE_TARGET_USERNAME="target";
        public static String SOURCE_TARGET_PASSWORD="target";
        
        /** 日志文件路径 */
        public static String LOG_PATH = "D:\oracle\oradata\practice";
        
        /** 数据字典路径 */
        public static String DATA_DICTIONARY = "D:\oracle\oradata\practice\LOGMNR";
    }
    复制代码
    2.2.2 SyncTask.java

    在该类中有两个方法,第一个方法为createDictionary,作用为生成数据字典文件,另外一个是startLogmur,该方法是LogMiner分析同步方法。

    复制代码
    /**
     * <p>方法名称: createDictionary|描述: 调用logminer生成数据字典文件</p>
     * @param sourceConn 源数据库连接
     * @throws Exception 异常信息
     */
    public void createDictionary(Connection sourceConn) throws Exception{
        String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename => 'dictionary.ora', dictionary_location =>'"+Constants.DATA_DICTIONARY+"'); END;";
        CallableStatement callableStatement = sourceConn.prepareCall(createDictSql);
        callableStatement.execute();
    }
    复制代码
    复制代码
    /**
     * <p>方法名称: startLogmur|描述:启动logminer分析 </p>
     * @throws Exception
     */
    public void startLogmur() throws Exception{
        
        Connection sourceConn = null;
        Connection targetConn = null;
        try {
            ResultSet resultSet = null;
            
            // 获取源数据库连接
            sourceConn = DataBase.getSourceDataBase();
            Statement statement = sourceConn.createStatement();
            
            // 添加所有日志文件,本代码仅分析联机日志
            StringBuffer sbSQL = new StringBuffer();
            sbSQL.append(" BEGIN");
            sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\REDO01.LOG', options=>dbms_logmnr.NEW);");
            sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\REDO02.LOG', options=>dbms_logmnr.ADDFILE);");
            sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\REDO03.LOG', options=>dbms_logmnr.ADDFILE);");
            sbSQL.append(" END;");
            CallableStatement callableStatement = sourceConn.prepareCall(sbSQL+"");
            callableStatement.execute();
            
            // 打印获分析日志文件信息
            resultSet = statement.executeQuery("SELECT db_name, thread_sqn, filename FROM v$logmnr_logs");
            while(resultSet.next()){
                System.out.println("已添加日志文件==>"+resultSet.getObject(3));
            }
            
            System.out.println("开始分析日志文件,起始scn号:"+Constants.LAST_SCN);
            callableStatement = sourceConn.prepareCall("BEGIN dbms_logmnr.start_logmnr(startScn=>'"+Constants.LAST_SCN+"',dictfilename=>'"+Constants.DATA_DICTIONARY+"\dictionary.ora',OPTIONS =>DBMS_LOGMNR.COMMITTED_DATA_ONLY+dbms_logmnr.NO_ROWID_IN_STMT);END;");
            callableStatement.execute();
            System.out.println("完成分析日志文件");
            
            // 查询获取分析结果
            System.out.println("查询分析结果");
            resultSet = statement.executeQuery("SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='"+Constants.SOURCE_CLIENT_USERNAME+"' AND seg_type_name='TABLE' AND operation !='SELECT_FOR_UPDATE'");
            
            // 连接到目标数据库,在目标数据库执行redo语句
            targetConn = DataBase.getTargetDataBase();
            Statement targetStatement = targetConn.createStatement();
            
            String lastScn = Constants.LAST_SCN;
            String operation = null;
            String sql = null;
            boolean isCreateDictionary = false;
            while(resultSet.next()){
                lastScn = resultSet.getObject(1)+"";
                if( lastScn.equals(Constants.LAST_SCN) ){
                    continue;
                }
                
                operation = resultSet.getObject(2)+"";
                if( "DDL".equalsIgnoreCase(operation) ){
                    isCreateDictionary = true;
                }
                
                sql = resultSet.getObject(5)+"";
                
                // 替换用户
                sql = sql.replace("""+Constants.SOURCE_CLIENT_USERNAME+"".", "");
                System.out.println("scn="+lastScn+",自动执行sql=="+sql+"");
                
                try {
                    targetStatement.executeUpdate(sql.substring(0, sql.length()-1));
                } catch (Exception e) {
                    System.out.println("测试一下,已经执行过了");
                }
            }
            
            // 更新scn
            Constants.LAST_SCN = (Integer.parseInt(lastScn))+"";
            
            // DDL发生变化,更新数据字典
            if( isCreateDictionary ){
                System.out.println("DDL发生变化,更新数据字典");
                createDictionary(sourceConn);
                System.out.println("完成更新数据字典");
                isCreateDictionary = false;
            }
            
            System.out.println("完成一个工作单元");
            
        }
        finally{
            if( null != sourceConn ){
                sourceConn.close();
            }
            if( null != targetConn ){
                targetConn.close();
            }
            
            sourceConn = null;
            targetConn = null;
        }
    }
    复制代码

    3 运行结果

    3.1 源数据库操作

    1、创建AAAAA表,并插入数据

    clip_image008[4]

    2、创建EMP1表

    clip_image010[4]

    3.2 运行Demo

    在控制台中输出如下日志

    clip_image012[4]

    3.3 目标数据库结果

    创建AAAAA和EMP1表,并在AAAAA插入了数据

    clip_image014[4]

  • 相关阅读:
    [Swift]LeetCode282. 给表达式添加运算符 | Expression Add Operators
    [Swift]LeetCode279. 完全平方数 | Perfect Squares
    [Swift]LeetCode275. H指数 II | H-Index II
    [Swift]LeetCode274.H指数 | H-Index
    [Swift]LeetCode273. 整数转换英文表示 | Integer to English Words
    [Swift]LeetCode267.回文全排列 II $ Palindrome Permutation II
    Cygwin与minGW
    pat-1087【最短路径】
    Codeforces Round #313 A. Currency System in Geraldion(简单题)
    DIV+CSS在不同浏览器中的表现
  • 原文地址:https://www.cnblogs.com/qiumingcheng/p/10976440.html
Copyright © 2011-2022 走看看