zoukankan      html  css  js  c++  java
  • canal-adapter-进行全量和增量到ES7.*(七)

    下载

    参考:https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0

    源码修改参考:https://www.cnblogs.com/LQBlog/p/14661238.html

    zk分布式锁

    get /canal-adapter/sync-switch/{canalDestination} on为可获取锁 off为阻塞

    配置文件修改

    server:
      port: 8085
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    canal.conf:
      mode: tcp # kafka rocketMQ 数据来源TCP则是binlog 其他则增量数据来源是kafka和rocketMQ
      #canalServerHost: 192.168.20.5:11111              #canal 单机地址和端口
      zookeeperHosts: 192.168.20.4:2181 #基于zookeeper集群
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item #我自己改源码加的防止订阅不关心的消息
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true      #数据库
          username: kuaihe
          password: Kuaihe0910Mysql
      canalAdapters:
        - instance: kuaihe_db_test            # canal instance Name or mq topic name  对应canal创建的文件夹 如果是canal则是订阅指定canal-server instance 配置的的binlog
          groups:
            - groupId: g1
              outerAdapters:
                - name: es7.4.0    #适配器名称 SPI扩展点 可参考此配置 对应的key com.alibaba.otter.canal.client.adapter.OuterAdapter
                  hosts: 127.0.0.1.1:9200                     # es 集群地址, 逗号分隔
                  properties:
                    mode: rest # transport # or rest
                    security:
                      auth: elastic:T3StdABAk #es用户名密码

    增加es文件夹

    我是基于源码自己打包 所以 直接在源码修改打包 启动会自动扫描es 目录下的所有yml

     pro_brand.yml例子 我的是单表同步 其他复杂同步 可以看官方文档

    dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
    #outerAdapterKey: exampleKey     # 对应application.yml中es配置的key
    destination: kuaihe_db_test            # key=value_+_defaultDS对应库名字+_sql对应表  收到canal的binlog增量消息 则通过当前binlog消息的destination+数据库+表 找到当前配置com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(int, java.lang.Long, java.util.concurrent.TimeUnit)
    groupId: g1                       # 对应MQ模式下的groupId, 只会同步对应groupId的数据
    esMapping:
      _index: pro_brand
      _type: _doc
      _id: id
      pk: id
      sql: "
    select id,create_timestamp,update_timestamp,`code`,brand_code,brand_mall_idx,delete_flag,depot_id,description,idx,kuaihe_idx,logo,`name`,name_en,prefix,sale_channel,seo_description,seo_keywords,seo_title,`status`,vendor_id from pro_brand
      "
      commitBatch: 3000

    全量

    导的时候记得看日志 有没有报错

    post:http://192.168.20.5:8085/etl/es7.4.0/soa_ord_order_item.yml

    {
        "succeeded": true,
        "resultMessage": "导入ES 数据:61513 条"
    }

    增量

    增量是启动自动触发binlog订阅 可参考源码

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader

    ->启动worker

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker

    根据条件增量

    我自己改了源码更累灵活

    post http://127.0.0.1:8087/etl/es7.4.0/soa_ord_order.yml?params=683071930737168384;2021-04-29 00:00:12;2021-04-29 23:55:12

    {
        "condition":"where user_id={} and create_timestamp>={} and create_timestamp<={}"
    }

    改动点1 com.alibaba.otter.canal.adapter.launcher.rest.CommonRest#etl

       @PostMapping("/etl/{type}/{key}/{task}")
        public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                             @RequestParam(name = "params", required = false) String params,@RequestBody  Map<String,String>  mapParameters)
      try {
                    List<String> paramArray = null;
                    if (params != null) {
                        paramArray = Arrays.asList(params.trim().split(";"));
                    }
                    String condition=mapParameters==null?null:mapParameters.get("condition");
                    return adapter.etl(task,condition, paramArray);
                } finally {
                    if (destination != null && oriSwitchStatus) {
                        syncSwitch.on(destination);
                    } else if (destination == null && oriSwitchStatus) {
                        syncSwitch.on(task);
                    }
                }

    改动点2

    com.alibaba.otter.canal.client.adapter.es.ESAdapter#etl

     @Override
        public EtlResult etl(String task,String condition, List<String> params) {
            EtlResult etlResult = new EtlResult();
            ESSyncConfig config = esSyncConfig.get(task);
            if (config != null) {
               .........
                if (dataSource != null) {
                    return esEtlService.importData(params,condition);
                } 
              .........
            } else {
                     .........
                        EtlResult etlRes = esEtlService.importData(params,condition);
                     .......... 
                    
                }
                
            etlResult.setSucceeded(false);
            etlResult.setErrorMessage("Task not found");
            return etlResult;
        }

    改动点3:com.alibaba.otter.canal.client.adapter.support.AbstractEtlService#importData

        DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
    
                List<Object> values = new ArrayList<>();
                //liqiangtodo etl支持动态传入condition改动
                condition=condition==null?config.getMapping().getEtlCondition():condition;
                // 拼接条件
                if (condition != null && params != null) {
                    String etlCondition =condition;
                    for (String param : params) {
                        etlCondition = etlCondition.replace("{}", "?");
                        values.add(param);
                    }
                    sql += " " + etlCondition;
                }
    
                if (logger.isDebugEnabled()) {
                    logger.debug("etl sql : {}", sql);
                }
    
                // 获取总数
                String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";

    动态更新配置文件

    依赖2表canal_config(存储application配置),canal_adapter_config(存储es目录下的配置) 获取sql脚本

    参考:https://www.cnblogs.com/LQBlog/p/14598582.html 

    配置application.properties

    修改的话记得修改修改时间

    INSERT INTO `merge_test`.`canal_config`(`id`, `cluster_id`, `server_id`, `name`, `status`, `content`, `content_md5`, `modified_time`) VALUES (2, NULL, NULL, 'application.yml', NULL, 'server:
      port: 8085
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    canal.conf:
      mode: tcp # kafka rocketMQ
      #canalServerHost: 192.168.20.5:11111              #canal 地址和端口
      zookeeperHosts: 192.168.20.4:2181 #基于zookeeper集群
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item #我自己改源码加的防止订阅不关心的消息
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://rm-2zeqc826391dt0lk2.mysql.rds.aliyuncs.com:3306/merge_test?useUnicode=true      #数据库
          username: kuaihe
          password: Kuaihe0910Mysql
      canalAdapters:
        - instance: kuaihe_db_test            # canal instance Name or mq topic name  对应canal创建的文件夹
          groups:
            - groupId: g1
              outerAdapters:
                - name: es7.4.0    #适配器名称
                  hosts: es-cn-st21taqhf0003bw9m.elasticsearch.aliyuncs.com:9200                     # es 集群地址, 逗号分隔
                  properties:
                    mode: rest # transport # or rest
                    security:
                      auth: elastic:T3StdABAk #es用户名密码
    ', '', '2021-04-14 15:09:07');

     id一定要为2 详情可看源码com.alibaba.otter.canal.adapter.launcher.monitor.remote.DbRemoteConfigLoader#getRemoteAdapterConfig

        /**
         * 获取远程application.yml配置
         *
         * @return 配置对象
         */
        private ConfigItem getRemoteAdapterConfig() {
            String sql = "select name, content, modified_time from canal_config where id=2";
            try (Connection conn = dataSource.getConnection();
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(sql)) {
                if (rs.next()) {
                    ConfigItem configItem = new ConfigItem();
                    configItem.setId(2L);
                    configItem.setName(rs.getString("name"));
                    configItem.setContent(rs.getString("content"));
                    configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
                    return configItem;
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return null;
        }

    配置canal_adapter_config

     canal_adapter_config

    category为目录路径 name为文件名字 修改 之后 记得修改以下修改时间

    修改bootstrap.yml 增加读取配置的数据库

    canal: #基于数据库
      manager:
        jdbc:
          url: jdbc:127.0.0.13306/merge_test?useUnicode=true
          username: root
          password: 123456

    原理

    启动 会读取这2个表的配置然后刷新到项目文件 项目打了@RefreceScore注解  同时会启动一个线程监听改动

  • 相关阅读:
    ios 分享腾讯微博
    ios 分享微信
    elinput 价格校验 大于0保留2位小数,不包含01,01.1这种
    从范闲到许乐,从宁缺再到陈长生
    二进制流 ajax 实现图片上传
    php 操作redis 部分命令
    matlab 与vs2008联合编程的设置备忘
    新型机器人闯入职场:到底是工作缔造者还是工作终结者?
    c++ vs2008 多线程编程的样例
    虚拟试衣创业公司Fitiquette被印度电商Myntra收购
  • 原文地址:https://www.cnblogs.com/LQBlog/p/14661570.html
Copyright © 2011-2022 走看看