zoukankan      html  css  js  c++  java
  • PostgreSQL逻辑复制槽 pg_recvlogical test_decoding wal2json

       Schema   |                Name                 | Result data type |                            Argument data types                            |  Type
    ------------+-------------------------------------+------------------+---------------------------------------------------------------------------+--------
     pg_catalog | pg_create_logical_replication_slot  | record           | slot_name name, plugin name, OUT slot_name text, OUT xlog_position pg_lsn | normal
     pg_catalog | pg_create_physical_replication_slot | record           | slot_name name, OUT slot_name name, OUT xlog_position pg_lsn              | normal
    

    1.需求

    遇到一种需要将一部分表通过logical_tool用逻辑复制槽的方式同步到kaffka,之前没有使用过,因此对逻辑复制槽进行了了解。

    2.资料

    pg_create_logical_replication_slot方法配置两个参数:slot_name、plugin_name,例如:

    pg_create_logical_replication_slot('test_logical_slot', 'wal2json');

    这样就可以通过wal2json这个工具来解析wal日志,也可以用test_decoding自带的解析工具来做,有时间进行详细了解····

     

    --201906027有时间了:

    test_decoding自带的,直接在psql中安装插件:create extension test_decoding;

    wal2json需要下载编译安装:

    安装:
    $ git clone https://github.com/eulerto/wal2json.git
    $ cd wal2json
    # Make sure your path includes the bin directory that contains the correct `pg_config`
    $ PATH=/path/to/pg/bin:$PATH
    $ USE_PGXS=1 make
    $ USE_PGXS=1 make install
    
    修改wal日志级别:
    wal_level = logical
    max_replication_slots = 5
    max_wal_senders = 5
    
    参数:
    include-xids: add xid to each changeset. Default is false.
    include-timestamp: add timestamp to each changeset. Default is false.
    include-schemas: add schema to each change. Default is true.
    include-types: add type to each change. Default is true.
    include-typmod: add modifier to types that have it (eg. varchar(20) instead of varchar). Default is true.
    include-type-oids: add type oids. Default is false.
    include-not-null: add not null information as columnoptionals. Default is false.
    pretty-print: add spaces and indentation to JSON structures. Default is false.
    write-in-chunks: write after every change instead of every changeset. Default is false.
    include-lsn: add nextlsn to each changeset. Default is false.
    include-unchanged-toast (deprecated): add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is true.
    filter-tables: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. *.foo means table foo in all schemas and bar.* means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as public.Foo bar.
    add-tables: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from filter-tables.
    format-version: defines which format to use. Default is 1.
    
    常用参数是,pretty-print 1
    
    修改pg访问权限,可选:
    local    replication     myuser                     trust
    
    启动:
    $ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
    $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -
    
    停止直接ctrl+c
    
    删除:
    pg_recvlogical -d postgres --slot test_slot --drop-slot
    
    
    测试脚本:
    $ cat /tmp/example2.sql
    CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    
    BEGIN;
    INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
    DELETE FROM table2_with_pk WHERE a < 3;
    
    INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
    -- it is not added to stream because there isn't a pk or a replica identity
    UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
    
    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
    SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
    

     

    3.类似工具


    另外关于逻辑槽的工具,在阿里云中看到他们用逻辑复制槽的方式进行数据库迁移:
    1.安装他们的ali_decoding工具;
    2.创建逻辑复制槽:SELECT * FROM pg_create_logical_replication_slot('replication_slot_test', 'ali_decoding');
    然后配置连接参数,开始迁移。

    4.应对集群切换

    而客户这边有一些不一样的地方,为了适应集群的切换,他们将逻辑槽所在的目录,pglogical设置为共享目录,当主节点被切换走了,在新的主节点重新启动逻辑复制槽(通过守护进程实现,同时,他们还需要重启数据库才能让共享目录被识别,不能简单的promote方式提示从节点为主节点)。

     

  • 相关阅读:
    Struts2SpringHibernate整合示例,一个HelloWorld版的在线书店(项目源码+详尽注释+单元测试)
    Java实现蓝桥杯勇者斗恶龙
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 224 基本计算器
    Java实现 LeetCode 224 基本计算器
  • 原文地址:https://www.cnblogs.com/kuang17/p/10136365.html
Copyright © 2011-2022 走看看