zoukankan      html  css  js  c++  java
  • elasticsearch5.6.8 创建TransportClient工具类

    ProjectConfig:

    package com.ultiwill.utils;
    
    import com.ultiwill.entity.EsConfigEntity;
    import com.alibaba.fastjson.JSON;
    
    public class ProjectConfig {
    
    
        public static EsConfigEntity getESConf() {
            return (EsConfigEntity)JSON.parseObject(JSON.toJSONString(ConfigUtil.readYamlByPrefix("application.yaml", "es")), EsConfigEntity.class);
        }
    
        public static Object getCommonValue(String key) {
            return ConfigUtil.readYaml("application.yaml").get(key);
        }
    
    }
    View Code

    ConfigUtil:

    package com.ultiwill.utils;
    
    import org.yaml.snakeyaml.Yaml;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class ConfigUtil {
    
        private ConfigUtil() {
        }
    
        public static Map readYaml(String file) {
            if (!(new File(file)).exists()) {
                String projectDir = System.getProperty("user.dir");
                file = projectDir + File.separator + "conf" + File.separator +file;
            }
    
            try {
                return (Map)(new Yaml()).loadAs(new FileInputStream(file), HashMap.class);
            } catch (FileNotFoundException var2) {
                var2.printStackTrace();
                return null;
            }
        }
    
    
        public static Map<String, Object> readYamlByPrefix(String file, String prefix) {
            return (Map)readYaml(file).get(prefix);
        }
    
    }
    View Code

    EsConfigEntity:

    package com.ultiwill.entity;
    
    public class EsConfigEntity {
        private String clusterName;
        private String clusterHttpPort;
        private String clusterTcpPort;
        private String clusterNodes;
        private Long splitLimitNum;
        private Integer primaryShard;
        private Integer replicateShard;
        private String indexTranslogInterval;
        private String env;
    
        public EsConfigEntity() {
        }
    
        public String getClusterName() {
            return this.clusterName;
        }
    
        public String getClusterHttpPort() {
            return this.clusterHttpPort;
        }
    
        public String getClusterTcpPort() {
            return this.clusterTcpPort;
        }
    
        public String getClusterNodes() {
            return this.clusterNodes;
        }
    
        public Long getSplitLimitNum() {
            return this.splitLimitNum;
        }
    
        public Integer getPrimaryShard() {
            return this.primaryShard;
        }
    
        public Integer getReplicateShard() {
            return this.replicateShard;
        }
    
        public String getIndexTranslogInterval() {
            return this.indexTranslogInterval;
        }
    
        public String getEnv() {
            return this.env;
        }
    
        public void setClusterName(String clusterName) {
            this.clusterName = clusterName;
        }
    
        public void setClusterHttpPort(String clusterHttpPort) {
            this.clusterHttpPort = clusterHttpPort;
        }
    
        public void setClusterTcpPort(String clusterTcpPort) {
            this.clusterTcpPort = clusterTcpPort;
        }
    
        public void setClusterNodes(String clusterNodes) {
            this.clusterNodes = clusterNodes;
        }
    
        public void setSplitLimitNum(Long splitLimitNum) {
            this.splitLimitNum = splitLimitNum;
        }
    
        public void setPrimaryShard(Integer primaryShard) {
            this.primaryShard = primaryShard;
        }
    
        public void setReplicateShard(Integer replicateShard) {
            this.replicateShard = replicateShard;
        }
    
        public void setIndexTranslogInterval(String indexTranslogInterval) {
            this.indexTranslogInterval = indexTranslogInterval;
        }
    
        public void setEnv(String env) {
            this.env = env;
        }
    
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            } else if (!(o instanceof EsConfigEntity)) {
                return false;
            } else {
                EsConfigEntity other = (EsConfigEntity)o;
                if (!other.canEqual(this)) {
                    return false;
                } else {
                    label119: {
                        Object this$clusterName = this.getClusterName();
                        Object other$clusterName = other.getClusterName();
                        if (this$clusterName == null) {
                            if (other$clusterName == null) {
                                break label119;
                            }
                        } else if (this$clusterName.equals(other$clusterName)) {
                            break label119;
                        }
    
                        return false;
                    }
    
                    Object this$clusterHttpPort = this.getClusterHttpPort();
                    Object other$clusterHttpPort = other.getClusterHttpPort();
                    if (this$clusterHttpPort == null) {
                        if (other$clusterHttpPort != null) {
                            return false;
                        }
                    } else if (!this$clusterHttpPort.equals(other$clusterHttpPort)) {
                        return false;
                    }
    
                    label105: {
                        Object this$clusterTcpPort = this.getClusterTcpPort();
                        Object other$clusterTcpPort = other.getClusterTcpPort();
                        if (this$clusterTcpPort == null) {
                            if (other$clusterTcpPort == null) {
                                break label105;
                            }
                        } else if (this$clusterTcpPort.equals(other$clusterTcpPort)) {
                            break label105;
                        }
    
                        return false;
                    }
    
                    Object this$clusterNodes = this.getClusterNodes();
                    Object other$clusterNodes = other.getClusterNodes();
                    if (this$clusterNodes == null) {
                        if (other$clusterNodes != null) {
                            return false;
                        }
                    } else if (!this$clusterNodes.equals(other$clusterNodes)) {
                        return false;
                    }
    
                    label91: {
                        Object this$splitLimitNum = this.getSplitLimitNum();
                        Object other$splitLimitNum = other.getSplitLimitNum();
                        if (this$splitLimitNum == null) {
                            if (other$splitLimitNum == null) {
                                break label91;
                            }
                        } else if (this$splitLimitNum.equals(other$splitLimitNum)) {
                            break label91;
                        }
    
                        return false;
                    }
    
                    Object this$primaryShard = this.getPrimaryShard();
                    Object other$primaryShard = other.getPrimaryShard();
                    if (this$primaryShard == null) {
                        if (other$primaryShard != null) {
                            return false;
                        }
                    } else if (!this$primaryShard.equals(other$primaryShard)) {
                        return false;
                    }
    
                    label77: {
                        Object this$replicateShard = this.getReplicateShard();
                        Object other$replicateShard = other.getReplicateShard();
                        if (this$replicateShard == null) {
                            if (other$replicateShard == null) {
                                break label77;
                            }
                        } else if (this$replicateShard.equals(other$replicateShard)) {
                            break label77;
                        }
    
                        return false;
                    }
    
                    label70: {
                        Object this$indexTranslogInterval = this.getIndexTranslogInterval();
                        Object other$indexTranslogInterval = other.getIndexTranslogInterval();
                        if (this$indexTranslogInterval == null) {
                            if (other$indexTranslogInterval == null) {
                                break label70;
                            }
                        } else if (this$indexTranslogInterval.equals(other$indexTranslogInterval)) {
                            break label70;
                        }
    
                        return false;
                    }
    
                    Object this$env = this.getEnv();
                    Object other$env = other.getEnv();
                    if (this$env == null) {
                        if (other$env != null) {
                            return false;
                        }
                    } else if (!this$env.equals(other$env)) {
                        return false;
                    }
    
                    return true;
                }
            }
        }
    
        protected boolean canEqual(Object other) {
            return other instanceof EsConfigEntity;
        }
    
    
    
        public String toString() {
            return "EsConfigEntity(clusterName=" + this.getClusterName() + ", clusterHttpPort=" + this.getClusterHttpPort() + ", clusterTcpPort=" + this.getClusterTcpPort() + ", clusterNodes=" + this.getClusterNodes() + ", splitLimitNum=" + this.getSplitLimitNum() + ", primaryShard=" + this.getPrimaryShard() + ", replicateShard=" + this.getReplicateShard() + ", indexTranslogInterval=" + this.getIndexTranslogInterval() + ", env=" + this.getEnv() + ")";
        }
    }
    View Code

    application.yaml:

    es:
      clusterName: cloud-shield
      clusterHttpPort: 9200
      clusterTcpPort: 9300
      clusterNodes: 10.20.0.1,10.20.0.2,10.20.0.3,10.20.0.4,10.20.0.5,10.20.0.6,10.20.0.7,10.20.0.8,10.20.0.9,10.20.0.10,10.20.0.11,10.20.0.12,10.20.0.13,10.20.0.14,10.20.0.15,10.20.0.16,10.20.0.17,10.20.0.18,10.20.0.19
      splitLimitNum: 400000000
      primaryShard: 11
      replicateShard: 0
      indexTranslogInterval: 10s
      env: ultiwill
    
    esClient:
      "cluster.name": cloud-shield
      "client.transport.sniff": "false"
      "xpack.security.user": "aw:123456"
      "processors": "30"


    ESClientUtil:

    package com.ultiwill.utils;
    
    import org.elasticsearch.action.bulk.*;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexRequestBuilder;
    import org.elasticsearch.action.search.*;
    import org.elasticsearch.action.support.WriteRequest;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.sort.SortOrder;
    import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
    import org.slf4j.LoggerFactory;
    import java.io.Serializable;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.*;
    
    public class ESClientUtil implements Serializable {
        private static org.slf4j.Logger logger = LoggerFactory.getLogger(ESClientUtil.class);
        private static TransportClient esClusterClient;
        private static BulkProcessor bulkProcessor;
    
        /**
         *@DESC: 初始化ES客户端
         * @return
         * @throws UnknownHostException
         */
        private static TransportClient initClient() throws UnknownHostException {
            String key = ProjectConfig.getESConf().getClusterName();
            Map<String,String> esClientSetting = (Map<String,String>) ProjectConfig.getCommonValue("esClient");
            Settings settings = Settings.builder()
                    //.put("cluster.name", key)
                    //.put("client.transport.sniff", true)
                    //.put("xpack.security.user", "aw:123456")
                    //.put("xpack.security.user", "developer:elastic_developer")
                    .put(esClientSetting)
                    .build();
            return new PreBuiltXPackTransportClient(settings).addTransportAddresses(assembleESAddress().toArray(new InetSocketTransportAddress[assembleESAddress().size()]));
        }
    
        /**
         * @DESC: 获取TransportClient
         * @return
         */
        public static TransportClient getClient(){
            System.setProperty("es.set.netty.runtime.available.processors", "false");
            if(esClusterClient==null){
                synchronized (ESClientUtil.class){
                    try{
                        if(esClusterClient==null){
                            esClusterClient = initClient();
                            esClusterClient.settings();
                        }
                    }catch (Exception e){
                        logger.error("ESClient创建失败...." + esClusterClient,e);
                    }
                }
            }
            return esClusterClient;
        }
    
        /**
         * @DESC: 组装ES的hosts为TransportAddress
         * */
        private static List<TransportAddress> assembleESAddress() throws UnknownHostException {
            ArrayList<TransportAddress> esAddrList = new ArrayList<>();
            String[] esAddrArray = ProjectConfig.getESConf().getClusterNodes().split(",");
            for (String es:esAddrArray){
                esAddrList.add(new InetSocketTransportAddress(InetAddress.getByName(es),Integer.valueOf(ProjectConfig.getESConf().getClusterTcpPort())));
            }
            return esAddrList;
        }
    
        /**
         *@DESC: 关闭client连接
         */
        public static void closeClient(){
            if (esClusterClient != null){
                synchronized (ESClientUtil.class){
                    try {
                        esClusterClient.close();
                        logger.info("ES Client 关闭成功...");
                    } catch (Exception e) {
                        logger.error("ES Client关闭失败...",e);
                    }
                }
            }
        }
    
        /**
         * @Desc: 利用bulk API将消息批量写入ES,不考虑routing
         * */
        public static void save2ES(TransportClient client, String index, String type, List<Map<String,Object>> messages,boolean ifSetId) throws Exception{
            try {
                BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                for (Map<String,Object> message:messages) {
                    IndexRequestBuilder indexRequestBuilder1;
                    if (ifSetId) {
                        indexRequestBuilder1 = client.prepareIndex().setId(message.get("id").toString()).setOpType(IndexRequest.OpType.INDEX).setIndex(index).setType("doc");
                    } else {
                        indexRequestBuilder1 = client.prepareIndex()
                                .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update
                                .setIndex(index).setType("doc");
                    }
                    IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message);
                    bulkRequestBuilder.add(indexRequestBuilder2);
                }
    //            logger.info("开始通过单线程模式加载数据到ES.....");
                BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                if (bulkResponse.hasFailures()){
                    logger.error("写入ES失败:" + bulkResponse.buildFailureMessage());
                    throw new Exception("写入ES失败"+bulkResponse.buildFailureMessage());
                }
            }catch (Exception e){
                logger.error("ES 写入ES失败...",e);
                throw new Exception("写入ES失败",e);
            }
    
        }
    
        /**
         * @Desc: 利用bulk API将消息批量写入ES的索引信息表
         * */
        public static void saveES4IndexInfo(TransportClient client, String index, List<Map<String,String>> messages){
            double begin = System.currentTimeMillis();
            try {
                BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                for (Map<String,String> message:messages) {
                    IndexRequestBuilder indexRequestBuilder1 = client.prepareIndex()
                            .setId(message.get("id").toString())
                            .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update
                            .setIndex(index).setType("doc");
                    message.remove("id");
                    //message.remove("protocoltype");
                    IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message);
                    bulkRequestBuilder.add(indexRequestBuilder2);
                }
                logger.info("开始通过单线程模式加载数据到ES.....");
                BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                if (bulkResponse.hasFailures()){
                    logger.error("写入ES失败:" + bulkResponse.buildFailureMessage());
                }
            } finally {
                //ESClientUtil.closeClient();
            }
            double end = System.currentTimeMillis();
            logger.info("The total load time cost " + (end - begin)/1000 + " S...");//打印写入到ES所花的总时间
        }
    
        /**
         * @Desc : 根据index从已经完成的index中获取最大时间和最小时间
         * */
        public static String getTimeRangeByIndex(TransportClient client,String index, String sortField){
            String fromTime = "";
            String endTime = "";
            try {
                SearchResponse ascResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                        .setQuery(QueryBuilders.matchAllQuery())
                        .setSize(1).setScroll(TimeValue.timeValueMinutes(8))
                        .addSort(sortField, SortOrder.ASC)
                        .execute().actionGet();
                SearchResponse descResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                        .setQuery(QueryBuilders.matchAllQuery())
                        .setSize(1).setScroll(TimeValue.timeValueMinutes(8))
                        .addSort(sortField, SortOrder.DESC)
                        .execute().actionGet();
                SearchHit[] ascHits = ascResponse.getHits().getHits();
                SearchHit[] descHits = descResponse.getHits().getHits();
                for (SearchHit hit:ascHits){
                    fromTime = hit.getSourceAsMap().get(sortField).toString();
                }
                for (SearchHit hit:descHits){
                    endTime = hit.getSourceAsMap().get(sortField).toString();
                }
            } catch (Exception e) {
                logger.error("从当前索引中获取时间范围失败...",e);
            }finally {
                //ESClientUtil.closeClient();
            }
            return fromTime + "-" + endTime;
        }
        
        /**
         * @DESC: 从index_info中获取当前正在进行索引的others协议的索引别名,用来对拼接数据入ES
         * */
        public static String getRunningOthersAlias(TransportClient client){
            try {
                SearchResponse searchResponse = client.prepareSearch("index_info").setSearchType(SearchType.DEFAULT)
                        .setQuery(QueryBuilders.matchAllQuery())
                        .setSize(1000).setScroll(TimeValue.timeValueMinutes(8))
                        .execute().actionGet();
                ArrayList<String> aliasList = new ArrayList<>();
                SearchHit[] resultHits = searchResponse.getHits().getHits();
                for (SearchHit hit:resultHits){
    
                    if(hit.getType().startsWith("others_") && hit.getType().endsWith("9999999999999")){
                        aliasList.add(hit.getType());
                    }
                }
                if (aliasList.size() > 1) {
                    long maxNum = 0L;
                    for (String alias:aliasList){
                        long eachNum = Long.parseLong(alias.split("_")[1].split("-")[0]);
                        if (maxNum < eachNum){
                            maxNum = eachNum;
                        }
                    }
                    return "others_" + maxNum + "-9999999999999";
                }
                else{
                    return aliasList.get(0);
                }
            } finally {
                //ESClientUtil.closeClient();
            }
        }
    
        /**
         * @DESC: 通过index以及某个field来判断该doc是否存在
         * */
        public static int ifFieldExist(TransportClient client,String index,String field,String fieldValue){
            int length = 0;
            try {
                SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                        //.setQuery(QueryBuilders.termQuery("_id", id))
                        .setQuery(QueryBuilders.termQuery(field, fieldValue))
                        .execute().actionGet();
                length = searchResponse.getHits().getHits().length;
            } catch (Exception e) {
                logger.error("查找字段是否存在报错..." + field,e);
            } finally {
            }
            return length;
        }
    
        /**
         * @DESC: 通过index以及某个field来判断该doc是否存在
         * */
        public static HashSet<Object> ifFieldExistV2(TransportClient client, String index, String field, List<String> fieldValues){
            HashSet<Object> hashSet = new HashSet<>();
            try {
                int interval = 1000;
                int size = fieldValues.size();
                int  num = size/interval + 1;
                //es批量查询支持有限字段值,因而分批次查询
                for(int i=0;i < num;i++){
                    int from = i*interval;
                    int to = (i+1)*interval;
                    if(to>size) to = size;
                    SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                            .setQuery(QueryBuilders.termsQuery(field, fieldValues.subList(from,to)))
                            .setFetchSource(field,null)
                            .execute().actionGet();
                    SearchHit[] hits = searchResponse.getHits().getHits();
                    for(SearchHit hit : hits){
                        hashSet.add(hit.getSourceAsMap().get(field));
                    }
                }
            } catch (Exception e) {
                logger.error("查找字段是否存在报错..." + field,e);
            }
            return hashSet;
        }
    
    
        /**
         * @DESC: 通过id获取ES对应的doc内容
         * */
        public static Map<String, Object> getRecordById(TransportClient client, String index, String id){
            Map<String, Object> rst = new HashMap<>();
            SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                    .setQuery(QueryBuilders.termQuery("_id", id))
                    .execute().actionGet();
            SearchHit[] resultHits = searchResponse.getHits().getHits();
            for(SearchHit hit : resultHits){
                rst = hit.getSourceAsMap();
    
            }
            return rst;
        }
    
        /**
         * 获取单例全局 BulkProcessor
         *
         * */
        public static BulkProcessor getBulkProcessor(){
            if(bulkProcessor==null){
                synchronized (ESClientUtil.class){
                    try{
                        if(bulkProcessor==null){
                            bulkProcessor = bulkProcessor(getClient(),"",4);
                        }
                    }catch (Exception e){
                        logger.error("BulkProcessor创建失败...." + bulkProcessor,e);
                    }
                }
            }
            return bulkProcessor;
        }
    
        private  static BulkProcessor bulkProcessor(TransportClient client,String taskName,int dbNum){
            return    BulkProcessor.builder(
                    client,
                    new BulkProcessor.Listener() {
                        @Override
                        public void beforeBulk(long executionId,
                                               BulkRequest request) {
                        }
    
                        @Override
                        public void afterBulk(long executionId,
                                              BulkRequest request,
                                              BulkResponse response) {
                            try {
                                if(response.hasFailures()){
                                    String failureMessage = response.buildFailureMessage();
                                    String[] info = failureMessage.split("
    ");
                                    if(info.length>2){
                                        String value=info[0]+":"+info[1];
                                        logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入存在失败:"+value);
                                        //任务名@IP地址@executionId@当前时间@当前发送量@成功OR失败
                                    }else{
                                        logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入存在失败:");
                                    }
                                }else{
                                    logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入成功");
                                }
                            }catch (Exception e){
                                logger.error("BulkProcessor信息写入Redis异常",e);
                            }finally {
                            }
    
                        }
    
                        @Override
                        public void afterBulk(long executionId,
                                              BulkRequest request,
                                              Throwable failure) {
                            failure.printStackTrace();
                            logger.info("写入es失败:  partitionId:"+ 44 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"   ,错误: "+failure);
                        }
                    })
                    .setBulkActions(15000)
                    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(120))
                    .setConcurrentRequests(5)
                    .setBackoffPolicy(
                            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
        }
    
    
    }
    View Code
  • 相关阅读:
    mysql"ON DUPLICATE KEY UPDATE"的用法
    shell 数组用法
    linux命令行提示符显示太长怎么办?
    热备份、温备份、冷备份(Hot/Warm/Cold Backup)
    Domain key在反垃圾邮件中的应用
    计算机的存储单位
    IIS W3C日志记录字段和HTTP状态代码的说明
    noarch
    日志传送
    Remote Desktop Issues
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13229448.html
Copyright © 2011-2022 走看看