zoukankan      html  css  js  c++  java
  • PostgreSQL逻辑复制到kafka-实践

    kafka 安装

      wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

      cp kafka_2.12-2.0.1.tgz  kafka.tgz

          sudo tar xzvf kafka.tgz --directory=/opt/java/kafka --strip 1

    启动 kafka需要先启动本地的 zookeeper,注意修改配置文件中zk的连接地址

      /opt/kafka/bin/kafka-server-start.sh   /opt/kafka/config/server.properties

    kafkacat 是一个C语言编写的 kafka 生产者、消费者程序。安装过程需要一些库可能需要手动下载。

    postgres 逻辑解码, 程序 jsoncdc

    jsoncdc 依赖于rust可能需要先安装 rust 或者可以使用 wal2json替代

    编译好之后本地目录下有 jsoncdc.so 或者 wal2json.so

    postgres 安装解码插件:

    vim  $PGDATA/postgresql.conf

    shared_preload_libraries = 'jsoncdc.so'

    安装完成插件

    postgres 插入数据

    生产数据到本地 kafka

    /opt/bin/pgsql/pg_10/bin/pg_recvlogical -d postgres -S jsoncdc --start -o pretty-print=1 -f - | ./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

     Auto-selecting Producer mode (use -P or -C to override)

    消费数据测试:

    ./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

    kafka自消费实验:

    启动zk,我这边有zk服务器,因此不需要启动:

    bin/zookeeper-server-start.sh config/zookeeper.properties &

    启动kafka

    bin/kafka-server-start.sh config/server.properties

    在里面修改zk连接信息

    创建topic

    bin/kafka-topics.sh --create --zookeeper --replication-factor 1 --partitions 1 --topic test

    创建一个叫做“test”topic,它只有一个分区,一个副本。

    > bin/kafka-topics.sh --create --zookeeper 10.9.5.20:4119,10.9.5.21:4119,10.9.5.22:4119,10.9.5.24:4119,10.9.5.35:4119,10.9.5.36:4119 --replication-factor 1 --partitions 1 --topic test

    可以通过list命令查看创建的topic:

    > bin/kafka-topics.sh --list --zookeeper localhost:2181

    test

    运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

    This is a messageThis is another message

    ctrl+c可以退出发送。

    消费消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/server.properties --topic test --from-beginning

    接下来想写一个nodejs程序,将pg的变化动态回放到web中,显示出来。

  • 相关阅读:
    纯CSS星级评价
    Enterprise Library启用签名后发生 PublicKeyToken错误,HRESULT:0x80131040解决
    SQL Server
    该如何选择国外VPS
    网站的伪静态化
    kernel FIELD_SIZEOF宏 NULL地址不访问不出错
    Activity的四种加载模式
    Git magic 简短git使用命令集
    为什么包含多句代码的宏要用do while包括起来?
    使用lsof来查看FD和进程的关系
  • 原文地址:https://www.cnblogs.com/kuang17/p/11084567.html
Copyright © 2011-2022 走看看