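  • Syncing Oracle Data to Kafka

    Introduction

    This article shows how to use the open-source kafka-connect-oracle tool to stream the changes produced by Oracle DML into Kafka in real time, where Kafka consumers can read them.

    Environment

    Software

    • CentOS Linux 7.6.1810 (two hosts: host A and host B)
    • Oracle 11.2.0.4 (installed on host A)
    • Kafka 2.6.0, Scala 2.13 build, kafka_2.13-2.6.0 (installed on host B)
    • kafka-connect-oracle-master (installed on host B; the open-source program that syncs Oracle data to Kafka)
    • apache-maven 3.6.3 (installed on host B; used to build kafka-connect-oracle-master)
    • jdk-8u261-linux-x64.rpm (installed on host B)

    Download links

    Procedure

    Oracle host (A) configuration

    Oracle instance settings:

    • Enable archive logging
    • Enable supplemental logging
    • Create the user that kafka-connect-oracle-master connects as
    • Create the user that generates test data, plus the test tables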
    --Enable archive logging
    sqlplus / as sysdba
    SQL>shutdown immediate
    SQL>startup mount
    SQL>alter database archivelog;
    SQL>alter database open;
    --Enable supplemental logging
    SQL>alter database add supplemental log data (all) columns;
    --Create the user that kafka-connect-oracle-master connects as
    create role logmnr_role;
    grant create session to logmnr_role;
    grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
    create user kminer identified by kminerpass;
    grant logmnr_role to kminer;
    alter user kminer quota unlimited on users;
    --Create the user that generates test data, plus the test tables
    create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M;
    create user whtest identified by whtest default tablespace test_date;
    grant connect,resource to whtest;
    grant execute on dbms_scheduler to whtest;
    grant execute on dbms_random to whtest;
    grant create job to whtest;
    --Switch to whtest so that the tables and jobs below are created in the WHTEST schema
    conn whtest/whtest
    create table t1 (
    id int ,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T1  add constraint PK_ID_T1 primary key (ID)  using index   tablespace TEST_DATE;
    
    create table t2 (
    id int ,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T2  add constraint PK_ID_T2 primary key (ID)  using index   tablespace TEST_DATE;
    create table t3 (
    id int ,
    name char(10),
    createtime date default sysdate
    );
    alter table WHTEST.T3  add constraint PK_ID_T3 primary key (ID)  using index   tablespace TEST_DATE;
    begin
    dbms_scheduler.create_job(
    job_name=> 't1_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t1 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t1 every 5 sec');
    end;
    / 
    
    begin
    dbms_scheduler.create_job(
    job_name=> 't2_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t2 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t2 every 5 sec');
    end;
    / 
    
    begin
    dbms_scheduler.create_job(
    job_name=> 't3_job',
    job_type=> 'PLSQL_BLOCK',
    job_action =>'declare
    v_id int;
    v_name char(10);
    begin
      for i in 1..10 loop
        v_id := round(dbms_random.value(1,1000000000));
        v_name :=round(dbms_random.value(1,1000000000));
        insert into whtest.t3 (id,name)values(v_id,v_name);
      end loop;
    end;',
    enabled=>true,
    repeat_interval=>'sysdate + 5/86400',
    comments=>'insert into t3 every 5 sec');
    end;
    / 
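    --After creating the jobs, disable them for now; re-enable them once Kafka is configured
    exec DBMS_SCHEDULER.DISABLE('T1_JOB');
    exec DBMS_SCHEDULER.DISABLE('T2_JOB');
    exec DBMS_SCHEDULER.DISABLE('T3_JOB');
    --Run the ENABLE calls below later, after the Kafka side is ready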
    exec DBMS_SCHEDULER.ENABLE('T1_JOB');
    exec DBMS_SCHEDULER.ENABLE('T2_JOB');
    exec DBMS_SCHEDULER.ENABLE('T3_JOB');
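The `repeat_interval` of `sysdate + 5/86400` relies on Oracle DATE arithmetic being expressed in days, so 5/86400 of a day is five seconds. The following is a minimal sketch of that arithmetic and of the nominal insert rate the three jobs would produce. The helper name `interval_seconds` is hypothetical, and the rates ignore the time each job run itself takes.

```python
from fractions import Fraction

SECONDS_PER_DAY = 24 * 60 * 60  # 86400; Oracle DATE arithmetic is in days


def interval_seconds(day_fraction: Fraction) -> Fraction:
    """Convert a 'sysdate + <fraction of a day>' offset to seconds."""
    return day_fraction * SECONDS_PER_DAY


period = interval_seconds(Fraction(5, 86400))  # the 5/86400 in repeat_interval
rows_per_run = 10                              # the 'for i in 1..10' loop in job_action
tables = 3                                     # t1, t2 and t3 each have one job

print(period)                                   # 5 (seconds between runs)
print(Fraction(rows_per_run) / period)          # 2 (nominal rows per second per table)
print(Fraction(rows_per_run * tables) / period) # 6 (nominal rows per second overall)
```

At this rate the test load is small, which suits a first end-to-end check of the pipeline.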
    

    Kafka host (B) configuration

    Upload the downloaded Kafka 2.6.0 (kafka_2.13-2.6.0), kafka-connect-oracle-master, apache-maven 3.6.3 and JDK 1.8.0 packages to the /soft directory on host B.

    Add a hosts file entry

    [root@softdelvily ~]# cat /etc/hosts
    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.20.44 softdelvily localhost
    

    Install the JDK

    [root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm 
    warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
    Preparing...                          ################################# [100%]
    Updating / installing...
       1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
    Unpacking JAR files...
            tools.jar...
            plugin.jar...
            javaws.jar...
            deploy.jar...
            rt.jar...
            jsse.jar...
            charsets.jar...
            localedata.jar...
    

    Configure apache-maven

    Extract apache-maven-3.6.3-bin.tar.gz into /usr/local and set the corresponding environment variables in /etc/profile.

    [root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
    apache-maven-3.6.3/README.txt
    apache-maven-3.6.3/LICENSE
    .....
    [root@softdelvily soft]# cd /usr/local/
    [root@softdelvily local]# ll
    total 0
    drwxr-xr-x. 6 root root  99 Sep 23 09:56 apache-maven-3.6.3
    drwxr-xr-x. 2 root root   6 Apr 11  2018 bin
    .....
    [root@softdelvily local]# vi /etc/profile
    .......
    ##Add the following environment variables
    MAVEN_HOME=/usr/local/apache-maven-3.6.3
    export MAVEN_HOME
    export PATH=${PATH}:${MAVEN_HOME}/bin
    [root@softdelvily local]# source /etc/profile
    [root@softdelvily local]# mvn -v
    Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
    Maven home: /usr/local/apache-maven-3.6.3
    Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
    

    Configure Kafka 2.6.0 (kafka_2.13-2.6.0)

    Extract kafka_2.13-2.6.0.tgz into /usr/local.

    [root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
    kafka_2.13-2.6.0/
    kafka_2.13-2.6.0/LICENSE
    ......
    [root@softdelvily soft]# cd /usr/local/
    [root@softdelvily local]# ll
    total 0
    drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
    drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
    .....
    

    Start Kafka and create the topic that will receive the synced database changes.

    --1. Session 1: start ZooKeeper
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
    [2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties 
    .......
    [2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
    --2. Session 2: start Kafka
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
    --3. Session 3: create the cdczztar topic
    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
    Created topic cdczztar.
    [root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
    __consumer_offsets
    cdczztar
    

    Configure kafka-connect-oracle-master

    Extract kafka-connect-oracle-master into /soft, edit its config file, then build the program with Maven.

    --Extract the zip archive
    [root@softdelvily soft]# unzip kafka-connect-oracle-master.zip 
    [root@softdelvily soft]# ll
    total 201180
    -rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
    -rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm
    -rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
    drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master
    -rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
    [root@softdelvily soft]# cd kafka-connect-oracle-master/config/
    [root@softdelvily config]# ll
    total 4
    -rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties
    --Edit the properties file
    --Adjust db.name.alias, topic, db.name, db.hostname, db.user, db.user.password, table.whitelist and table.blacklist; see README.md for details
    [root@softdelvily config]# vi OracleSourceConnector.properties 
    name=oracle-logminer-connector
    connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
    db.name.alias=zztar
    tasks.max=1
    topic=cdczztar
    db.name=zztar
    db.hostname=192.168.xx.xx
    db.port=1521
    db.user=kminer
    db.user.password=kminerpass
    db.fetch.size=1
    table.whitelist=WHTEST.T1,WHTEST.T2
    table.blacklist=WHTEST.T3
    parse.dml.data=true
    reset.offset=true
    start.scn=
    multitenant=false
    --Build the program
    [root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
    [root@softdelvily kafka-connect-oracle-master]# mvn clean package
    [INFO] Scanning for projects...
    .......
    [INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
    with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  94.03 s
    [INFO] Finished at: 2020-09-23T10:25:52+08:00
    [INFO] ------------------------------------------------------------------------
    

    Copy the following files into the Kafka installation directory.

    • Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
    • Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
    [root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
    [root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
    [root@softdelvily target]# cd /soft/kafka-connect-oracle-master/lib
    [root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
    [root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
    [root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
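Before starting the connector, it can help to sanity-check the copied OracleSourceConnector.properties. The sketch below is only an illustration: `REQUIRED_KEYS` is an assumption based on the settings this article edits (it is not taken from the connector's README), the function names are hypothetical, and the parser handles only plain `key=value` lines rather than the full Java properties syntax.

```python
# Keys treated as mandatory here (an assumption for illustration only).
REQUIRED_KEYS = [
    "name", "connector.class", "topic", "db.name.alias", "db.name",
    "db.hostname", "db.port", "db.user", "db.user.password", "table.whitelist",
]


def parse_properties(text):
    """Parse simple key=value lines; lines starting with '#' or '!' are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def table_set(value):
    """Split a comma-separated OWNER.TABLE list into an upper-cased set."""
    return {t.strip().upper() for t in value.split(",") if t.strip()}


def check_connector_config(props):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = [f"missing or empty: {k}" for k in REQUIRED_KEYS if not props.get(k)]
    if props.get("db.port") and not props["db.port"].isdigit():
        problems.append("db.port is not numeric")
    overlap = table_set(props.get("table.whitelist", "")) & table_set(
        props.get("table.blacklist", ""))
    if overlap:
        problems.append(
            "listed in both whitelist and blacklist: " + ", ".join(sorted(overlap)))
    return problems


# The values shown in this article (host address left masked as in the text).
SAMPLE = """\
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=WHTEST.T1,WHTEST.T2
table.blacklist=WHTEST.T3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
"""

print(check_connector_config(parse_properties(SAMPLE)))  # []
```

To check the real file, the same functions could be fed the contents of /usr/local/kafka_2.13-2.6.0/config/OracleSourceConnector.properties instead of `SAMPLE`.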
    

    Start kafka-connect-oracle

    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    [root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
    ......
    (com.ecer.kafka.connect.oracle.OracleSourceTask:187)
    [2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)
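The connector logs the SCN it starts mining from, which is useful to note down because `start.scn` was left empty in the properties file. As a rough sketch, the SCN and fetch size can be pulled out of that log line with a regular expression. The pattern is derived only from the single line shown above, so other connector versions may word it differently, and `parse_start_position` is a hypothetical helper name.

```python
import re
from typing import Optional, Tuple

# Pattern based solely on the log line quoted in this article.
START_PATTERN = re.compile(
    r"Log Miner will start at new position SCN : (\d+) with fetch size : (\d+)"
)


def parse_start_position(log_line: str) -> Optional[Tuple[int, int]]:
    """Return (scn, fetch_size) if the line matches, otherwise None."""
    match = START_PATTERN.search(log_line)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


line = (
    "[2020-09-23 10:40:31,375] INFO Log Miner will start at new position "
    "SCN : 2847346 with fetch size : 1 "
    "(com.ecer.kafka.connect.oracle.OracleSourceTask:188)"
)
print(parse_start_position(line))  # (2847346, 1)
```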
    

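    Start a Kafka consumer

    [root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
    --Assuming the broker listens on the default port 9092
    [root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdczztar
    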

    Start the database job

    [oracle@oracle01 ~]$ sqlplus / as sysdba
    
    SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020
    
    Copyright (c) 1982, 2011, Oracle.  All rights reserved.
    
    set pagesize 999
    
    Connected to:
    Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
    With the Partitioning, OLAP, Data Mining and Real Application Testing options
    
    SQL> SQL> conn whtest/whtest
    Connected.
    SQL> exec DBMS_SCHEDULER.ENABLE('T1_JOB');
    
    PL/SQL procedure successfully completed.
    

    Kafka consumer output

    Records like the following indicate that the sync is working. Each record is printed as JSON key:value pairs.

    {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}
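Each message is an envelope with a `schema` part and a `payload` part, and a downstream consumer usually only needs a few payload fields. The sketch below parses one such record with the standard library. It assumes that the record is valid JSON (double quotes inside `SQL_REDO` backslash-escaped) and that `TIMESTAMP` is epoch milliseconds. The `schema` part is trimmed for brevity, and `summarize` is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone

# One record from the topic, with the schema section trimmed for brevity.
RECORD = r'''{"schema":{"type":"struct","name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}'''


def summarize(record_json: str) -> dict:
    """Extract the commonly needed fields from one connector record."""
    payload = json.loads(record_json)["payload"]
    data = payload.get("data") or {}  # 'data' may be null for some operations
    return {
        "table": f'{payload["SEG_OWNER"]}.{payload["TABLE_NAME"]}',
        "operation": payload["OPERATION"],
        "scn": payload["SCN"],
        # ID arrives as a JSON double (5.57005146E8); convert it back to an int.
        "id": int(data["ID"]) if "ID" in data else None,
        "name": data.get("NAME"),
        # Assumed to be epoch milliseconds, interpreted here as UTC.
        "changed_at": datetime.fromtimestamp(
            payload["TIMESTAMP"] / 1000, tz=timezone.utc),
    }


summary = summarize(RECORD)
print(summary["table"], summary["operation"], summary["id"])
print(summary["changed_at"].isoformat())
```

Under the UTC assumption the timestamp comes out as 02:46:46, which corresponds to the 10:46:46 literal in `SQL_REDO` at a +08:00 offset. The CHAR(10) padding visible in `SQL_REDO` ('533888119 ') is already trimmed in the `data.NAME` field.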
    
  • Original article: https://www.cnblogs.com/bicewow/p/13717143.html