zoukankan      html  css  js  c++  java
  • Canal-从零开始:Canal连接kafka同步ElasticSearch

      省略kafka和zookeeper的安装,直接开始server的配置

    修改conf/example下的instance.properties文件

      

    # Address, username and password of the MySQL master that canal connects to
    canal.instance.master.address=127.0.0.1:13306
    canal.instance.dbUsername=root
    canal.instance.dbPassword=123456
    # Whitelist of tables to parse, written as a regex;
    # mytest\..* selects every table in the mytest database
    canal.instance.filter.regex=mytest\..*
    # MQ config: topic that the captured changes are sent to
    canal.mq.topic=example
    

      上面这段配置表示:example这个instance监听的是mytest这个database下面所有的表,并会把监听到的变更结果发送到kafka的example这个topic上

    修改conf/canal.properties文件

      

    # Available modes: tcp, kafka, RocketMQ; set serverMode to kafka
    canal.serverMode = kafka
    

      

    ##################################################
    #########                    MQ                      #############
    ##################################################
    # Change this to the address of the Kafka broker to connect to.
    # Keep the comment on its own line: .properties files have no inline comments,
    # so anything written after the value (including "#...") becomes part of the value.
    canal.mq.servers = 127.0.0.1:9092
    

      

    修改完之后重启canal.server;

    如果kafka相关的配置(例如地址)在canal启动之后有更改,同样需要重启canal才能生效

    kafka消费端代码

    package com.canal.demo.handler;
    
    import com.alibaba.fastjson.JSON;
    import com.canal.demo.constants.BinlogConstants;
    import com.canal.demo.es.EsRestClient;
    import com.canal.demo.transfer.DataConvertFactory;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Kafka message handler: consumes binlog change messages and writes inserted
     * rows to Elasticsearch through {@link EsRestClient}.
     *
     * <p>Each message is parsed as a JSON object. The row list, table name and event
     * type are read from it using the keys defined in {@link BinlogConstants}.
     * Messages without a row list are ignored.
     *
     * <p>The offset is acknowledged manually in a {@code finally} block, i.e. also
     * when processing fails; failures are only logged.
     *
     * <p>Original author: yangkai.shen (created 2019-01-07 14:58, V1.0).
     */
    @Component
    @Slf4j
    public class MessageHandler {
        @Autowired
        DataConvertFactory dataConvertFactory;
        @Autowired
        EsRestClient esRestClient;
        /** Value of the {@code hostAndPort} property; handed to {@code EsRestClient#buildClient}. */
        @Value("${hostAndPort}")
        private String hostAndPort;
    
        /**
         * Handles one record from the {@code example} / {@code example_2} topics.
         *
         * <p>For an insert event every row of the message is converted and indexed
         * into Elasticsearch (index = table name, type = {@code _doc}). Update and
         * delete events are only logged.
         *
         * @param record         the consumed Kafka record; its value is expected to be a JSON string
         * @param acknowledgment handle used to commit the offset manually; always acknowledged
         */
        @KafkaListener(topics = {"example","example_2"}, containerFactory = "ackContainerFactory")
        public void handleMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
            try {
                String message = (String) record.value();
                log.info("收到消息Str: {}", message);
                Map<String, Object> map = JSON.parseObject(message);
                if (map == null) {
                    // Nothing to process (e.g. a record without a payload).
                    log.warn("Skipping record without a JSON payload");
                    return;
                }
                // The parsed JSON is untyped; rows are assumed to be string-to-string maps.
                @SuppressWarnings("unchecked")
                List<Map<String, String>> dataList =
                        (List<Map<String, String>>) map.get(BinlogConstants.BINLOG_DATA_KEY);
                if (dataList == null) {
                    return;
                }
                log.info("接受行数据:{}", JSON.toJSONString(dataList));
                String table = (String) map.get(BinlogConstants.BINLOG_TBL_KEY);
                // Convert every row, not just the first one: a single binlog event may carry several rows.
                List<Map<String, String>> convertedRows = convertRows(dataList, table);
                String type = (String) map.get(BinlogConstants.BINLOG_TYPE_KEY);
                if (type == null) {
                    // switch on a null String would throw a NullPointerException.
                    log.warn("Skipping message for table {}: event type is missing", table);
                    return;
                }
                // NOTE(review): buildClient runs once per message; if it creates a new client
                // each time, consider building it once at startup — confirm against EsRestClient.
                esRestClient.buildClient(hostAndPort);
                switch (type) {
                    case BinlogConstants.INSERT_EVENT:
                        for (Map<String, String> params : convertedRows) {
                            String doc = esRestClient.insertDocument(table, "_doc", params);
                            log.info("doc_id:{}", doc);
                        }
                        break;
                    case BinlogConstants.UPDATE_EVENT:
                        log.info("update---");
                        break;
                    case BinlogConstants.DELETE_EVENT:
                        log.info("delete");
                        break;
                    default:
                        // Other event types are intentionally ignored.
                        break;
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // Commit the offset manually, whether or not processing succeeded.
                acknowledgment.acknowledge();
            }
        }
    
        /**
         * Runs every row of a message through {@link DataConvertFactory#transferData}.
         *
         * @param rows  raw rows taken from the message; may be empty
         * @param table name of the table the rows belong to
         * @return the converted rows, in the same order as {@code rows}
         */
        private List<Map<String, String>> convertRows(List<Map<String, String>> rows, String table) {
            List<Map<String, String>> converted = new ArrayList<>(rows.size());
            for (Map<String, String> row : rows) {
                converted.add(dataConvertFactory.transferData(row, table));
            }
            return converted;
        }
    }
    

    在数据库插入数据

    -- Insert one row into mytest.test_tab_3; the table is in the mytest database covered by the filter regex
    insert into mytest.test_tab_3 (c_id,c_name) values(225,'abda');
    

      

    控制台输出结果

    2020-11-13 17:22:25.606  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : 收到消息Str: {"data":[{"c_id":"225","c_name":"abda","c_age":null}],"database":"mytest","es":1605259339000,"id":3,"isDdl":false,"mysqlType":{"c_id":"int","c_name":"varchar(20)","c_age":"int"},"old":null,"pkNames":["c_id"],"sql":"","sqlType":{"c_id":4,"c_name":12,"c_age":4},"table":"test_tab_3","ts":1605259339448,"type":"INSERT"}
    2020-11-13 17:22:25.672  INFO 9872 --- [ntainer#0-0-C-1] com.canal.demo.handler.MessageHandler    : 接受行数据:[{"c_id":"225","c_name":"abda"}]
    

      

  • 相关阅读:
    [leetcode] Combinations
    Binary Tree Level Order Traversal I II
    [leetcode] Remove Duplicates from Sorted Array I II
    [leetcode] Permutations II
    [leetcode] Permutations
    如何在线程间进行事件通知?
    如何实现迭代对象和迭代器对象?
    如何判断字符串a是否以字符串 b开头或者结尾?
    如何实现用户的历史记录功能(最多n条)?
    如何让字典保持有序?
  • 原文地址:https://www.cnblogs.com/chylcblog/p/13968910.html
Copyright © 2011-2022 走看看