Canal from Scratch: Connecting Canal to Kafka and Syncing to ElasticSearch

      Skipping the installation of Kafka and ZooKeeper, we go straight to configuring the Canal server.

    Edit the instance.properties file under conf/example:

      

    # MySQL master address, username and password
    canal.instance.master.address=127.0.0.1:13306
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456
    # parse whitelist (regular expression)
    canal.instance.filter.regex=mytest\..*
    # mq config
    canal.mq.topic=example
    

      This configuration means that the example instance watches every table under the mytest database and delivers everything it captures to the Kafka topic named example.
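      For reference, canal.instance.filter.regex takes comma-separated Perl-style regular expressions, and Canal's stock sample configs write the dot escape with a double backslash. The patterns below follow the examples in Canal's documentation and are shown only as an illustration; table names other than mytest.test_tab_3 are made up.

    # every table in every database
    canal.instance.filter.regex=.*\\..*
    # every table under the mytest database
    canal.instance.filter.regex=mytest\\..*
    # a single table, or several rules combined with commas
    canal.instance.filter.regex=mytest.test_tab_3,othertest\\..*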

    Edit the conf/canal.properties file:

      

    # tcp, kafka, RocketMQ; change serverMode to kafka
    canal.serverMode = kafka
    

      

    ##################################################
    #########                    MQ                      #############
    ##################################################
    # set this to the address of your Kafka broker(s)
    canal.mq.servers = 127.0.0.1:9092
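      One related setting worth knowing about, left at its default in this walkthrough (so this is a note, not a change): canal.mq.flatMessage controls whether Canal publishes the flattened JSON format that the consumer code below parses. On Canal 1.1.x it defaults to true.

    # flat JSON message format (default); the consumer code below expects this format
    canal.mq.flatMessage = true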
    

      

    After making these changes, restart the Canal server (the deployer package ships stop.sh and startup.sh under bin/ for this).

    If the Kafka setup changes after Canal has started, Canal needs to be restarted as well.

    Kafka consumer code:

    package com.canal.demo.handler;
    
    import com.alibaba.fastjson.JSON;
    import com.canal.demo.constants.BinlogConstants;
    import com.canal.demo.es.EsRestClient;
    import com.canal.demo.transfer.DataConvertFactory;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * <p>
     * Message handler
     * </p>
     *
     * @package: com.canal.demo.handler
     * @description: Message handler
     * @author: yangkai.shen
     * @date: Created in 2019-01-07 14:58
     * @copyright: Copyright (c) 2019
     * @version: V1.0
     * @modified: yangkai.shen
     */
    @Component
    @Slf4j
    public class MessageHandler {
        @Autowired
        DataConvertFactory dataConvertFactory;
        @Autowired
        EsRestClient esRestClient;
        @Value("${hostAndPort}")
        private String hostAndPort;
    
        @KafkaListener(topics = {"example","example_2"}, containerFactory = "ackContainerFactory")
        public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            try {
                String message = record.value();
                log.info("Received message: {}", message);
                Map<String, Object> map = JSON.parseObject(message);
                List<Map<String, String>> dataList = (List<Map<String, String>>) map.get(BinlogConstants.BINLOG_DATA_KEY);
                if (dataList != null) {
                    log.info("接受行数据:{}", JSON.toJSONString(dataList));
                    String table = (String) map.get(BinlogConstants.BINLOG_TBL_KEY);
                    // convert the first captured row into the field map to be indexed
                    Map<String, String> params = dataConvertFactory.transferData(dataList.get(0), table);
                    String type = (String) map.get(BinlogConstants.BINLOG_TYPE_KEY);
                    esRestClient.buildClient(hostAndPort);
                    switch (type) {
                        case BinlogConstants.INSERT_EVENT:
                            String doc = esRestClient.insertDocument(table, "_doc", params);
                            log.info("doc_id:{}", doc);
                            break;
                        case BinlogConstants.UPDATE_EVENT:
                            log.info("update---");
                            break;
                        case BinlogConstants.DELETE_EVENT:
                            log.info("delete");
                            break;
                    }
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // manually commit the offset
                acknowledgment.acknowledge();
            }
        }
    }
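    The listener above refers to a containerFactory bean named ackContainerFactory and a ${hostAndPort} property, neither of which appears in the original post. Below is a minimal sketch of what such a factory could look like with Spring Kafka and manual acknowledgment; the class name, group id, bootstrap address and property names are assumptions.

    package com.canal.demo.config;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Sketch of the Kafka listener container factory assumed by MessageHandler.
     */
    @Configuration
    public class KafkaConsumerConfig {
    
        // assumption: the broker address is externalized in application.properties
        @Value("${spring.kafka.bootstrap-servers:127.0.0.1:9092}")
        private String bootstrapServers;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "canal-es-demo");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // auto-commit is disabled so that the listener controls the offset
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean("ackContainerFactory")
        public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // manual-immediate ack matches the acknowledgment.acknowledge() call in the handler
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }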
    

    Insert a row into the database:

    insert into mytest.test_tab_3 (c_id,c_name) values(225,'abda');
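
    The target table itself is not shown in the post. If you want to reproduce the example, a DDL consistent with the mysqlType and pkNames fields in the console output below would be roughly:

    CREATE TABLE mytest.test_tab_3 (
        c_id   INT PRIMARY KEY,
        c_name VARCHAR(20),
        c_age  INT
    );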
    

      

    Console output:

    2020-11-13 17:22:25.606  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : Received message: {"data":[{"c_id":"225","c_name":"abda","c_age":null}],"database":"mytest","es":1605259339000,"id":3,"isDdl":false,"mysqlType":{"c_id":"int","c_name":"varchar(20)","c_age":"int"},"old":null,"pkNames":["c_id"],"sql":"","sqlType":{"c_id":4,"c_name":12,"c_age":4},"table":"test_tab_3","ts":1605259339448,"type":"INSERT"}
    2020-11-13 17:22:25.672  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : Received row data: [{"c_id":"225","c_name":"abda"}]
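
    The JSON keys and type values in the log above ("data", "table", "type", "INSERT"/"UPDATE"/"DELETE") suggest what the BinlogConstants class referenced by the handler contains. A sketch under that assumption (the constant names come from the handler; the values come from the logged message):

    package com.canal.demo.constants;
    
    /**
     * Keys and event types of Canal's flat-message JSON.
     */
    public final class BinlogConstants {
        public static final String BINLOG_DATA_KEY = "data";
        public static final String BINLOG_TBL_KEY = "table";
        public static final String BINLOG_TYPE_KEY = "type";
    
        public static final String INSERT_EVENT = "INSERT";
        public static final String UPDATE_EVENT = "UPDATE";
        public static final String DELETE_EVENT = "DELETE";
    
        private BinlogConstants() {
        }
    }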
    

      

Original article: https://www.cnblogs.com/chylcblog/p/13968910.html