zoukankan      html  css  js  c++  java
  • canal实时同步mysql binlog到rabbitmq

    本文使用mysql+canal+rabbitmq

    mysql 配置和canal安装请参考canal官网

    https://github.com/alibaba/canal/wiki/QuickStart

    Canal Kafka RocketMQ RabbitMQ QuickStart

    # 说明 canal_v_1.1.5虽然官方文档上没有明确支持rabbitmq,实际上已经支持了
    # 提示 canal 投递 rabbitmq 依赖 zookeeper,因此文档中安装 zookeeper 的步骤不可忽略;很多博文没有提到这一点,特此提示
    https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

     上述配置完成之后,MQ接收到数据JSON格式如下:

    {
        "data":[
            {
                "id":"5",
                "name":"666"
            }
        ],
        "database":"test",
        "es":1609404510000,
        "id":4,
        "isDdl":false,
        "mysqlType":{
            "id":"int(11)",
            "name":"varchar(255)"
        },
        "old":null,
        "pkNames":[
            "id"
        ],
        "sql":"",
        "sqlType":{
            "id":4,
            "name":12
        },
        "table":"t_user",
        "ts":1609404510314,
        "type":"INSERT"
    }

    下面重点记录由 JSON 还原成 SQL 的实现

    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import net.dreamlu.mica.core.utils.$;
    import org.apache.commons.lang.StringUtils;
    
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * Parses a canal message received from RabbitMQ and rebuilds the DML statement it describes.
     *
     * <p>The instance is the parsed JSON document itself (a {@link LinkedHashMap} keyed by the
     * top-level JSON field names such as {@code table}, {@code type}, {@code data} and
     * {@code pkNames}); the accessors below read from that map.
     *
     * <p>Limitations: only {@code INSERT} and {@code UPDATE} are turned into SQL, and only the
     * first element of the {@code data} array is used. Table and column names are emitted as-is
     * (not quoted).
     *
     * @author : Lee
     * @date : 2021-01-04
     */
    @Slf4j
    @AllArgsConstructor
    public class MysqlDmlFieldData extends LinkedHashMap<String, Object> {

        private MysqlDmlFieldData(Map<String, Object> data) {
            super();
            super.putAll(data);
        }

        /**
         * Builds an instance from the raw JSON text of a message.
         *
         * @param json the message body as JSON
         * @return the parsed message
         */
        public static MysqlDmlFieldData fromJsonString(String json) {
            return new MysqlDmlFieldData($.readJsonAsMap(json, String.class, Object.class));
        }

        /**
         * @return the value of the {@code table} field, or an empty string when it is missing,
         *         null or not a string
         */
        public String getTable() {
            return stringField("table");
        }

        /**
         * @return the value of the {@code type} field (e.g. {@code INSERT}), or an empty string
         *         when it is missing, null or not a string
         */
        public String getType() {
            return stringField("type");
        }

        /**
         * Rebuilds the SQL statement for this message.
         *
         * @return the INSERT or UPDATE statement, or an empty string for any other type
         * @throws IllegalStateException if the message is an UPDATE without primary-key names
         */
        public String getDmlSql() {
            String type = this.getType();
            switch (type) {
                case "INSERT":
                    return getInsertSql();
                case "UPDATE":
                    return getUpdateSql();
                default:
                    // The "{}" placeholder is required, otherwise SLF4J silently drops the argument.
                    log.warn("不支持该DML操作type:{}", type);
            }
            return "";
        }

        /**
         * @return an {@code insert into <table> (<columns>) values (<values>)} statement built
         *         from the first row of {@code data}
         */
        public String getInsertSql() {
            return "insert into " + getTable() + insColAndVal();
        }

        /**
         * @return an {@code update <table> set ... where ...} statement built from the first row
         *         of {@code data}
         * @throws IllegalStateException if the message carries no primary-key names
         */
        public String getUpdateSql() {
            return "update " + getTable() + " set " + updColAndVal();
        }

        /**
         * Returns the first row of the {@code data} array as a column-name to value map.
         *
         * @return the first row; values may be {@code null}
         */
        @SuppressWarnings("unchecked") // the JSON helper yields a raw Map; keys are JSON field names
        public Map<String, String> getData() {
            String json = $.toJson(super.get("data"));
            return $.readJsonAsList(json, Map.class).get(0);
        }

        /**
         * Builds the part of an UPDATE statement that follows {@code set}: the column assignments
         * and the {@code where} clause on the primary key.
         *
         * <p>Every primary-key column listed in {@code pkNames} takes part in the {@code where}
         * clause, so tables with a composite key are matched on the full key. Primary-key columns
         * are left out of the assignments unless the {@code old} row shows the key itself changed.
         *
         * @return {@code col='v',... where pk='v' [and pk2='v2' ...]}
         * @throws IllegalStateException if the message carries no primary-key names (an UPDATE
         *         without a {@code where} clause would touch every row)
         */
        public String updColAndVal() {
            // Read values as Object so that non-string JSON values do not trigger a cast.
            Map<String, ?> row = this.getData();
            List<String> pkNames = pkNameList();
            if (pkNames.isEmpty()) {
                throw new IllegalStateException(
                        "Cannot build UPDATE for table '" + getTable() + "': message has no pkNames");
            }
            Map<?, ?> before = firstOldRow();

            StringBuilder assignments = new StringBuilder();
            for (Map.Entry<String, ?> column : row.entrySet()) {
                String name = column.getKey();
                // A key column only needs assigning when the update actually changed it.
                if (pkNames.contains(name) && !before.containsKey(name)) {
                    continue;
                }
                if (assignments.length() > 0) {
                    assignments.append(',');
                }
                assignments.append(name).append('=').append(quote(column.getValue()));
            }

            StringBuilder where = new StringBuilder();
            for (String pk : pkNames) {
                if (where.length() > 0) {
                    where.append(" and ");
                }
                // Match on the pre-update key value when the key itself was modified.
                Object keyValue = before.containsKey(pk) ? before.get(pk) : row.get(pk);
                where.append(pk).append('=').append(quote(keyValue));
            }
            return assignments + " where " + where;
        }

        /**
         * Builds the column list and value list of an INSERT statement from the first data row.
         *
         * @return {@code  (col1,col2) values ('v1','v2')}; null values are written as {@code NULL}
         */
        public String insColAndVal() {
            // Read values as Object so that non-string JSON values do not trigger a cast.
            Map<String, ?> row = this.getData();
            String columns = String.join(",", row.keySet());
            StringBuilder values = new StringBuilder();
            for (Object value : row.values()) {
                if (values.length() > 0) {
                    values.append(',');
                }
                values.append(quote(value));
            }
            return " (" + columns + ") values (" + values + ")";
        }

        /**
         * @return the first entry of {@code pkNames}, or an empty string when the message has none
         */
        public String getPkNames() {
            List<String> names = pkNameList();
            return names.isEmpty() ? StringUtils.EMPTY : names.get(0);
        }

        /**
         * Returns the {@code mysqlType} field as text.
         *
         * @return the value itself when it is a string, its JSON serialization when it is any other
         *         non-null value (in the sample message it is a JSON object), or an empty string
         *         when it is missing or null
         */
        public String getMysqlType() {
            Object value = super.get("mysqlType");
            if (value == null) {
                return StringUtils.EMPTY;
            }
            return value instanceof String ? (String) value : $.toJson(value);
        }

        /** Reads a top-level field, returning an empty string unless it holds a string. */
        private String stringField(String key) {
            Object value = super.get(key);
            return value instanceof String ? (String) value : StringUtils.EMPTY;
        }

        /** Returns every entry of {@code pkNames}, or an empty list when the field is missing or null. */
        private List<String> pkNameList() {
            Object raw = super.get("pkNames");
            if (raw == null) {
                return Collections.emptyList();
            }
            List<String> names = $.readJsonAsList($.toJson(raw), String.class);
            return names == null ? Collections.<String>emptyList() : names;
        }

        /**
         * Returns the first element of the {@code old} array, or an empty map when there is none.
         *
         * <p>NOTE(review): assumes {@code old} holds the pre-update values of the changed columns,
         * one entry per row in {@code data} - confirm against the canal message format.
         */
        private Map<?, ?> firstOldRow() {
            Object old = super.get("old");
            if (old instanceof List && !((List<?>) old).isEmpty()) {
                Object first = ((List<?>) old).get(0);
                if (first instanceof Map) {
                    return (Map<?, ?>) first;
                }
            }
            return Collections.emptyMap();
        }

        /**
         * Renders a value as a SQL literal: {@code NULL} for null, otherwise a single-quoted string
         * with backslashes and single quotes escaped so the value cannot terminate the literal.
         *
         * <p>NOTE(review): doubling backslashes assumes the target server does not run with
         * {@code NO_BACKSLASH_ESCAPES}; a parameterized statement would avoid the question entirely.
         */
        private static String quote(Object value) {
            if (value == null) {
                return "NULL";
            }
            String text = String.valueOf(value);
            return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'";
        }

    }
  • 相关阅读:
    头条前端笔试最后一道题
    Node读取和写入json,格式化输出json
    CSS中的未定义行为,浏览器的差异(一)
    18.2.28阿里前端实习生内推面补坑
    18.2.26深信服Web实习生补坑(已拿到offer)
    MySQL Parameter '?…' has already been defined 是什么问题
    C# List<T>的 Find方法、FindLast方法、FindAll方法、FindIndex方法
    C# 对List<T>进行排序
    SQL里 asc和desc的意思
    Visual Studio同步的时候显示 team foundation 错误 系统找不到指定文件夹
  • 原文地址:https://www.cnblogs.com/lixyu/p/14267622.html
Copyright © 2011-2022 走看看