  • canal-adapter: source changes for full and incremental sync to ES 7.* (Part 6)

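    Notes

    I do not know whether this works for every 7.* version. I am on 7.4. If the steps below do not work for you, the best option is to download the source and debug it.

    For how to get the source, see https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0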

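    Supported sync targets

    Input side

    As I understand it, canal adapter is canal's own implementation for syncing data, both incrementally and in full, to ES and to various databases. For now it supports only the following few targets. You can see that the interface carries an SPI annotation, so we can extend it if we need to output to some other target.

    Output side

    See the configuration below. From a rough read of the source, in tcp mode the incremental data comes from the binlog; the source can also be kafka or rocketMQ.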

    canal.conf:
      mode: tcp # or kafka / rocketMQ; with tcp the incremental data comes from the binlog, otherwise from kafka or rocketMQ

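    Source changes for syncing to ES 7.*

    The client API differs between ES versions, and by default the adapter only supports versions up to 6.4. For later versions you have to replace the ES client yourself and then fix the places that are no longer compatible. Here I use ES 7.4 as the example.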

    /canal-canal-1.1.4/client-adapter/elasticsearch/pom.xml

    1. Modify the POM file to raise the client version

    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch</groupId>-->
    <!--            <artifactId>elasticsearch</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>transport</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>elasticsearch-rest-client</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
    <!--        <dependency>-->
    <!--            <groupId>org.elasticsearch.client</groupId>-->
    <!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
    <!--            <version>6.4.3</version>-->
    <!--        </dependency>-->
            <!--liqiangtodo version upgrade change-->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.4.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.4.0</version>
            </dependency>
            <!--I could not find a matching 7.4.0 transport artifact, so I am still using 6.4.3-->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.4.3</version>
            </dependency>
            <dependency>

    2. Relax the mapping check

    com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping

      public MappingMetaData getMapping(String index, String type) {
            MappingMetaData mappingMetaData = null;
            if (mode == ESClientMode.TRANSPORT) {
                ImmutableOpenMap<String, MappingMetaData> mappings;
                try {
                    mappings = transportClient.admin()
                        .cluster()
                        .prepareState()
                        .execute()
                        .actionGet()
                        .getState()
                        .getMetaData()
                        .getIndices()
                        .get(index)
                        .getMappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                }
                mappingMetaData = mappings.get(type);
    
            } else {
                ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
                try {
                    GetMappingsRequest request = new GetMappingsRequest();
                    request.indices(index);
                    GetMappingsResponse response;
                    // try {
                    // response = restHighLevelClient
                    // .indices()
                    // .getMapping(request, RequestOptions.DEFAULT);
                    // // calling this API directly fails on versions below 6.4
                    // } catch (Exception e) {
                    // logger.warn("Low ElasticSearch version for getMapping");
                    response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                    // }
    
                    mappings = response.mappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return null;
                }
                mappingMetaData = mappings.get(index).get(type);
            
            }
            return mappingMetaData;
        }

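    Change it to the following (the changed lines are marked liqiangtodo; the original post highlighted them in red):

        //liqiangtodo version upgrade change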
        public MappingMetaData getMapping(String index, String type) {
            MappingMetaData mappingMetaData = null;
            if (mode == ESClientMode.TRANSPORT) {
                ImmutableOpenMap<String, MappingMetaData> mappings;
                try {
                    mappings = transportClient.admin()
                            .cluster()
                            .prepareState()
                            .execute()
                            .actionGet()
                            .getState()
                            .getMetaData()
                            .getIndices()
                            .get(index)
                            .getMappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                }
                mappingMetaData = mappings.get(type);
    
            } else {
                ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
                try {
                    GetMappingsRequest request = new GetMappingsRequest();
                    request.indices(index);
                    GetMappingsResponse response;
                    response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
                    mappings = response.mappings();
                } catch (NullPointerException e) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + index);
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                    return null;
                }
              //  mappingMetaData = mappings.get(index).get(type);
                //liqiangtodo version upgrade change
                mappingMetaData = mappings.get(index).get("properties");
            }
            return mappingMetaData;
        }

    3. bulk version-compatibility change

    com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk

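    The lines marked liqiangtodo (red in the original post) are my changes. I also set the refresh policy because I need writes to be visible immediately: after a user places an order and it is synced to ES, the order must show up in the order list right away. Note that IMMEDIATE forces a refresh after every bulk request, so it trades indexing throughput for visibility.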

            public BulkResponse bulk() {
                BulkResponse bulkResponse = null;
                if (mode == ESClientMode.TRANSPORT) {
                    bulkResponse = bulkRequestBuilder.execute().actionGet();
                } else {
                    try {
                        // Refresh policy: visible immediately. See https://blog.csdn.net/hanchao5272/article/details/89151166
                        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                        //liqiangtodo version upgrade change
                        bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                        // return restHighLevelClient.bulk(bulkRequest);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return bulkResponse;
    
            }

    4. getEsType

    com.alibaba.otter.canal.client.adapter.es.support.ESTemplate#getEsType

        /**
         * Get the type of a field from the es mapping
         *
         * @param mapping   mapping configuration
         * @param fieldName field name
         * @return the field type
         */
        @SuppressWarnings("unchecked")
        private String getEsType(ESMapping mapping, String fieldName) {
            String key = mapping.get_index() + "-" + mapping.get_type();
            Map<String, String> fieldType = esFieldTypes.get(key);
            if (fieldType != null) {
                return fieldType.get(fieldName);
            } else {
                MappingMetaData mappingMetaData = esConnection.getMapping(mapping.get_index(), mapping.get_type());
    
                if (mappingMetaData == null) {
                    throw new IllegalArgumentException("Not found the mapping info of index: " + mapping.get_index());
                }
    
                fieldType = new LinkedHashMap<>();
    
                //liqiangtodo version upgrade change
                Map<String, Object> esMapping = mappingMetaData.getSourceAsMap();
    //            Map<String, Object> sourceMap = mappingMetaData.getSourceAsMap();
    //            Map<String, Object> esMapping = (Map<String, Object>) sourceMap.get("properties");
                for (Map.Entry<String, Object> entry : esMapping.entrySet()) {
                    Map<String, Object> value = (Map<String, Object>) entry.getValue();
                    if (value.containsKey("properties")) {
                        fieldType.put(entry.getKey(), "object");
                    } else {
                        fieldType.put(entry.getKey(), (String) value.get("type"));
                    }
                }
                esFieldTypes.put(key, fieldType);
    
                return fieldType.get(fieldName);
            }
        }
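
    The loop above expects a map from field name to field definition and flattens it into field name to type. A minimal stand-alone sketch of that flattening, using plain maps in place of MappingMetaData; the class name FieldTypeSketch and the sample fields are hypothetical, chosen only for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone sketch of the loop in getEsType; no Elasticsearch classes are involved. */
class FieldTypeSketch {

    /**
     * Flattens a "properties" map (field name -> field definition) into
     * field name -> type. A field that carries its own "properties" is
     * reported as "object", as in the adapter code.
     */
    @SuppressWarnings("unchecked")
    static Map<String, String> extractFieldTypes(Map<String, Object> properties) {
        Map<String, String> fieldTypes = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Map<String, Object> definition = (Map<String, Object>) entry.getValue();
            if (definition.containsKey("properties")) {
                fieldTypes.put(entry.getKey(), "object");
            } else {
                fieldTypes.put(entry.getKey(), (String) definition.get("type"));
            }
        }
        return fieldTypes;
    }

    public static void main(String[] args) {
        // Hypothetical mapping: two scalar fields and one nested object.
        Map<String, Object> id = new LinkedHashMap<>();
        id.put("type", "long");
        Map<String, Object> name = new LinkedHashMap<>();
        name.put("type", "keyword");
        Map<String, Object> buyer = new LinkedHashMap<>();
        buyer.put("properties", new LinkedHashMap<String, Object>());

        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("id", id);
        properties.put("name", name);
        properties.put("buyer", buyer);

        System.out.println(extractFieldTypes(properties));
    }
}
```

    If an entry in the map is not itself a map, the cast fails with a ClassCastException, so this only works when the map handed in really is the properties level of the mapping.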

    5. count

    com.alibaba.otter.canal.client.adapter.es.ESAdapter#count

     @Override
        public Map<String, Object> count(String task) {
            ESSyncConfig config = esSyncConfig.get(task);
            ESMapping mapping = config.getEsMapping();
            SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index(), mapping.get_type()).size(0)
                .getResponse();
    
            //liqiangtodo version upgrade change
            //long rowCount = response.getHits().getTotalHits();
            TotalHits totalHits= response.getHits().getTotalHits();
            Map<String, Object> res = new LinkedHashMap<>();
            res.put("esIndex", mapping.get_index());
            res.put("count", totalHits.value);
            return res;
        }
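
    In the 7.x client, getTotalHits() returns a TotalHits object with a value and a relation instead of a plain long. By default ES 7 only counts hits exactly up to 10,000, so value may be a lower bound unless the request enables track_total_hits; I have not checked whether ESSearchRequest does. The sketch below uses a hypothetical stand-in class (Total) that mirrors the two public fields of Lucene's TotalHits, just to show how the relation could be taken into account:

```java
/** Stand-in for the two public fields of Lucene's TotalHits; not the real class. */
class TotalHitsSketch {

    enum Relation { EQUAL_TO, GREATER_THAN_OR_EQUAL_TO }

    /** Hypothetical mirror of TotalHits: a count plus whether that count is exact. */
    static final class Total {
        final long value;
        final Relation relation;

        Total(long value, Relation relation) {
            this.value = value;
            this.relation = relation;
        }
    }

    /** Renders the count, marking it with "+" when it is only a lower bound. */
    static String describe(Total total) {
        return total.relation == Relation.EQUAL_TO
            ? String.valueOf(total.value)
            : total.value + "+";
    }

    public static void main(String[] args) {
        System.out.println(describe(new Total(42L, Relation.EQUAL_TO)));
        System.out.println(describe(new Total(10000L, Relation.GREATER_THAN_OR_EQUAL_TO)));
    }
}
```

    For the count endpoint this matters only on indices with more than 10,000 documents; below that the value is exact.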

    6. Change the SPI key of the es adapter

    client-adapter/elasticsearch/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

    #es=com.alibaba.otter.canal.client.adapter.es.ESAdapter
    es7.4.0=com.alibaba.otter.canal.client.adapter.es.ESAdapter
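
    The SPI file is a list of key=class lines, and the adapter is looked up by key. The sketch below is a simplified stand-in for that lookup built on java.util.Properties; it is not canal's actual loader, and the class name SpiKeySketch is hypothetical:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Simplified stand-in for a keyed SPI lookup; not canal's real extension loader. */
class SpiKeySketch {

    /** Parses key=class lines, the same shape as the OuterAdapter SPI file. */
    static Properties parse(String content) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mappings;
    }

    /** Returns the class name registered under the adapter name, or fails loudly. */
    static String resolve(Properties mappings, String adapterName) {
        String className = mappings.getProperty(adapterName);
        if (className == null) {
            throw new IllegalArgumentException("No adapter registered under key: " + adapterName);
        }
        return className;
    }

    public static void main(String[] args) {
        String content = "#es=com.alibaba.otter.canal.client.adapter.es.ESAdapter\n"
            + "es7.4.0=com.alibaba.otter.canal.client.adapter.es.ESAdapter\n";
        Properties mappings = parse(content);
        System.out.println(resolve(mappings, "es7.4.0"));
        // resolve(mappings, "es") would throw here, because that line is commented out.
    }
}
```

    The practical consequence is that any configuration referring to the adapter by name has to use the new key. I assume this is the name entry under outerAdapters in the adapter's application.yml, which would become es7.4.0.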

    Fix for binlog messages not being consumed

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#CanalAdapterWorker

       public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                                  List<List<OuterAdapter>> canalOuterAdapters){
            super(canalOuterAdapters);
            this.canalClientConfig = canalClientConfig;
            this.canalDestination = canalDestination;
            //liqiangtodo fix for messages not being consumed
            connector = CanalConnectors.newClusterConnector(Arrays.asList(address), canalDestination, "", "");
            //connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
        }

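    Configuring subscription to specific tables

    This prevents the adapter from receiving changes to other tables in the instance.

    Add a field to com.alibaba.otter.canal.client.adapter.support.CanalClientConfig:

       //liqiangtodo the messages canal subscribes to
        private String subscribe;
        // plus the matching getSubscribe()/setSubscribe() accessors, which the code below relies on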

    com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker#process

    ...... 
     // Acquire the lock; it is initialised in com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch.init, which reads zk
                    /**
                     * /canal-adapter/sync-switch/{canalDestination}: on means the lock can be acquired, off means block
                     */
                    syncSwitch.get(canalDestination);
    
                    logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                    connector.connect();
                    logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                    //liqiangtodo fix for messages not being consumed: subscribe only to what we care about
                    if(StringUtils.isEmpty(canalClientConfig.getSubscribe())){
                        connector.subscribe("*.*");
                    }else {
                        connector.subscribe(canalClientConfig.getSubscribe());
                    }
                    //connector.subscribe();
                    logger.info("=============> Subscribe destination: {},subscribe:{} succeed <=============",this.canalDestination,StringUtils.isEmpty(canalClientConfig.getSubscribe())?"*.*":canalClientConfig.getSubscribe());
    ......

    Usage

    canal.conf:
      subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my source change, to avoid subscribing to messages I do not care about
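
    The fallback logic from the process change can be isolated into a small helper. This is a minimal sketch with no canal classes; the class and method names (SubscribeFilterSketch, resolveFilter, tables) are hypothetical, and the *.* catch-all is copied from the change above rather than checked against canal's filter syntax:

```java
/** Stand-alone sketch of the subscribe fallback; no canal classes are involved. */
class SubscribeFilterSketch {

    /** Catch-all used when nothing is configured (value taken from the change above). */
    static final String SUBSCRIBE_ALL = "*.*";

    /** Returns the configured filter, or the catch-all when it is null or empty. */
    static String resolveFilter(String configured) {
        if (configured == null || configured.isEmpty()) {
            return SUBSCRIBE_ALL;
        }
        return configured;
    }

    /** Splits a comma-separated filter into its schema.table entries. */
    static String[] tables(String filter) {
        return filter.split("\\s*,\\s*");
    }

    public static void main(String[] args) {
        String filter = resolveFilter("merge_test.pro_brand,merge_test.soa_ord_order");
        for (String table : tables(filter)) {
            System.out.println(table);
        }
        System.out.println(resolveFilter(null));
    }
}
```

    Resolving the filter once and reusing the result would also remove the duplicated isEmpty check in the log statement.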

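    Null pointer on basic types

    For example, an Integer field whose value in the database is null. The guard added below also covers empty strings.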

    com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil#typeConvert

     /**
         * Convert the value to the corresponding type in the mapping
         */
        public static Object typeConvert(Object val, String esType) {
            if (val == null) {
                return null;
            }
            if (esType == null) {
                return val;
            }
            //liqiangtodo added code
            // Guards against the conversion exception 'For input string: ""' on empty values
            if(StringUtil.isNullOrEmpty(val.toString())){
                return null;
            }
     ......
    }
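
    To see why the guard is needed, here is a simplified stand-in for typeConvert that handles only the integer and long types. It assumes the real method ends up parsing val.toString() for numeric types, which is what the 'For input string: ""' error suggests; the class name TypeConvertSketch is hypothetical:

```java
/** Simplified stand-in for typeConvert; only "integer" and "long" are handled. */
class TypeConvertSketch {

    /** Null and empty values short-circuit to null before any parsing happens. */
    static Object typeConvert(Object val, String esType) {
        if (val == null) {
            return null;
        }
        if (esType == null) {
            return val;
        }
        String text = val.toString();
        if (text.isEmpty()) {
            return null; // the added guard: nothing to parse
        }
        switch (esType) {
            case "integer":
                return Integer.parseInt(text);
            case "long":
                return Long.parseLong(text);
            default:
                return val;
        }
    }

    public static void main(String[] args) {
        System.out.println(typeConvert("42", "integer"));
        System.out.println(typeConvert("", "integer"));
        try {
            Integer.parseInt(""); // what happens without the guard
        } catch (NumberFormatException e) {
            System.out.println("without guard: " + e.getMessage());
        }
    }
}
```

    Returning null for an empty string means the field is written to ES as null rather than failing the whole sync, which may or may not be what you want for text fields.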
  • Original post: https://www.cnblogs.com/LQBlog/p/14661238.html