  • Syncing data from Oracle to Kafka

    Introduction

    This article describes how to use the open-source tool kafka-connect-oracle to replicate data produced by Oracle DML operations to Kafka in real time, where it can then be consumed from Kafka.

    Environment preparation

    Software preparation

    • CentOS Linux 7.6.1810 (two hosts: host A and host B)
    • Oracle 11.2.0.4 (installed on host A)
    • Kafka 2.13-2.6.0 (installed on host B)
    • kafka-connect-oracle-master (installed on host B; open-source program used to sync Oracle data to Kafka)
    • apache-maven 3.6.3 (installed on host B; used to build kafka-connect-oracle-master)
    • jdk-8u261-linux-x64.rpm (installed on host B)

    Download links

    Implementation

    Oracle host (A) configuration

    Oracle instance configuration items:

    • Enable archive log mode
    • Enable supplemental logging
    • Create the connection user for kafka-connect-oracle-master
    • Create a test user and test tables for generating data
    -- Enable archive log mode
    sqlplus / as sysdba    
    SQL>shutdown immediate
    SQL>startup mount
    SQL>alter database archivelog;
    SQL>alter database open;
    -- Enable supplemental logging
    SQL>alter database add supplemental log data (all) columns;
    -- Create the kafka-connect-oracle-master connection user
    create role logmnr_role;
    grant create session to logmnr_role;
    grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
    create user kminer identified by kminerpass;
    grant logmnr_role to kminer;
    alter user kminer quota unlimited on users;
    -- Create the test user and test tables
    create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M;
    create user whtest identified by whtest default tablespace test_date;
    grant connect,resource to whtest;
    grant execute on dbms_scheduler to whtest;
    grant execute on dbms_random to whtest;
    grant create job to whtest;
    -- create the test tables and scheduler jobs as the whtest user
    conn whtest/whtest
    create table t1 (
    id int,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T1 add constraint PK_ID_T1 primary key (ID) using index tablespace TEST_DATE;
    
    create table t2 (
    id int,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T2 add constraint PK_ID_T2 primary key (ID) using index tablespace TEST_DATE;
    create table t3 (
    id int,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T3 add constraint PK_ID_T3 primary key (ID) using index tablespace TEST_DATE;
    begin
    dbms_scheduler.create_job(
    job_name=> 't1_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t1 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t1 every 5 sec');
    end;
    / 
    
    begin
    dbms_scheduler.create_job(
    job_name=> 't2_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t2 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t2 every 5 sec');
    end;
    / 
    
    begin
    dbms_scheduler.create_job(
    job_name=> 't3_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t3 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t3 every 5 sec');
    end;
    / 
    -- Disable the jobs for now; enable them again once the Kafka side is fully configured
    exec DBMS_SCHEDULER.DISABLE('T1_JOB');
    exec DBMS_SCHEDULER.DISABLE('T2_JOB');
    exec DBMS_SCHEDULER.DISABLE('T3_JOB');
    
    exec DBMS_SCHEDULER.ENABLE('T1_JOB');
    exec DBMS_SCHEDULER.ENABLE('T2_JOB');
    exec DBMS_SCHEDULER.ENABLE('T3_JOB');
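
    Before moving on to host B, it is worth confirming that archive logging and supplemental logging are actually in effect and that the scheduler jobs exist. A quick sqlplus spot check (a verification sketch; the exact output depends on the environment):

    sqlplus / as sysdba
    SQL> select log_mode, supplemental_log_data_all from v$database;
    SQL> select owner, job_name, enabled from dba_scheduler_jobs where owner = 'WHTEST';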
    

    Kafka host (B) configuration

    Upload the downloaded Kafka 2.13-2.6.0, kafka-connect-oracle-master, apache-maven 3.6.3, and JDK 1.8.0 packages to the /soft directory on host B.

    Add host name resolution to /etc/hosts

    [root@softdelvily ~]# cat /etc/hosts
    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.20.44 softdelvily localhost
    

    Install the JDK

    [root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm 
    warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
    Preparing...                          ################################# [100%]
    Updating / installing...
       1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
    Unpacking JAR files...
            tools.jar...
            plugin.jar...
            javaws.jar...
            deploy.jar...
            rt.jar...
            jsse.jar...
            charsets.jar...
            localedata.jar...
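
    A quick sanity check that a Java 8 runtime is now available on the PATH (the reported vendor and build may differ depending on which JDK the system resolves first):

    [root@softdelvily soft]# java -version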
    

    Configure the apache-maven tool

    Extract apache-maven-3.6.3-bin.tar.gz to the /usr/local directory and add the corresponding environment variables to /etc/profile.

    [root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
    apache-maven-3.6.3/README.txt
    apache-maven-3.6.3/LICENSE
    .....
    [root@softdelvily soft]# cd /usr/local/
    [root@softdelvily local]# ll
    total 0
    drwxr-xr-x. 6 root root  99 Sep 23 09:56 apache-maven-3.6.3
    drwxr-xr-x. 2 root root   6 Apr 11  2018 bin
    .....
    [root@softdelvily local]# vi /etc/profile
    .......
    ## Add the following environment variables
    MAVEN_HOME=/usr/local/apache-maven-3.6.3
    export MAVEN_HOME
    export PATH=${PATH}:${MAVEN_HOME}/bin
    [root@softdelvily local]# source /etc/profile
    [root@softdelvily local]# mvn -v
    Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
    Maven home: /usr/local/apache-maven-3.6.3
    Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
    

    Configure Kafka 2.13-2.6.0

    Extract Kafka 2.13-2.6.0 to the /usr/local directory.

    [root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
    kafka_2.13-2.6.0/
    kafka_2.13-2.6.0/LICENSE
    ......
    [root@softdelvily soft]# cd /usr/local/
    [root@softdelvily local]# ll
    total 0
    drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
    drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
    .....
    

    Start ZooKeeper and Kafka, and create the topic that the database changes will be synchronized to.

    -- 1. Session 1: start ZooKeeper
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
    [2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties 
    .......
    [2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
    -- 2. Session 2: start Kafka
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
    -- 3. Session 3: create the cdczztar topic
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
    Created topic cdczztar.
    [root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
    __consumer_offsets
    cdczztar
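
    Optionally, the new topic can be inspected before the connector is started (in Kafka 2.6 the --zookeeper option still works for this; --bootstrap-server localhost:9092 can be used instead):

    [root@softdelvily bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic cdczztar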
    

    Configure kafka-connect-oracle-master

    Extract kafka-connect-oracle-master to the /soft directory, edit the corresponding config file, and then build the program with Maven.

    -- Extract the zip package
    [root@softdelvily soft]# unzip kafka-connect-oracle-master.zip 
    [root@softdelvily soft]# ll
    total 201180
    -rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
    -rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm
    -rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
    drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master
    -rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
    [root@softdelvily soft]# cd kafka-connect-oracle-master/config/
    [root@softdelvily config]# ll
    total 4
    -rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties
    -- Edit the properties configuration file
    -- Adjust db.name.alias, topic, db.name, db.hostname, db.user, db.user.password, table.whitelist and table.blacklist; see README.md for details
    [root@softdelvily config]# vi OracleSourceConnector.properties 
    name=oracle-logminer-connector
    connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
    db.name.alias=zztar
    tasks.max=1
    topic=cdczztar
    db.name=zztar
    db.hostname=192.168.xx.xx
    db.port=1521
    db.user=kminer
    db.user.password=kminerpass
    db.fetch.size=1
    table.whitelist=WHTEST.T1,WHTEST.T2
    table.blacklist=WHTEST.T3
    parse.dml.data=true
    reset.offset=true
    start.scn=
    multitenant=false
    -- Build the program
    [root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
    [root@softdelvily kafka-connect-oracle-master]# mvn clean package
    [INFO] Scanning for projects...
    .......
    [INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
    with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  94.03 s
    [INFO] Finished at: 2020-09-23T10:25:52+08:00
    [INFO] ------------------------------------------------------------------------
    

    Copy the following files into the Kafka working directory.

    • Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
    • Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
    [root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
    [root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
    [root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/lib
    [root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
    [root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
    [root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
    

    Start kafka-connect-oracle

    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
    ......
    (com.ecer.kafka.connect.oracle.OracleSourceTask:187)
    [2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)
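
    Once the standalone worker is up, the connector registration can be double-checked through the Kafka Connect REST interface (port 8083 by default); it should list the connector name set in OracleSourceConnector.properties:

    [root@softdelvily ~]# curl http://localhost:8083/connectors
    ["oracle-logminer-connector"]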
    

    Start a Kafka consumer

    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    -- Start a console consumer on the cdczztar topic
    [root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdczztar
    

    Start the database job

    [oracle@oracle01 ~]$ sqlplus / as sysdba
    
    SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020
    
    Copyright (c) 1982, 2011, Oracle.  All rights reserved.
    
    Connected to:
    Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
    With the Partitioning, OLAP, Data Mining and Real Application Testing options
    
    SQL> set pagesize 999
    SQL> conn whtest/whtest
    Connected.
    SQL> exec DBMS_SCHEDULER.ENABLE('T1_JOB');
    
    PL/SQL procedure successfully completed.
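
    To confirm that the job really is generating changes on the Oracle side, the row count in WHTEST.T1 should now be growing (a simple spot check):

    SQL> select count(*) from whtest.t1;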
    

    Kafka consumer output

    Records like the one below indicate that the synchronization succeeded; the data is output in key:value form.

    {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into "WHTEST"."T1"("ID","NAME","CREATETIME") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}
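
    For spot checks, the consumer output can also be piped through a JSON tool such as jq (assuming jq is installed on host B) to pull out only the interesting payload fields:

    [root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdczztar --from-beginning | jq '.payload | {SCN, TABLE_NAME, OPERATION, SQL_REDO}'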
    