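Implementation guide: real-time synchronization of Oracle data to Kafka with OGG (for Flink stream processing)
GoldenGate 12c for Big Data + Kafka: use OGG to synchronize Oracle data to Kafka in JSON format for Flink stream processing
Note: this article shows how to set up OGG for Big Data for testing. In practice, however, the setup described here puts insert, delete, and update operations into a single topic, and because their JSON structures differ, parsing fails later when registering stream tables in Flink or building cubes in streaming mode with Kylin. For the OGG for Big Data configuration I actually use in Flink development, see "OGG for Big Data 12: synchronizing Oracle data to different Kafka topics by operation type".
OGG for Big Data can replicate Oracle database changes incrementally and in real time to the Hadoop platform (Kafka, HDFS, and so on) for consumption. I built this environment to replicate Oracle tables to Kafka through OGG so that Flink can run stream computations on them. This document describes in detail how to replicate Oracle data changes to Kafka with OGG for Big Data; I have tested the whole procedure myself.
Host planning and configuration:
To save space, installation of the Linux system, the source Oracle database, and the source OGG 12c software is covered in other documents and is not repeated here.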
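I. Install the ZooKeeper cluster
1. Install the JDK (on the target side)
JDK 1.7 was installed previously. With it, the replicat process abended on startup, because OGG for Big Data 12.3 does not support JDK 1.7. For the exact error, see Section IV, "Errors encountered during installation".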
1.1 Check whether a JDK is already installed
[root@zookeeper ~]# rpm -qa | grep java
java-1.7.0-openjdk-1.7.0.99-2.6.5.1.0.1.el6.x86_64
tzdata-java-2016c-1.el6.noarch
java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64
[root@zookeeper ~]# rpm -qa | grep jdk
java-1.7.0-openjdk-1.7.0.99-2.6.5.1.0.1.el6.x86_64
java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64
[root@zookeeper ~]# rpm -qa | grep gcj
1.2 Remove the JDK bundled with Linux
[root@zookeeper ~]# rpm -e --nodeps java-1.7.0-openjdk-1.7.0.99-2.6.5.1.0.1.el6.x86_64
[root@zookeeper ~]# rpm -e --nodeps tzdata-java-2016c-1.el6.noarch
[root@zookeeper ~]# rpm -e java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64
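The manual removal above could also be planned from the package list itself. The following is a minimal sketch, not part of the original procedure: plan_jdk_removal is a hypothetical helper that reads a package list (for example the output of rpm -qa) on standard input and only prints the removal commands, so they can be reviewed before anything is uninstalled. The pattern java|jdk|gcj mirrors the three grep checks used in this section and may match unrelated packages on other systems.

```shell
#!/bin/sh
# Hypothetical helper: print (do not run) one "rpm -e --nodeps" command
# for every package name on stdin that contains java, jdk or gcj.
plan_jdk_removal() {
    grep -E 'java|jdk|gcj' | sort -u | while IFS= read -r pkg; do
        printf 'rpm -e --nodeps %s\n' "$pkg"
    done
}
```

On the target host this would be used as rpm -qa | plan_jdk_removal; the printed commands can then be run by hand once they look right.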
1.3 Check that no bundled JDK remains
[root@zookeeper ~]# rpm -qa | grep java
[root@zookeeper ~]# rpm -qa | grep jdk
[root@zookeeper ~]# rpm -qa | grep gcj
Nothing is left.
1.4 Create the JDK directory
[root@zookeeper ~]# mkdir -p /usr/java
1.5 Upload the JDK and extract it into this directory
[root@zookeeper ~]# cd /usr/java/
[root@zookeeper java]# ls
jdk-8u151-linux-x64.tar.gz
[root@zookeeper java]# tar -zxvf jdk-8u151-linux-x64.tar.gz
[root@zookeeper java]# rm -rf jdk-8u151-linux-x64.tar.gz
[root@zookeeper java]# ls
jdk1.8.0_151
1.6 Edit /etc/profile
[root@zookeeper java]# vim /etc/profile
Add the following JDK environment variables, then save and exit:
export JAVA_HOME=/usr/java/jdk1.8.0_151
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
Apply the environment variables:
[root@zookeeper java]# source /etc/profile
1.7 Verify the JDK configuration
[root@zookeeper java]# java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
The configuration is correct.
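Because the replicat abend described earlier was caused by the JDK version, it may be worth checking the version in a script before starting OGG. The sketch below is an illustration only: java_major and check_java are hypothetical helper names, and the threshold of Java 8 is an assumption based on the fact that 1.7 failed and 1.8.0_151 worked in this environment.

```shell
#!/bin/sh
# Map a Java version string to its major version:
#   "1.7.0_99" -> 7, "1.8.0_151" -> 8, "11.0.2" -> 11
java_major() {
    case "$1" in
        1.*) v=${1#1.}; echo "${v%%.*}" ;;
        *)   echo "${1%%.*}" ;;
    esac
}

# Read the quoted version from the first line of "java -version"
# (printed on stderr) and require at least Java 8 (assumed minimum).
check_java() {
    ver=$(java -version 2>&1 | sed -n '1s/.*"\(.*\)".*/\1/p')
    major=$(java_major "$ver")
    if [ "${major:-0}" -ge 8 ]; then
        echo "OK: Java $ver"
    else
        echo "Java '$ver' is too old or missing; Java 8 or later is assumed to be needed" >&2
        return 1
    fi
}
```

Calling check_java after source /etc/profile confirms that the java found on PATH is the newly installed one rather than a leftover system JDK.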
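2. Install the ZooKeeper cluster (on the target side)
The installation here deploys a three-node pseudo-cluster on a single machine (three nodes is the officially recommended minimum). The ZooKeeper, Kafka, and OGG for Big Data software used later is all placed under /kafka.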
2.1 Create the software directory
[root@zookeeper java]# mkdir /kafka
[root@zookeeper java]# chown -R oracle:oinstall /kafka/
[root@zookeeper java]# chmod -R 777 /kafka/
2.2 Create the installation directories for the three ZK servers
[root@zookeeper java]# mkdir -p /kafka/zookeeper/zookeeper1
[root@zookeeper java]# mkdir -p /kafka/zookeeper/zookeeper2
[root@zookeeper java]# mkdir -p /kafka/zookeeper/zookeeper3
2.3 Upload and extract ZooKeeper into the three directories
First upload the archive to the first directory and extract it there. Run cd /kafka/zookeeper/zookeeper1/ and then:
[root@zookeeper zookeeper1]# tar -zxvf zookeeper-3.4.6.tar.gz
[root@zookeeper zookeeper1]# rm -rf zookeeper-3.4.6.tar.gz
[root@zookeeper zookeeper1]# mv zookeeper-3.4.6/* .
[root@zookeeper zookeeper1]# ls
bin CHANGES.txt contrib docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.6 zookeeper-3.4.6.jar.asc zookeeper-3.4.6.jar.sha1
build.xml conf dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.6.jar zookeeper-3.4.6.jar.md5
The ZooKeeper software for the other two directories can simply be copied from the first:
[root@zookeeper zookeeper1]# cp -rp * /kafka/zookeeper/zookeeper2/
[root@zookeeper zookeeper1]# cp -rp * /kafka/zookeeper/zookeeper3/
2.4 Create the log directories
Create the snapshot directories:
mkdir -p /kafka/zookeeper/zookeeper1/dataDir
mkdir -p /kafka/zookeeper/zookeeper2/dataDir
mkdir -p /kafka/zookeeper/zookeeper3/dataDir
Create the transaction log directories:
mkdir -p /kafka/zookeeper/zookeeper1/dataLogDir
mkdir -p /kafka/zookeeper/zookeeper2/dataLogDir
mkdir -p /kafka/zookeeper/zookeeper3/dataLogDir
[Note]: If dataLogDir is not configured, the transaction logs are also written to dataDir. This can seriously hurt ZK performance, because under high throughput ZK produces a large volume of transaction logs and snapshots.
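The six mkdir commands above follow one pattern per node, so they can be written as a loop. This is a minimal sketch; make_zk_dirs is a hypothetical helper name, and the base directory is passed in as an argument so that the function can be tried on a scratch directory first.

```shell
#!/bin/sh
# Hypothetical helper: create dataDir (snapshots) and dataLogDir
# (transaction logs) for nodes 1..3 under the base directory in $1.
make_zk_dirs() {
    base=$1
    for i in 1 2 3; do
        mkdir -p "$base/zookeeper$i/dataDir" "$base/zookeeper$i/dataLogDir" || return 1
    done
}
```

For the layout used in this document the call would be make_zk_dirs /kafka/zookeeper.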
2.5 Edit /etc/hosts
Change /etc/hosts to the following:
127.0.0.1 localhost
192.168.1.21 zookeeper
192.168.1.21 zookeeper1
192.168.1.21 zookeeper2
192.168.1.21 zookeeper3
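If the host entries are added by a script rather than by hand, it helps to make the step safe to run more than once. The sketch below assumes a hypothetical helper add_host that appends an "IP NAME" line to a hosts-style file only when no entry for that name exists yet. The name is used as a regular expression, so it is only suitable for simple names like the ones above.

```shell
#!/bin/sh
# Hypothetical helper: append "IP NAME" to a hosts-style file unless a line
# already contains NAME as a whole word after whitespace.
add_host() {
    file=$1 ip=$2 name=$3
    if ! grep -qE "[[:space:]]$name([[:space:]]|\$)" "$file" 2>/dev/null; then
        printf '%s %s\n' "$ip" "$name" >> "$file"
    fi
}
```

For this environment the calls would look like add_host /etc/hosts 192.168.1.21 zookeeper1, repeated for each of the four names.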
2.6 Edit the ZooKeeper configuration files
Configuration for server 1:
[root@zookeeper ~]# cd /kafka/zookeeper/zookeeper1/conf/
[root@zookeeper conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@zookeeper conf]# mv zoo_sample.cfg zoo.cfg
[root@zookeeper conf]# vim zoo.cfg
The resulting configuration:
[root@zookeeper conf]# cat zoo.cfg |grep -v ^#|grep -v ^$
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/kafka/zookeeper/zookeeper1/dataDir
dataLogDir=/kafka/zookeeper/zookeeper1/dataLogDir
server.1=zookeeper1:2887:3887
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2889:3889
Configuration file for server 2:
[root@zookeeper conf]# cat /kafka/zookeeper/zookeeper2/conf/zoo.cfg |grep -v ^#|grep -v ^$
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2182
dataDir=/kafka/zookeeper/zookeeper2/dataDir
dataLogDir=/kafka/zookeeper/zookeeper2/dataLogDir
server.1=zookeeper1:2887:3887
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2889:3889
Configuration file for server 3:
[root@zookeeper conf]# cat /kafka/zookeeper/zookeeper3/conf/zoo.cfg |grep -v ^#|grep -v ^$
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2183
dataDir=/kafka/zookeeper/zookeeper3/dataDir
dataLogDir=/kafka/zookeeper/zookeeper3/dataLogDir
server.1=zookeeper1:2887:3887
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2889:3889
In the directory specified by dataDir, create a file named myid that contains a single number identifying the current server. The number must match the X of that server's server.X entry in conf/zoo.cfg:
[root@zookeeper conf]# echo "1" > /kafka/zookeeper/zookeeper1/dataDir/myid
[root@zookeeper conf]# echo "2" > /kafka/zookeeper/zookeeper2/dataDir/myid
[root@zookeeper conf]# echo "3" > /kafka/zookeeper/zookeeper3/dataDir/myid
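The three zoo.cfg files differ only in clientPort and in the two directory paths, and each myid matches the node number, so the files can be generated instead of edited by hand. The following is a sketch under that assumption; write_zk_node is a hypothetical helper, and the port arithmetic simply reproduces the values listed above (client ports 2181 to 2183, quorum ports 2887 to 2889, election ports 3887 to 3889).

```shell
#!/bin/sh
# Hypothetical helper: write conf/zoo.cfg and dataDir/myid for node $2
# (1, 2 or 3) of the pseudo-cluster rooted at base directory $1.
write_zk_node() {
    base=$1 id=$2
    node="$base/zookeeper$id"
    mkdir -p "$node/conf" "$node/dataDir" "$node/dataLogDir" || return 1
    {
        echo "tickTime=2000"
        echo "initLimit=10"
        echo "syncLimit=5"
        echo "clientPort=$((2180 + id))"
        echo "dataDir=$node/dataDir"
        echo "dataLogDir=$node/dataLogDir"
        # The server list is identical on every node.
        for n in 1 2 3; do
            echo "server.$n=zookeeper$n:$((2886 + n)):$((3886 + n))"
        done
    } > "$node/conf/zoo.cfg"
    # myid must match this node's N in "server.N".
    echo "$id" > "$node/dataDir/myid"
}
```

Running for i in 1 2 3; do write_zk_node /kafka/zookeeper "$i"; done would overwrite any existing zoo.cfg, so it is best tried against a scratch directory first and compared with the files shown above.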
2.7 Disable the firewall
Stop the firewall and disable it at boot:
service iptables stop
sudo chkconfig iptables off
2.8 Start the ZooKeeper cluster
[root@zookeeper bin]# /kafka/zookeeper/zookeeper1/bin/zkServer.sh start
JMX enabled by default
Using config: /kafka/zookeeper/zookeeper1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
The script reports STARTED, but take a look at the startup log:
[root@zookeeper bin]# tail -f zookeeper.out
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
2018-12-11 16:58:26,328 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:FastLeaderElection@849] - Notification time out: 3200
2018-12-11 16:58:29,530 [myid:1] - WARN [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@382] - Cannot open channel to 2 at election address zookeeper2/192.168.1.21:3888
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
2018-12-11 16:58:29,531 [myid:1] - WARN [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@382] - Cannot open channel to 3 at election address zookeeper3/192.168.1.21:3889
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
2018-12-11 16:58:29,531 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:FastLeaderElection@849] - Notification time out: 6400
The log shows errors: the node with myid=1 cannot connect to nodes 2 and 3. This can be ignored, because the errors clear up once the other nodes of the cluster are running. Go on and start nodes 2 and 3 manually:
[root@zookeeper bin]# /kafka/zookeeper/zookeeper2/bin/zkServer.sh start
JMX enabled by default
Using config: /kafka/zookeeper/zookeeper2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@zookeeper bin]# /kafka/zookeeper/zookeeper3/bin/zkServer.sh start
JMX enabled by default
Using config: /kafka/zookeeper/zookeeper3/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
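The connection-refused warnings seen while only node 1 was running are expected: a ZooKeeper ensemble can elect a leader only when a majority of its configured servers are up, and one server out of three is not a majority. The arithmetic can be sketched as follows; quorum_size and has_quorum are hypothetical helper names used only for illustration.

```shell
#!/bin/sh
# Smallest number of running servers that forms a majority of an
# ensemble with $1 configured servers.
quorum_size() {
    echo $(( $1 / 2 + 1 ))
}

# Succeeds when $1 running servers out of $2 configured servers
# are enough to elect a leader.
has_quorum() {
    [ "$1" -ge "$(quorum_size "$2")" ]
}
```

For the three-node pseudo-cluster, quorum_size 3 gives 2, so leader election can complete as soon as the second node is started, and the third node then joins the running ensemble.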
Once nodes 2 and 3 are up, look at node 1's log again:
[root@zookeeper bin]# tail -f zookeeper.out
2018-12-11 16:58:41,828 [myid:3] - INFO [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2183:FileTxnSnapLog@240] - Snapshotting: 0x100000000 to /kafka/zookeeper/zookeeper3/dataDir/version-2/snapshot.100000000
2018-12-11 16:58:41,717 [myid:2] - INFO [zookeeper2/192.168.1.21:3888:QuorumCnxManager$Listener@511] - Received connection request /192.168.1.21:38969
2018-12-11 16:58:41,729 [myid:2] - INFO [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message format version), 3 (n.leader), 0x0 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x0 (n.peerEpoch) LEADING (my state)
2018-12-11 16:58:41,761 [myid:2] - INFO [LearnerHandler-/192.168.1.21:49909:LearnerHandler@330] - Follower sid: 3 : info : org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer@7399ae79
2018-12-11