zoukankan      html  css  js  c++  java
  • spring cloud集成canal

    前提

    win运行canal

    加入canal依赖

    1 <dependency>
    2     <groupId>com.alibaba.otter</groupId>
    3     <artifactId>canal.client</artifactId>
    4     <version>1.1.3</version>
    5 </dependency>

    把ip、端口、监听表名做成配置文件

    代码实现

      1 package com.frame.modules.dabis.archives.thread;
      2 
      3 import com.alibaba.fastjson.JSONObject;
      4 import com.alibaba.otter.canal.client.CanalConnector;
      5 import com.alibaba.otter.canal.client.CanalConnectors;
      6 import com.alibaba.otter.canal.protocol.CanalEntry;
      7 import com.alibaba.otter.canal.protocol.Message;
      8 import com.frame.solr.em.SolrCode;
      9 import com.frame.utils.PropertiesLoader;
     10 import org.apache.commons.logging.Log;
     11 import org.apache.commons.logging.LogFactory;
     12 
     13 import java.net.InetSocketAddress;
     14 import java.util.HashMap;
     15 import java.util.List;
     16 import java.util.Map;
     17 
     18 /**
     19  * @author liwei
     20  * @date 2019/8/2 14:39
     21  * @desc Created with IntelliJ IDEA.
     22  */
     23 public class CanalThread implements Runnable {
     24 
     25     Log log = LogFactory.getLog(CanalThread.class);
     26 
     27     private String solrName = SolrCode.ARCHIVES.getValue();
     28 
     29 
     30     @Override
     31     public void run() {
     32         PropertiesLoader loader = new PropertiesLoader("solrConfig.properties");
     33         listener(loader.getProperty("canalHost"), loader.getProperty("canalPort"), loader.getProperty("canalTable"));
     34     }
     35 
     36 
     37     public void listener(String canalHost, String canalPort, String table) {
     38         // 创建链接
     39         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, Integer.valueOf(canalPort)), "example", "", "");
     40         int batchSize = 1000;
     41         try {
     42             // 连接
     43             connector.connect();
     44             // 监听表
     45             connector.subscribe(table);
     46             connector.rollback();
     47             // 一直循环监听
     48             while (true) {
     49                 // 获取指定数量的数据
     50                 Message message = connector.getWithoutAck(batchSize);
     51                 long batchId = message.getId();
     52                 if(-1 != batchId && 0 != message.getEntries().size()) {
     53                     printEntry(message.getEntries());
     54                 }
     55                 // 提交确认
     56                 connector.ack(batchId);
     57             }
     58         } finally {
     59             connector.disconnect();
     60         }
     61     }
     62 
     63     /**
     64      * 打印具体变化
     65      * @param entrys
     66      */
     67     private void printEntry(List<CanalEntry.Entry> entrys) {
     68         for (CanalEntry.Entry entry : entrys) {
     69             if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
     70                 continue;
     71             }
     72 
     73             CanalEntry.RowChange rowChage = null;
     74             try {
     75                 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
     76             } catch (Exception e) {
     77                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
     78                         e);
     79             }
     80 
     81             CanalEntry.EventType eventType = rowChage.getEventType();
     82             System.out.println(String.format("================> binlog[%s:%s] , 数据库:%s,表名%s , 类型: %s",
     83                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
     84                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
     85                     eventType));
     86 
     87             for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
     88                 if (eventType == CanalEntry.EventType.DELETE) {
     89                     printColumn(rowData.getBeforeColumnsList());
     90                 } else if (eventType == CanalEntry.EventType.INSERT) {
     91                     printColumn(rowData.getAfterColumnsList());
     92                 } else {
     93                     System.out.println("-------修改之前");
     94                     printColumn(rowData.getBeforeColumnsList());
     95                     System.out.println("-------修改之后");
     96                     printColumn(rowData.getAfterColumnsList());
     97                 }
     98             }
     99         }
    100     }
    101 
    102     private void printColumn(List<CanalEntry.Column> columns) {
    103         Map<String,Object> aaMap = new HashMap<>();
    104         for (CanalEntry.Column column : columns) {
    105             aaMap.put(column.getName(), column.getValue());
    106         }
    107         System.out.println( new JSONObject(aaMap).toJSONString());
    108     }
    109 }

    启动线程

    新增

     

    修改

     

     

    删除

     

     

    注意:拿到的值都是字符串,建议拿到id反查数据库,拿到对象再同步到自己的缓存。

  • 相关阅读:
    将抓包工具证书从用户目录移动至系统目录,解决反爬对于本地证书认证(安卓7)
    《C++ concurrency in action》 读书笔记 -- Part 2 第三章 线程间的数据共享
    《C++ concurrency in action》 读书笔记 -- Part 3 第四章 线程的同步
    C++14 也快要来了
    《C++ concurrency in action》 读书笔记 -- Part 4 第五章 C++的多线程内存模型 (1)
    利用表达式树构建委托改善反射性能
    使用Task简化Silverlight调用Wcf(再续)
    逆变与协变详解
    Beginning Silverlight 4 in C#数据访问和网络
    使用Task简化Silverlight调用Wcf(续)
  • 原文地址:https://www.cnblogs.com/xiaostudy/p/11569750.html
Copyright © 2011-2022 走看看