  • canal-adapter: source changes for full and incremental sync to ES 7.* (Part 6)

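    Notes

    I don't know whether this works for every 7.* version; I am currently on 7.4. If the steps below don't work for you, it is best to download the source and debug it.

    To get the source, see https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0

    Supported sync endpoints

    Input side

    As I understand it, canal adapter is canal's own implementation for syncing incremental and full data to ES and to various databases. For now it supports only the following targets. The interface carries an SPI annotation, so we can add an extension if we need to output to some other target.

    Output side

    See the configuration below. From a rough read of the source, in tcp mode the incremental data comes from the binlog; it can also come from kafka or rocketMQ.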

    canal.conf:
      mode: tcp # or kafka / rocketMQ; with tcp the incremental data comes from the binlog, otherwise it comes from kafka or rocketMQ

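    Source changes for syncing to ES 7.*

    The client API differs between ES versions, and by default only 6.4 and below are supported. For later versions you have to swap in the matching ES client yourself and then fix the incompatible spots. Here I use ES 7.4 as the example.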

    /canal-canal-1.1.4/client-adapter/elasticsearch/pom.xml

    1. Modify the POM file to raise the client version

    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch</groupId>-->
    <!--            <artifactId>elasticsearch</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>transport</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>elasticsearch-rest-client</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
            <!--liqiangtodo version upgrade change-->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.4.0</version>
            </dependency>
            <!--I could not find a matching 7.4.0 version of transport, so I still use 6.4.3-->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.4.3</version>
            </dependency>

    2. Relax the mapping check

    com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping

      public MappingMetaData getMapping(String index, String type) {
            MappingMetaData mappingMetaData = null;
            if (mode == ESClientMode.TRANSPORT) {
                ImmutableOpenMap<String, MappingMetaData> mappings;
                try {
                    mappings = transportClient.admin()
                        .cluster()
                        .prepareState()
                        .execute()
                        .actionGet()
                        .getState()
                        .getMetaData()
                        .getIndices()
                        .get(index)
                        .getMappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                }
                mappingMetaData = mappings.get(type);
    
            } else {
                ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
                try {
                    GetMappingsRequest request = new GetMappingsRequest();
                    request.indices(index);
                    GetMappingsResponse response;
                    // try {
                    // response = restHighLevelClient
                    // .indices()
                    // .getMapping(request, RequestOptions.DEFAULT);
                    // // calling this API directly fails on versions below 6.4
                    // } catch (Exception e) {
                    // logger.warn("Low ElasticSearch version for getMapping");
                    response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                    // }
    
                    mappings = response.mappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return null;
                }
                mappingMetaData = mappings.get(index).get(type);
            
            }
            return mappingMetaData;
        }

    Change the part shown in red (in the original post) to:

        //liqiangtodo version upgrade change
        public MappingMetaData getMapping(String index, String type) {
            MappingMetaData mappingMetaData = null;
            if (mode == ESClientMode.TRANSPORT) {
                ImmutableOpenMap<String, MappingMetaData> mappings;
                try {
                    mappings = transportClient.admin()
                            .cluster()
                            .prepareState()
                            .execute()
                            .actionGet()
                            .getState()
                            .getMetaData()
                            .getIndices()
                            .get(index)
                            .getMappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                }
                mappingMetaData = mappings.get(type);
    
            } else {
                ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
                try {
                    GetMappingsRequest request = new GetMappingsRequest();
                    request.indices(index);
                    GetMappingsResponse response;
                    response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                    mappings = response.mappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return null;
                }
              //  mappingMetaData = mappings.get(index).get(type);
                //liqiangtodo version upgrade change
                mappingMetaData = mappings.get(index).get("properties");
            }
            return mappingMetaData;
        }

    3. Bulk version-compatibility change

    com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk

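    The part shown in red in the original post is my change. The refresh policy is set so that writes are visible immediately: after a user places an order and it is synced to ES, it has to show up in the order list right away.

    public BulkResponse bulk() {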
                BulkResponse bulkResponse=null;
                if (mode == ESClientMode.TRANSPORT) {
                    bulkResponse=bulkRequestBuilder.execute().actionGet();
                } else {
                    try {
                        // refresh policy: make writes visible immediately; see https://blog.csdn.net/hanchao5272/article/details/89151166
                        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                        //liqiangtodo version upgrade change
                        bulkResponse =restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
                       // return restHighLevelClient.bulk(bulkRequest);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return bulkResponse;
    
            }

    4. getEsType

    com.alibaba.otter.canal.client.adapter.es.support.ESTemplate#getEsType

    /**
         * Get the type of a field from the ES mapping
         *
         * @param mapping   mapping configuration
         * @param fieldName field name
         * @return the type
         */
        @SuppressWarnings("unchecked")
        private String getEsType(ESMapping mapping, String fieldName) {
            String key = mapping.get_index() + "-" + mapping.get_type();
            Map<String, String> fieldType = esFieldTypes.get(key);
            if (fieldType != null) {
                return fieldType.get(fieldName);
            } else {
                MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index(), mapping.get_type());
    
                if (mappingMetaData == null) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
                }
    
                fieldType = new LinkedHashMap<>();
    
                //liqiangtodo version upgrade change
                Map<String, Object> esMapping =mappingMetaData.getSourceAsMap();
    //            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
    //            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
                for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
                    Map<String, Object> value = (Map<String, Object>) entry.getValue();
                    if (value.containsKey("properties")) {
                        fieldType.put(entry.getKey(), "object");
                    } else {
                        fieldType.put(entry.getKey(), (String) value.get("type"));
                    }
                }
                esFieldTypes.put(key, fieldType);
    
                return fieldType.get(fieldName);
            }
        }
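
    The loop in getEsType can be tried out without an ES cluster. Below is a minimal sketch that runs the same flattening over a plain java.util.Map standing in for the mapping source. The class name FieldTypeSketch and the method fieldTypes are made up for illustration and are not part of canal; the input shape (field name to a map containing either "type" or a nested "properties") is an assumption taken from the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of canal: flattens a mapping "properties" map into field -> type. */
final class FieldTypeSketch {

    @SuppressWarnings("unchecked")
    static Map<String, String> fieldTypes(Map<String, Object> properties) {
        Map<String, String> fieldType = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Map<String, Object> value = (Map<String, Object>) entry.getValue();
            if (value.containsKey("properties")) {
                // A field with nested properties is recorded as "object", as in getEsType.
                fieldType.put(entry.getKey(), "object");
            } else {
                // A leaf field is recorded with its declared type, e.g. "long" or "keyword".
                fieldType.put(entry.getKey(), (String) value.get("type"));
            }
        }
        return fieldType;
    }
}
```

    If the map passed in still has a top-level "properties" key (the shape the commented-out lines above unwrap), the cast of the entry value still succeeds, but the result is a single "properties" entry instead of the real fields, so it is worth checking which shape your client version returns.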

    5. count

    com.alibaba.otter.canal.client.adapter.es.ESAdapter#count

     @Override
        public Map<String, Object> count(String task) {
            ESSyncConfig config = esSyncConfig.get(task);
            ESMapping mapping = config.getEsMapping();
            SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index(), mapping.get_type()).size(0)
                .getResponse();
    
            //liqiangtodo version upgrade change
            //long rowCount = response.getHits().getTotalHits();
            TotalHits totalHits= response.getHits().getTotalHits();
            Map<String, Object> res = new LinkedHashMap<>();
            res.put("esIndex", mapping.get_index());
            res.put("count", totalHits.value);
            return res;
        }
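
    One thing to keep in mind with this change: in ES 7 the total is an object with a value and a relation, and as far as I know the value is only a lower bound when the relation is GREATER_THAN_OR_EQUAL_TO (by default totals are tracked exactly only up to 10,000 hits unless track_total_hits is set on the request). The sketch below shows one way the result map could also report whether the count is exact. CountSketch, Relation and the "exact" key are hypothetical stand-ins written for this example; they are not canal or Elasticsearch classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch, not canal code: builds the count result with an extra exactness flag. */
final class CountSketch {

    /** Stand-in for the relation carried next to the total hit value in ES 7. */
    enum Relation { EQUAL_TO, GREATER_THAN_OR_EQUAL_TO }

    static Map<String, Object> countResult(String esIndex, long value, Relation relation) {
        Map<String, Object> res = new LinkedHashMap<>();
        res.put("esIndex", esIndex);
        res.put("count", value);
        // Extra key, not in the original method: false means "count" is only a lower bound.
        res.put("exact", relation == Relation.EQUAL_TO);
        return res;
    }
}
```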

    6. Change the SPI key of the ES adapter

    client-adapter/elasticsearch/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

    #es=com.alibaba.otter.canal.client.adapter.es.ESAdapter
    es7.4.0=com.alibaba.otter.canal.client.adapter.es.ESAdapter
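
    The file above is a list of key=class lines, and the key is what selects the adapter. The sketch below is not canal's actual extension loader; it is a small stand-alone parser, with the made-up name SpiKeySketch, that shows the effect of the edit: the commented-out es line disappears and only es7.4.0 resolves to ESAdapter. Presumably the adapter name in the adapter configuration then has to be es7.4.0 as well, but that is my assumption and the article does not show it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch, not canal's loader: parses "key=className" lines of an SPI file. */
final class SpiKeySketch {

    static Map<String, String> parse(String content) {
        Map<String, String> adapters = new LinkedHashMap<>();
        for (String raw : content.split("\n")) {
            String line = raw.trim();
            // Skip blank lines and lines commented out with '#'.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // not a key=value line
            }
            adapters.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return adapters;
    }
}
```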

    Fix for binlog messages not being consumed

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#CanalAdapterWorker

       public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                                  List<List<OuterAdapter>> canalOuterAdapters){
            super(canalOuterAdapters);
            this.canalClientConfig = canalClientConfig;
            this.canalDestination = canalDestination;
            //liqiangtodo fix for messages not being consumed
            connector = CanalConnectors.newClusterConnector(Arrays.asList(address), canalDestination, "", "");
            //connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
        }

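    Configuration for subscribing to specific tables

    This keeps the adapter from receiving changes to other tables in the instance.

    Add a field to com.alibaba.otter.canal.client.adapter.support.CanalClientConfig:

       //liqiangtodo the filter canal subscribes with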
        private String subscribe;

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#process

    ...... 
     // acquire the lock; it is initialized in com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch.init, which reads from zk
                    /**
                     * /canal-adapter/sync-switch/{canalDestination}: on means the lock can be acquired, off means block
                     */
                    syncSwitch.get(canalDestination);
    
                    logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                    connector.connect();
                    logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                    //liqiangtodo fix for messages not being consumed: subscribe only to what we care about
                    if(StringUtils.isEmpty(canalClientConfig.getSubscribe())){
                        connector.subscribe("*.*");
                    }else {
                        connector.subscribe(canalClientConfig.getSubscribe());
                    }
                    //connector.subscribe();
                    logger.info("=============> Subscribe destination: {},subscribe:{} succeed <=============",this.canalDestination,StringUtils.isEmpty(canalClientConfig.getSubscribe())?"*.*":canalClientConfig.getSubscribe());
    ......

    Usage

    canal.conf:
      subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my own source change, to avoid subscribing to messages we don't care about
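
    The fallback in process() boils down to "use the configured filter, or *.* when none is set". Here is a minimal stand-alone sketch of that rule, plus a helper that splits the comma-separated value into its entries for logging or inspection. SubscribeSketch, resolve and entries are names invented for this example, not canal APIs.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch, not canal code: picks the filter string passed to connector.subscribe(...). */
final class SubscribeSketch {

    static final String ALL_TABLES = "*.*";

    /** Returns the configured filter, or "*.*" when it is null or empty. */
    static String resolve(String configured) {
        if (configured == null || configured.isEmpty()) {
            return ALL_TABLES;
        }
        return configured;
    }

    /** Splits a comma-separated filter such as "db.t1,db.t2" into its entries. */
    static List<String> entries(String filter) {
        return Arrays.asList(filter.split(","));
    }
}
```

    With the canal.conf value above, resolve returns the string unchanged and entries yields the four schema.table names.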

    Null pointer on primitive wrapper types

    For example, an Integer field whose value in the database is null.

    com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil#typeConvert

     /**
         * 类型转换为Mapping中对应的类型
         */
        public static Object typeConvert(Object val, String esType) {
            if (val == null) {
                return null;
            }
            if (esType == null) {
                return val;
            }
            //liqiangtodo added code
            // avoids the conversion exception 'For input string: ""' on empty values
            if(StringUtil.isNullOrEmpty(val.toString())){
                return null;
            }
     ......
    }
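
    To see what the added guard buys, here is a cut-down sketch of the same idea using only the JDK. It handles just two numeric types, and everything after the guard is my own simplification rather than the elided body of ESSyncUtil#typeConvert. TypeConvertSketch is a made-up name.

```java
/** Hypothetical sketch, not the real ESSyncUtil: shows the null/empty guard before numeric parsing. */
final class TypeConvertSketch {

    static Object typeConvert(Object val, String esType) {
        if (val == null) {
            return null;
        }
        if (esType == null) {
            return val;
        }
        // The added guard: an empty string would otherwise reach valueOf and throw
        // NumberFormatException: For input string: ""
        if (val.toString().isEmpty()) {
            return null;
        }
        switch (esType) {
            case "integer":
                return Integer.valueOf(val.toString());
            case "long":
                return Long.valueOf(val.toString());
            default:
                return val; // other types are passed through in this sketch
        }
    }
}
```

    Because the guard sits before the type-specific branches, an empty string is turned into null for every ES type, not only for numeric ones.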
  • Original article: https://www.cnblogs.com/LQBlog/p/14661238.html