zoukankan      html  css  js  c++  java
  • 通过Bottledwater同步PostgreSQL中的数据变化到Kafka消息队列

    当我们遇到需要捕获数据库中数据变化的时候,总是会想到通过消息队列来实现该需求,通过把数据变化发布到消息队列,来完成系统上下游的解耦。关心这些数据变化的应用可以从消息队列上获取这些数据。

    Bottledwater-pg是针对PostgreSQL数据库的一种消息生产者,可以将PostgreSQL数据库的数据写入confluent Kafka,从而实时的分享给消息订阅者。支持PostgreSQL 9.4以及以上版本,支持全量快照,以及持续解析数据WAL日志中的增量数据并写入Kafka。每一张数据库表为一个topic。数据在使用decode从WAL取出后,使用Avro将数据格式化(通常格式化为JSON)再写入Kafka。

    Bottledwater-pg有docker、源码编译、Ubuntu三种使用方式,本文以源码编译方式说明如何部署。

    一. 环境说明

    • 源端

    IP:10.19.100.23

    操作系统:RHEL6.3

    数据库:PostgreSQL-9.4.11

    • 目的端

    IP:10.19.100.21

    操作系统:RHEL6.3

    Kafka:2.10-0.10.2.0

    二. 源端配置(10.19.100.23)

    Bottledwater-pg依赖以下软件包:

    • avro-c > =1.8.0
    • jansson
    • libcurl
    • librdkafka > =0.9.1

    Bottledwater-pg可选以下软件包:

    • libsnappy
    • boost

    另外编译要求较高的cmake版本,操作系统自带的cmake会出现编译错误,本文使用:

    • cmake-3.8.0

    2.0 卸载系统自带PG包

    root用户操作)

     RedHat根据安装模式可能自带8.4版本的PostgreSQL,请务必卸载,否则会对编译造成影响

    # rpm -qa|grep postgres
    postgresql-libs-8.4.11-1.el6_2.x86_64
    postgresql-devel-8.4.11-1.el6_2.x86_64
    postgresql-8.4.11-1.el6_2.x86_64
    
    # rpm -e postgresql-devel-8.4.11-1.el6_2.x86_64
    # rpm -e postgresql-8.4.11-1.el6_2.x86_64
    # rpm -e postgresql-libs-8.4.11-1.el6_2.x86_64

    2.1 编译安装cmake

    root用户操作)

    # cd /opt
    # tar zxvf cmake-3.8.0.tar.gz
    # cd cmake-3.8.0
    # ./bootstrap
    # make
    # make install
    
    # cmake -version
    cmake version 3.8.0
    CMake suite maintained and supported by Kitware (kitware.com/cmake).

    2.2 编译安装jansson

    root用户操作)

    # cd /opt
    # tar jxvf jansson-2.9.tar.bz2
    # cd jansson-2.9
    # ./configure
    # make
    # make install
    
    # ls /usr/local/lib/pkgconfig
    jansson.pc
    
    #export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

    2.3 编译安装avro

    root用户操作)

    # cd /opt
    # yum install -y xz-*
    # yum install -y zlib-devel.x86_64
    
    # tar zxvf avro-src-1.8.1.tar.gz
    # cd avro-src-1.8.1/lang/c
    # mkdir build
    # cd build
    # cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true
    # make
    # make test
    # make install

    导入库文件

    # vi /etc/ld.so.conf
    /opt/avro/lib
    
    # ldconfig

    配置临时环境变量

    # export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
    
    # export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

    2.4 安装libcurl

    root用户操作)

    # yum install -y libcurl-devel.x86_64

    2.5 编译安装librdkafka

    root用户操作)

    # cd /opt
    # unzip librdkafka-master.zip
    # cd librdkafka-master
    # ./configure
    # make
    # make install
    
    # ls /usr/local/lib/pkgconfig
    jansson.pc  rdkafka.pc  rdkafka++.pc

    2.6 添加引用库

    root用户操作)

    # vi /etc/ld.so.conf.d/bottledwater.conf
    /opt/avro/lib
    /usr/local/lib
    /PostgreSQL/9.4/lib
    
    # ldconfig

    2.7 编译安装bottledwater-pg

    root用户操作)

    # chown -R postgres:postgres /opt/

    postgres用户操作)

    配置环境变量

    $ vi ~/.bash_profile
    
    # .bash_profile
    # Get the aliases and functions
    
    if [ -f ~/.bashrc ]; then
            . ~/.bashrc
    fi
    
    # User specific environment and startup programs
    
    export PG_HOME=/PostgreSQL/9.4
    export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
    export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
    
    PATH=$PG_HOME/bin:$PATH:$HOME/bin
    export PATH

    准备安装包

    $ unzip bottledwater-pg-master.zip
    $ cd bottledwater-pg-master

    这里不知道是操作系统环境的问题还是开源软件本身的问题,需要修改源码包里的2处Makefile才能通过编译。

    • 修改client/Makefile
    PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
    • 修改kafka/Makefile
    PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
    
    LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)

    编译并安装bottledwater-pg

    $ make
    $ make install

    安装完成后会自动在PostgreSQL数据库扩展包目录下生成扩展库文件和扩展库控制文件。

    $ ls /PostgreSQL/9.4/lib/postgresql/bottledwater*
    /PostgreSQL/9.4/lib/postgresql/bottledwater.so
    
    $ ls /PostgreSQL/9.4/share/postgresql/extension/bottledwater*
    /PostgreSQL/9.4/share/postgresql/extension/bottledwater--0.1.sql
    /PostgreSQL/9.4/share/postgresql/extension/bottledwater.control

    2.7 数据库配置

    试验环境中PostgreSQL已有数据库mas,用户mas,密码mas。

    Mas库中有一张表:

    mastest(col1 numeric primary key,col2 character varying(10));
    • 修改数据库配置文件
    $ vi /PostgreSQL/9.4/data/postgresql.conf
    
    max_worker_processes = 8        # 至少为8
    wal_level = logical             # 至少为logical,可以更高
    max_wal_senders = 8             # 至少为8
    wal_keep_segments = 256         # 至少为256
    max_replication_slots = 4       # 至少为4
    • 修改数据库白名单配置文件

      *这部分权限可能过大,可以精简

    $ vi /PostgreSQL/9.4/data/pg_hba.conf
    
    local   replication     all                                     trust
    host    replication     all             127.0.0.1/32            trust
    host    replication     all             0.0.0.0/0               md5
    host    all             postgres        10.19.100.23/32         trust
    • 重启数据库并对要监控的库创建bottledwater扩展
    $ pg_ctl restart -D /PostgreSQL/9.4/data/ -l /PostgreSQL/9.4/data/pglog.log -m fast
    
    $ psql -U postgres -d mas -c "create extension bottledwater;"
    Password for user postgres:
    
    CREATE EXTENSION

    三. 测试同步

    • 在源端(10.19.100.23)启动bottledwater

     

    $ cd /opt/bottledwater-pg-master/kafka
    
    $ ./bottledwater --postgres=postgres://postgres:123456@10.19.100.23:5432/mas --broker=10.19.100.21:9092 -f json
    
    [INFO] Writing messages to Kafka in JSON format
    [INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1".
    INFO:  bottledwater_export: Table mas.mastest is keyed by index mastest_pkey
    [INFO] Snapshot complete, streaming changes from 0/1749F58.
    • 向源端数据库写入数据
    mas=> insert into mastest values(1,'A');
    INSERT 0 1
    
    mas=> insert into mastest values(2,'B');
    INSERT 0 1
    
    mas=> update mastest set col2='C' where col1=2;
    UPDATE 1
    
    mas=> delete from mastest where col1=1;
    DELETE 1
    • 目的端查看消息事件
    # bin/kafka-topics.sh --list --zookeeper localhost:2181
    mas.mastest
    
    # bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mas.mastest --from-beginning --property print.key=true
    {"col1": {"double": 1.0}}       {"col1": {"double": 1.0}, "col2": {"string": "A"}}
    {"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "B"}}
    {"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "C"}}
    {"col1": {"double": 1.0}}       null

    说明:

    1. 目的端Kafka配置文件中,Listener的配置不能用默认值localhost

    2. 每一张PostgreSQL中的表都会被做为一个topic,每个topic是自动创建的不需要人工干预,即使后期新建的表也是如此。

    3. 消息事件以“主键列 + 变更后的所有列”作为消息内容

    4. 主从复制模式下,slave不能启动bottledwater,因为备库不产生wal日志

  • 相关阅读:
    Atitit.兼具兼容性和扩展性的配置方案attilax总结
    Atitit.异步编程技术原理与实践attilax总结
    Atitit.异步编程技术原理与实践attilax总结
    Atitit. 获取cpu占有率的 java c# .net php node.js的实现
    Atitit. 获取cpu占有率的 java c# .net php node.js的实现
    Atitit.研发团队的管理原则---立长不立贤与按资排辈原则
    Atitit.研发团队的管理原则---立长不立贤与按资排辈原则
    Atitit.研发团队与公司绩效管理的原理概论的attilax总结
    Atitit.研发团队与公司绩效管理的原理概论的attilax总结
    Atitit selenium3 新特性
  • 原文地址:https://www.cnblogs.com/aegis1019/p/9051462.html
Copyright © 2011-2022 走看看