zoukankan      html  css  js  c++  java
  • ogg(19)抽取oracle(11.2)使用ogg for big data(12.3) 发送到kafka-connect(2.12_2.5.0)

    下载地址:
    https://www.oracle.com/middleware/technologies/goldengate-downloads.html

    OGG_GoldenGate数据迁移三进程Extract / Dump / Relicat
    源头抽取进程Extract:读取oracle redo log(假设源头数据库是oracle),结合抽取进程的参数文件,将redo log中的内容变为OGG自有格式的文件(即trail file),并写入到源头操作系统的指定路径下
    源头传输进程Dump:读取“源头抽取进程写入的trail file”,结合传输进程的参数文件,发送需要同步的数据到目的端,在目的端是由server 进程负责写入。
    目的端应用进程Replicat:读取目的端操作系统的指定路径下的trail file,结合应用进程的参数文件,形成sql语句,在目的端db上执行这些sql语句。

    https://blog.csdn.net/warren_zqw/article/details/52894586
    https://blog.csdn.net/qq_28356739/article/details/88585561

    Oracle源端部署与配置
    https://docs.oracle.com/en/middleware/goldengate/core/19.1/installing/database-requirements.html

    数据库配置要求详情

    在Oracle数据源端开启archive logging

    1.检查是否开启Archive logging
    SQL> archive log list;

    如果没有开启Archive logging,执行下面操作开启,该操作需要关闭Oracle
    SQL> alter database mount;
    SQL> alter database archivelog;
    SQL> alter database open;
    开启force logging和 minimalsupplemental logging

    1. 检查是否开启forcelogging和 minimal supplemental logging
      

    SQL > SELECT supplemental_log_data_min,force_logging FROM v$database;
    如果没有开启Supplemental logging,执行下面操作开启,该操作无需关闭Oracle
    SQL > alter database add supplemental log data (primary key) columns;
    SQL > alter database add supplemental log data (unique) columns;
    注意:如果不指定Primary key 和unique 属性,OGG将不会传送PK字段或Unique indiex字段信息。这样,下游的应用,在处理update数据时将失去依据。
    SQL > alter database force logging;
    SQL> alter system switch logfile;
    创建OGG帐户

    SQL >create user ogg identified by oggaccount;

    SQL >grant connect,resource to ogg;

    SQL >grant select any dictionary to ogg;

    SQL >grant select any table to ogg;

    源端安装配置OGG
    安装源端ogg
    设置response参数
    cd /opt/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response
    vi oggcore.rsp
    INSTALL_OPTION=ORA11g (oracle 版本11)
    INVENTORY_LOCATION=/opt/oraInventory-ogg-source
    SOFTWARE_LOCATION=/opt/ogg/
    START_MANAGER=false
    MANAGER_PORT=9001
    DATABASE_LOCATION=$ORACLE_HOME
    UNIX_GROUP_NAME=dba

    注意要使用oracle用户来进行安装,oggcore.rsp配置需要根据实际需要进行修改,也可使用图形界面进行安装
    运行安装程序
    cd /opt/ogg-source/fbo_ggs_Linux_x64_shiphome/Disk1
    ./runInstaller -silent -nowait -responseFile /opt/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

    error:
    1...../runInstaller: line 45: 7165 Segmentation fault $CMDDIR/install/.oui $*
    原因 文件无权限 仔细观看操作的目录 Preparing to launch Oracle Universal Installer from /tmp/OraInstall2020-08-13_02-51-37PM.

    2.No value specified for central inventory (oraInventory) ownership group.
    在/opt目录下创建ogg-source目录后正常运行安装
    mkdir ogg-source
    安装成功

    https://www.cnblogs.com/soysauce/p/7411314.html
    安装后开始配置源端ogg
    1.开启主库归档日志、补充日志及force logging
    alter database add supplemental log data;
    alter database force logging;
    alter system set enable_goldengate_replication=true scope=both;
    2.创建ogg的管理用户

    create user ogg identified by password account unlock;

    grant connect,resource to ogg;
    grant select any dictionary to ogg;
    grant select any table to ogg;
    grant execute on utl_file to ogg;
    grant restricted session to ogg;
    grant create table,create sequence to ogg;
    alter table DCSDBA.YH_ITF_INVENTORY_TRANSACTION add supplemental log data (all) columns;
    --如果kafka需要接收到完整的修改信息就需要开启补全日志

    首先安装jdk
    需要使用java 打开./ggsci
    配置环境变量:

    oracle安装目录(注意:目录最好使用双引号区分):

    export ORACLE_HOME="/opt/oracle/product/11.2.0/db_1";

    oracle实例名

    export ORACLE_SID=orcl;

    配置系统变量

    export PATH=$ORACLE_HOME/bin:$PATH;
    export GG_HOME="/opt/ogg-source";
    export PATH=$GG_HOME/bin:$PATH;
    export LD_LIBRARY_PATH=$GG_HOME:$ORACLE_HOME/lib
    export JAVA_HOME="/usr/local/java/jdk1.8.0_261";
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$JAVA_HOME/bin:$PATH

    登录ggsci
    GGSCI (zookeeper) 2> create subdirs

    Creating subdirectories under current directory

    Parameter file ./dirprm: created.
    Report file ./dirrpt: created.
    Checkpoint file ./dirchk: created.
    Process status files ./dirpcs: created.
    SQL script files ./dirsql: created.
    Database definitions files ./dirdef: created.
    Extract data files ./dirdat: created.
    Temporary files ./dirtmp: created.
    Credential store files ./dircrd: created.
    Masterkey wallet files ./dirwlt: created.
    Dump files ./dirdmp: created.

    配置源端manager进程
    port 7809
    -- DYNAMICPORTLIST 7810-7860 --动态端口当7809不可用,则自动获取动态端口
    -- autostart er * --开机自启 需要则开启
    -- autorestart er *
    -- autorestart EXTRACT ,RETRIES 3,WAITMINUTES 5,RESETMINUTES 60 --进程失败重启次数
    PURGEOLDEXTRACTS ./dirdat/
    ,usecheckpoints, minkeepdays 2 --定期清理trail文件设置
    USERID ogg, PASSWORD ogg
    LAGREPORTHOURS 1 --表示MGR进程每隔 x 小时,检查EXTRACT的延迟情况
    LAGINFOMINUTES 30 --如果超过了 y 分钟,就把延迟作为信息记录到错误日志中
    LAGCRITICALMINUTES 45 --如果延迟超过了z 分钟,则把它作为警告写到错误日志中

    启动manager并确认状态
    start mgr
    info all 查看MANAGER进程为running状态

    源端抽取传输一体进程配置(仅DML)
    抽取进程和传输进程其实都是EXTRACT进程,也可以配置在一个进程完成这两个功能,但是当网络传输有问题时,这样抽取也就不能继续运行了,所以推荐分开配置为两个进程;

    定义extract参数
    edit params customer
    配置源端代码实例

    配置抽取进程:
    --表示从当前时刻开始抽取oracle日志信息 也可以指定从某一时刻开始如 begin 2020-08-19 16:42
    GGSCI(NDSCDB1) 12> add extract customer,tranlog,begin now
    EXTRACTadded.

    --配置远程传输文件队列 trail file的地址和名称
    GGSCI(NDSCDB1) 14> add RMTTRAIL /opt/OGG_BigData_Linux_x64_12.3.2.1.1/dirdat/nc,extract customer,megabytes 100
    EXTTRAILadded.
    Megabytes:指定队列大小,本处设置表示100M。
    GGSCI(NDSCDB1) 15> edit params customer

    EXTRACT customer
    SETENV(NLS_LANG = "AMERICAN_AMERICA.UTF8")
    SETENV(ORACLE_HOME = "/opt/oracle/product/11.2.0/db_1")
    SETENV(ORACLE_SID = "orcl")
    userid ogg, password ogg
    gettruncates
    REPORTCOUNT EVERY 1 MINUTES, RATE
    DISCARDFILE ./dirrpt/extntest.dsc,APPEND,MEGABYTES 1024
    DBOPTIONS ALLOWUNUSEDCOLUMN
    WARNLONGTRANS 2h,CHECKINTERVAL 2000000s
    RMTHOST 192.168.219.128, MGRPORT 7809, compress
    RMTTRAIL /opt/OGG_BigData_Linux_x64_12.3.2.1.1/dirdat/nc , FORMAT release 12.3
    GETUPDATEBEFORES
    NOCOMPRESSUPDATES
    NOCOMPRESSDELETES
    table ogg.CUSTOMERS3;

    表结构传递
    源端:
    GGSCI> ggsci
    GGSCI> edit param defcom
    内容
    DEFSFILE dirdef/source.def, PURGE
    NOEXTATTR
    USERID ogg, PASSWORD ogg
    TABLE ogg.* ;

    [ogg@tdb1 ogg]$ ./defgen paramfile ./dirprm/defcom.prm --shell命令 若双方版本不相同 则用下面的命令
    把source.def放到目标端的/ogg/dirdef/下

    目标端指定表结构文件路径
    如果对方版本较低 需要添加NOEXTATTR 参数
    ./defgen paramfile ./dirprm/defcom.prm NOEXTATTR
    测试版本 源端19 目标端12.3 所以需要使用此命令

    查询状态

    info CUSTOMER detail

    ④启动进程
    start

    文件权限异常
    Login to the database as user ogg failed because of error ORA-12547: TNS:lost contact.
    chmod 6751 $ORACLE_HOME/bin/oracle

    目标端Replication安装配置

    安装OGG for Big Data

    下载Oracle GoldenGate for Big Data V12.2.0.1.1
    配置环境变量
    vim /etc/profile
    export GGHOME=/opt/OGG_BigData_Linux_x64_12.3.2.1.1
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$GG_HOME:/lib

    测试环境变量
    cd /opt/OGG_BigData_Linux_x64_12.3.2.1.1
    ./ggsci

    表示成功
    配置kc.props 和kafka_custom.properties
    kafka_custom.properties要添加
    converter.type=key
    converter.type=value
    converter.type=header

    kc.props和kafka_custom.properties要放在dirprm路径下
    OGG_BigData_Linux_x64_12.3.2.1.1/AdapterExamples/big-data/kafka_connect 下有示例
    将kafka-connect中的connect-distributed.properties文件拷过来放到./dirprm/文件夹下

    gg.handlerlist=kafkaconnect

    The handler properties

    gg.handler.kafkaconnect.type=kafkaconnect
    gg.handler.kafkaconnect.kafkaProducerConfigFile=connect-distributed.properties
    gg.handler.kafkaconnect.mode=tx

    The following selects the topic name based on the fully qualified table name

    gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}

    The following selects the message key using the concatenated primary keys

    gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}

    The formatter properties

    gg.handler.kafkaconnect.messageFormatting=row
    gg.handler.kafkaconnect.insertOpKey=I
    gg.handler.kafkaconnect.updateOpKey=U
    gg.handler.kafkaconnect.deleteOpKey=D
    gg.handler.kafkaconnect.truncateOpKey=T
    gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
    gg.handler.kafkaconnect.iso8601Format=false
    gg.handler.kafkaconnect.pkUpdateHandling=abend
    gg.handler.kafkaconnect.includeTableName=true
    gg.handler.kafkaconnect.includeOpType=true
    gg.handler.kafkaconnect.includeOpTimestamp=true
    gg.handler.kafkaconnect.includeCurrentTimestamp=true
    gg.handler.kafkaconnect.includePosition=true
    gg.handler.kafkaconnect.includePrimaryKeys=true
    gg.handler.kafkaconnect.includeTokens=false

    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE

    gg.log=log4j
    gg.log.level=INFO

    gg.report.time=30sec

    Apache Kafka Classpath

    gg.classpath=/opt/kafka_2.12-2.5.0/libs/*

    Confluent IO classpath

    gg.classpath={Confluent install dir}/share/java/kafka-serde-tools/:{Confluent install dir}/share/java/kafka/:{Confluent install dir}/share/java/confluent-common/*

    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

    初始化目录
    create subdirs

    编辑MGR进程
    edit params mgr
    添加如下内容
    PORT 7809
    DYNAMICPORTLIST 7810-7860
    AUTORESTART ER , RETRIES 3, WAITMINUTES 5
    PURGEOLDEXTRACTS ./dirdat/
    , USECHECKPOINTS, MINKEEPDAYS 30
    lagreporthours 1
    laginfominutes 30
    lagcriticalminutes 60

    https://blog.csdn.net/brj880719/article/details/77182644

    启动mgr
    start mgr
    info all 检查状态

    添加仅DML复制进程
    add replicat rcust3,exttrail dirdat/nc,begin now
    edit params rcust3

    示例
    REPLICAT rcust3
    TARGETDB LIBFILE libggjava.so SET property=/opt/OGG_BigData_Linux_x64_12.3.2.1.1/dirprm/kc.props
    SOURCEDEFS /opt/OGG_BigData_Linux_x64_12.3.2.1.1/dirdef/customer3.def
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP ogg.CUSTOMERS3, TARGET CUSTOMERS.CUSTOMERS3;

    -- TARGET CUSTOMERS.CUSTOMERS3 目标topic 名

    开启复制进程
    start replicat rcust3

    参数说明
    MANAGER进程参数配置说明:
    PORT:指定服务监听端口;这里以7839为例,默认端口为7809
    DYNAMICPORTLIST:动态端口:可以制定最大256个可用端口的动态列表,当指定的端口不可用时,管理进程将会从列表中选择一个可用的端口,源端和目标段的Collector、Replicat、GGSCI进程通信也会使用这些端口;
    COMMENT:注释行,也可以用--来代替;
    AUTOSTART:指定在管理进程启动时自动启动哪些进程;
    AUTORESTART:自动重启参数设置:本处设置表示每3分钟尝试重新启动所有EXTRACT进程,共尝试5次;
    PURGEOLDEXTRACTS:定期清理trail文件设置:本处设置表示对于超过3天的trail文件进行删除。
    LAGREPORT、LAGINFO、LAGCRITICAL:
    定义数据延迟的预警机制:本处设置表示MGR进程每隔1小时检查EXTRACT的延迟情况,如果超过了30分钟就把延迟作为信息记录到错误日志中,如果延迟超过了45分钟,则把它作为警告写到错误日志中。

    EXTRACT进程参数配置说明:
    SETENV:配置系统环境变量
    USERID/ PASSWORD:指定OGG连接数据库的用户名和密码,这里使用前面部分中创建的数据库用户OGG;
    COMMENT:注释行,也可以用--来代替;
    TABLE:定义需复制的表,后面需以;结尾
    TABLEEXCLUDE:定义需要排除的表,如果在TABLE参数中使用了通配符,可以使用该参数指定排除掉得表。
    GETUPDATEAFTERS|IGNOREUPDATEAFTERS:
    是否在队列中写入后影像,缺省复制
    GETUPDATEBEFORES| IGNOREUPDATEBEFORES:
    是否在队列中写入前影像,缺省不复制
    GETUPDATES|IGNOREUPDATES:
    是否复制UPDATE操作,缺省复制
    GETDELETES|IGNOREDELETES:
    是否复制DELETE操作,缺省复制
    GETINSERTS|IGNOREINSERTS:
    是否复制INSERT操作,缺省复制
    GETTRUNCATES|IGNORETRUNDATES:
    是否复制TRUNCATE操作,缺省不复制;
    RMTHOST:指定目标系统及其GoldengateManager进程的端口号,还用于定义是否使用压缩进行传输,本例中的compress为压缩传输;
    RMTTRAIL:指定写入到目标断的哪个队列;
    EXTTRAIL:指定写入到本地的哪个队列;
    SQLEXEC:在extract进程运行时首先运行一个SQL语句;
    PASSTHRU:禁止extract进程与数据库交互,适用于Data Pump传输进程;
    REPORT:定义自动定时报告;
    STATOPTIONS:定义每次使用stat时统计数字是否需要重置;
    REPORTCOUNT:报告已经处理的记录条数统计数字;
    TLTRACE:打开对于数据库日志的跟踪日志;
    DISCARDFILE:定义discardfile文件位置,如果处理中油记录出错会写入到此文件中;
    DBOPTIONS:指定对于某种特定数据库所需要的特殊参数;
    TRANLOGOPTIONS:指定在解析数据库日志时所需要的特殊参数,例如:对于裸设备,可能需要加入以下参数 rawdeviceoggset 0
    WARNLONGTRANS:指定对于超过一定时间的长交易可以在gsserr.log里面写入警告信息,本处配置为每隔3分钟检查一次场交易,对于超过2小时的进行警告;

    使用DYNAMICPORTLIST参数去指定一个可用端口列表,它可使用于以下本地OGG进程对于与远程OGG进程通信的绑定

    、AUTORESTART
    语法:AUTORESTART EXTRACT *, RETRIES , WAITMINUTES , RESETMINUTES
    RETRIES:尝试次数
    WAITMINUTES:每次间隔时间长度
    RESETMINUTES:多长时间后清零
    PURGEOLDEXTRACTS
    定期清理trail文件设置
    语法:PURGEOLDEXTRACTS , USECHECKPOINTS, MINKEEPHOURS <"x" hours> MINKEEPFILES <"y" number of files>
    REPORT & INFO & CRITICAL
    表示MGR进程每隔 x 小时/分钟,检查EXTRACT的延迟情况
    LAGREPORTHOURS/LAGREPORTMINUTES
    如果超过了 y 小时/分钟,就把延迟作为信息记录到错误日志中
    LAGINFOHOURS/LAGINFOMINUTES
    如果延迟超过了z 小时/分钟,则把它作为警告写到错误日志中
    LAGCRITICALHOURS/LAGCRITICALMINUTES

    复制进程配置参数:
    REPLICAT进程参数配置说明:
    ASSUMETARGETDEFS:假定两端数据结构一致使用此参数;
    SOURCEDEFS:假定两端数据结构不一致,使用此参数指定源端的数据结构定义文件,该文件需要由GlodenGate工具产生。
    MAP:用于指定源端与目标端表的映射关系;
    MAPEXCLUDE:用于使用在MAP中使用*匹配时排除掉指定的表;
    REPERROR:定义出错以后进程的响应,一般可以定义为两种:
    ABEND,即一旦出现错误即停止复制,此为缺省配置;
    DISCARD,出现错误后继续复制,只是把错误的数据放到discard文件中。
    DISCARDFILE:定义discardfile文件位置,如果处理中油记录出错会写入到此文件中;
    SQLEXEC:在进程运行时首先运行一个SQL语句;
    GROUPTRANSOPS:将小交易合并成一个大的交易进行提交,减少提交次数,降低系统IO消耗。
    MAXTRANSOPS:将大交易拆分,每XX条记录提交一次。
    DBOPTIONSDEFERREFCONST
    约束延迟设置。在复制进程的事物被提交之前,延迟级联删除、级联更新时的校验和实施。
    GETTRUNCATES
    捕获生产端truncate table的操作。
    REPORTAT 01:00
    每天早上1点报告
    REPORTCOUNTEVERY 30 MINUTES, RATE
    每隔30分钟报告一次从程序开始到现在的抽取进程或者复制进程的事物记录数,并汇报进程的统计信息
    REPORTROLLOVERAT 02:00
    为了防止report file被写满,每天2:00做一次文件过期设定
    REPERRORDEFAULT, ABEND
    除了特殊指定的REPERROR语句,报告所有复制期间出现的错误,回滚非正常中断的事物和进程
    定义出错以后进程的响应,一般可以定义为两种:abend,即一旦出现错误即停止复制,此为缺省配置;discard,出现错误后继续复制,只是把错误的数据放到discard文件中。
    —HANDLECOLLISIONS
    当灾备端已经存在数据的情况下,解决复制过程中出现的冲突。如果要重新做初始化,可以删除drop灾备端数据库后再rman恢复,这样做的话就不需要该参数了。
    ALLOWNOOPUPDATES
    当生产端有某些列但是目标表却没有,或者复制进程中配置了COLSEXCEPT 参数 在这些情况下,当生产端对那些列进行更新,目标表将不发生任何变化
    DISCARDFILE./dirrpt/repsa.dsc, APPEND, MEGABYTES 1024M
    将执行失败的记录保存在discard file中,该文件位于./dirrpt/extya.dsc,大小为1024MB。 文件中已经包含记录的话,再后面继续追加,不删除之前的记录。
    SOURCEDEFS
    假定两端数据结构不一致,使用此参数指定源端的数据结构定义文件,该文件需要由glodengate工具产生。

  • 相关阅读:
    java项目中常用的定时任务实现方法
    mysql8.0只能本地连接解决方法
    自定义Mybatis Plus代码生成器(增加Vo类的生成)
    VMware的安装
    HDU 1728 逃离迷宫
    HDU2191 悼念512汶川大地震遇难同胞——珍惜现在,感恩生活
    HDU1059 Dividing
    HDU1114 Piggy-Bank
    HDU4508 湫湫系列故事——减肥记I
    HDU 2602 Bone Collector
  • 原文地址:https://www.cnblogs.com/cg14/p/13535881.html
Copyright © 2011-2022 走看看