ProjectConfig:
package com.ultiwill.utils; import com.ultiwill.entity.EsConfigEntity; import com.alibaba.fastjson.JSON; public class ProjectConfig { public static EsConfigEntity getESConf() { return (EsConfigEntity)JSON.parseObject(JSON.toJSONString(ConfigUtil.readYamlByPrefix("application.yaml", "es")), EsConfigEntity.class); } public static Object getCommonValue(String key) { return ConfigUtil.readYaml("application.yaml").get(key); } }
ConfigUtil:
package com.ultiwill.utils; import org.yaml.snakeyaml.Yaml; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.util.HashMap; import java.util.Map; public class ConfigUtil { private ConfigUtil() { } public static Map readYaml(String file) { if (!(new File(file)).exists()) { String projectDir = System.getProperty("user.dir"); file = projectDir + File.separator + "conf" + File.separator +file; } try { return (Map)(new Yaml()).loadAs(new FileInputStream(file), HashMap.class); } catch (FileNotFoundException var2) { var2.printStackTrace(); return null; } } public static Map<String, Object> readYamlByPrefix(String file, String prefix) { return (Map)readYaml(file).get(prefix); } }
EsConfigEntity:
package com.ultiwill.entity; public class EsConfigEntity { private String clusterName; private String clusterHttpPort; private String clusterTcpPort; private String clusterNodes; private Long splitLimitNum; private Integer primaryShard; private Integer replicateShard; private String indexTranslogInterval; private String env; public EsConfigEntity() { } public String getClusterName() { return this.clusterName; } public String getClusterHttpPort() { return this.clusterHttpPort; } public String getClusterTcpPort() { return this.clusterTcpPort; } public String getClusterNodes() { return this.clusterNodes; } public Long getSplitLimitNum() { return this.splitLimitNum; } public Integer getPrimaryShard() { return this.primaryShard; } public Integer getReplicateShard() { return this.replicateShard; } public String getIndexTranslogInterval() { return this.indexTranslogInterval; } public String getEnv() { return this.env; } public void setClusterName(String clusterName) { this.clusterName = clusterName; } public void setClusterHttpPort(String clusterHttpPort) { this.clusterHttpPort = clusterHttpPort; } public void setClusterTcpPort(String clusterTcpPort) { this.clusterTcpPort = clusterTcpPort; } public void setClusterNodes(String clusterNodes) { this.clusterNodes = clusterNodes; } public void setSplitLimitNum(Long splitLimitNum) { this.splitLimitNum = splitLimitNum; } public void setPrimaryShard(Integer primaryShard) { this.primaryShard = primaryShard; } public void setReplicateShard(Integer replicateShard) { this.replicateShard = replicateShard; } public void setIndexTranslogInterval(String indexTranslogInterval) { this.indexTranslogInterval = indexTranslogInterval; } public void setEnv(String env) { this.env = env; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof EsConfigEntity)) { return false; } else { EsConfigEntity other = (EsConfigEntity)o; if (!other.canEqual(this)) { return false; } else { label119: { Object this$clusterName = this.getClusterName(); Object other$clusterName = other.getClusterName(); if (this$clusterName == null) { if (other$clusterName == null) { break label119; } } else if (this$clusterName.equals(other$clusterName)) { break label119; } return false; } Object this$clusterHttpPort = this.getClusterHttpPort(); Object other$clusterHttpPort = other.getClusterHttpPort(); if (this$clusterHttpPort == null) { if (other$clusterHttpPort != null) { return false; } } else if (!this$clusterHttpPort.equals(other$clusterHttpPort)) { return false; } label105: { Object this$clusterTcpPort = this.getClusterTcpPort(); Object other$clusterTcpPort = other.getClusterTcpPort(); if (this$clusterTcpPort == null) { if (other$clusterTcpPort == null) { break label105; } } else if (this$clusterTcpPort.equals(other$clusterTcpPort)) { break label105; } return false; } Object this$clusterNodes = this.getClusterNodes(); Object other$clusterNodes = other.getClusterNodes(); if (this$clusterNodes == null) { if (other$clusterNodes != null) { return false; } } else if (!this$clusterNodes.equals(other$clusterNodes)) { return false; } label91: { Object this$splitLimitNum = this.getSplitLimitNum(); Object other$splitLimitNum = other.getSplitLimitNum(); if (this$splitLimitNum == null) { if (other$splitLimitNum == null) { break label91; } } else if (this$splitLimitNum.equals(other$splitLimitNum)) { break label91; } return false; } Object this$primaryShard = this.getPrimaryShard(); Object other$primaryShard = other.getPrimaryShard(); if (this$primaryShard == null) { if (other$primaryShard != null) { return false; } } else if (!this$primaryShard.equals(other$primaryShard)) { return false; } label77: { Object this$replicateShard = this.getReplicateShard(); Object other$replicateShard = other.getReplicateShard(); if (this$replicateShard == null) { if (other$replicateShard == null) { break label77; } } else if (this$replicateShard.equals(other$replicateShard)) { break label77; } return false; } label70: { Object this$indexTranslogInterval = this.getIndexTranslogInterval(); Object other$indexTranslogInterval = other.getIndexTranslogInterval(); if (this$indexTranslogInterval == null) { if (other$indexTranslogInterval == null) { break label70; } } else if (this$indexTranslogInterval.equals(other$indexTranslogInterval)) { break label70; } return false; } Object this$env = this.getEnv(); Object other$env = other.getEnv(); if (this$env == null) { if (other$env != null) { return false; } } else if (!this$env.equals(other$env)) { return false; } return true; } } } protected boolean canEqual(Object other) { return other instanceof EsConfigEntity; } public String toString() { return "EsConfigEntity(clusterName=" + this.getClusterName() + ", clusterHttpPort=" + this.getClusterHttpPort() + ", clusterTcpPort=" + this.getClusterTcpPort() + ", clusterNodes=" + this.getClusterNodes() + ", splitLimitNum=" + this.getSplitLimitNum() + ", primaryShard=" + this.getPrimaryShard() + ", replicateShard=" + this.getReplicateShard() + ", indexTranslogInterval=" + this.getIndexTranslogInterval() + ", env=" + this.getEnv() + ")"; } }
application.yaml:
es: clusterName: cloud-shield clusterHttpPort: 9200 clusterTcpPort: 9300 clusterNodes: 10.20.0.1,10.20.0.2,10.20.0.3,10.20.0.4,10.20.0.5,10.20.0.6,10.20.0.7,10.20.0.8,10.20.0.9,10.20.0.10,10.20.0.11,10.20.0.12,10.20.0.13,10.20.0.14,10.20.0.15,10.20.0.16,10.20.0.17,10.20.0.18,10.20.0.19 splitLimitNum: 400000000 primaryShard: 11 replicateShard: 0 indexTranslogInterval: 10s env: ultiwill esClient: "cluster.name": cloud-shield "client.transport.sniff": "false" "xpack.security.user": "aw:123456" "processors": "30"
ESClientUtil:
package com.ultiwill.utils; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; public class ESClientUtil implements Serializable { private static org.slf4j.Logger logger = LoggerFactory.getLogger(ESClientUtil.class); private static TransportClient esClusterClient; private static BulkProcessor bulkProcessor; /** *@DESC: 初始化ES客户端 * @return * @throws UnknownHostException */ private static TransportClient initClient() throws UnknownHostException { String key = ProjectConfig.getESConf().getClusterName(); Map<String,String> esClientSetting = (Map<String,String>) ProjectConfig.getCommonValue("esClient"); Settings settings = Settings.builder() //.put("cluster.name", key) //.put("client.transport.sniff", true) //.put("xpack.security.user", "aw:123456") //.put("xpack.security.user", "developer:elastic_developer") .put(esClientSetting) .build(); return new PreBuiltXPackTransportClient(settings).addTransportAddresses(assembleESAddress().toArray(new InetSocketTransportAddress[assembleESAddress().size()])); } /** * @DESC: 获取TransportClient * @return */ public static TransportClient getClient(){ System.setProperty("es.set.netty.runtime.available.processors", "false"); if(esClusterClient==null){ synchronized (ESClientUtil.class){ try{ if(esClusterClient==null){ esClusterClient = initClient(); esClusterClient.settings(); } }catch (Exception e){ logger.error("ESClient创建失败...." + esClusterClient,e); } } } return esClusterClient; } /** * @DESC: 组装ES的hosts为TransportAddress * */ private static List<TransportAddress> assembleESAddress() throws UnknownHostException { ArrayList<TransportAddress> esAddrList = new ArrayList<>(); String[] esAddrArray = ProjectConfig.getESConf().getClusterNodes().split(","); for (String es:esAddrArray){ esAddrList.add(new InetSocketTransportAddress(InetAddress.getByName(es),Integer.valueOf(ProjectConfig.getESConf().getClusterTcpPort()))); } return esAddrList; } /** *@DESC: 关闭client连接 */ public static void closeClient(){ if (esClusterClient != null){ synchronized (ESClientUtil.class){ try { esClusterClient.close(); logger.info("ES Client 关闭成功..."); } catch (Exception e) { logger.error("ES Client关闭失败...",e); } } } } /** * @Desc: 利用bulk API将消息批量写入ES,不考虑routing * */ public static void save2ES(TransportClient client, String index, String type, List<Map<String,Object>> messages,boolean ifSetId) throws Exception{ try { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (Map<String,Object> message:messages) { IndexRequestBuilder indexRequestBuilder1; if (ifSetId) { indexRequestBuilder1 = client.prepareIndex().setId(message.get("id").toString()).setOpType(IndexRequest.OpType.INDEX).setIndex(index).setType("doc"); } else { indexRequestBuilder1 = client.prepareIndex() .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update .setIndex(index).setType("doc"); } IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message); bulkRequestBuilder.add(indexRequestBuilder2); } // logger.info("开始通过单线程模式加载数据到ES....."); BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (bulkResponse.hasFailures()){ logger.error("写入ES失败:" + bulkResponse.buildFailureMessage()); throw new Exception("写入ES失败"+bulkResponse.buildFailureMessage()); } }catch (Exception e){ logger.error("ES 写入ES失败...",e); throw new Exception("写入ES失败",e); } } /** * @Desc: 利用bulk API将消息批量写入ES的索引信息表 * */ public static void saveES4IndexInfo(TransportClient client, String index, List<Map<String,String>> messages){ double begin = System.currentTimeMillis(); try { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (Map<String,String> message:messages) { IndexRequestBuilder indexRequestBuilder1 = client.prepareIndex() .setId(message.get("id").toString()) .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update .setIndex(index).setType("doc"); message.remove("id"); //message.remove("protocoltype"); IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message); bulkRequestBuilder.add(indexRequestBuilder2); } logger.info("开始通过单线程模式加载数据到ES....."); BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (bulkResponse.hasFailures()){ logger.error("写入ES失败:" + bulkResponse.buildFailureMessage()); } } finally { //ESClientUtil.closeClient(); } double end = System.currentTimeMillis(); logger.info("The total load time cost " + (end - begin)/1000 + " S...");//打印写入到ES所花的总时间 } /** * @Desc : 根据index从已经完成的index中获取最大时间和最小时间 * */ public static String getTimeRangeByIndex(TransportClient client,String index, String sortField){ String fromTime = ""; String endTime = ""; try { SearchResponse ascResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT) .setQuery(QueryBuilders.matchAllQuery()) .setSize(1).setScroll(TimeValue.timeValueMinutes(8)) .addSort(sortField, SortOrder.ASC) .execute().actionGet(); SearchResponse descResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT) .setQuery(QueryBuilders.matchAllQuery()) .setSize(1).setScroll(TimeValue.timeValueMinutes(8)) .addSort(sortField, SortOrder.DESC) .execute().actionGet(); SearchHit[] ascHits = ascResponse.getHits().getHits(); SearchHit[] descHits = descResponse.getHits().getHits(); for (SearchHit hit:ascHits){ fromTime = hit.getSourceAsMap().get(sortField).toString(); } for (SearchHit hit:descHits){ endTime = hit.getSourceAsMap().get(sortField).toString(); } } catch (Exception e) { logger.error("从当前索引中获取时间范围失败...",e); }finally { //ESClientUtil.closeClient(); } return fromTime + "-" + endTime; } /** * @DESC: 从index_info中获取当前正在进行索引的others协议的索引别名,用来对拼接数据入ES * */ public static String getRunningOthersAlias(TransportClient client){ try { SearchResponse searchResponse = client.prepareSearch("index_info").setSearchType(SearchType.DEFAULT) .setQuery(QueryBuilders.matchAllQuery()) .setSize(1000).setScroll(TimeValue.timeValueMinutes(8)) .execute().actionGet(); ArrayList<String> aliasList = new ArrayList<>(); SearchHit[] resultHits = searchResponse.getHits().getHits(); for (SearchHit hit:resultHits){ if(hit.getType().startsWith("others_") && hit.getType().endsWith("9999999999999")){ aliasList.add(hit.getType()); } } if (aliasList.size() > 1) { long maxNum = 0L; for (String alias:aliasList){ long eachNum = Long.parseLong(alias.split("_")[1].split("-")[0]); if (maxNum < eachNum){ maxNum = eachNum; } } return "others_" + maxNum + "-9999999999999"; } else{ return aliasList.get(0); } } finally { //ESClientUtil.closeClient(); } } /** * @DESC: 通过index以及某个field来判断该doc是否存在 * */ public static int ifFieldExist(TransportClient client,String index,String field,String fieldValue){ int length = 0; try { SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT) //.setQuery(QueryBuilders.termQuery("_id", id)) .setQuery(QueryBuilders.termQuery(field, fieldValue)) .execute().actionGet(); length = searchResponse.getHits().getHits().length; } catch (Exception e) { logger.error("查找字段是否存在报错..." + field,e); } finally { } return length; } /** * @DESC: 通过index以及某个field来判断该doc是否存在 * */ public static HashSet<Object> ifFieldExistV2(TransportClient client, String index, String field, List<String> fieldValues){ HashSet<Object> hashSet = new HashSet<>(); try { int interval = 1000; int size = fieldValues.size(); int num = size/interval + 1; //es批量查询支持有限字段值,因而分批次查询 for(int i=0;i < num;i++){ int from = i*interval; int to = (i+1)*interval; if(to>size) to = size; SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT) .setQuery(QueryBuilders.termsQuery(field, fieldValues.subList(from,to))) .setFetchSource(field,null) .execute().actionGet(); SearchHit[] hits = searchResponse.getHits().getHits(); for(SearchHit hit : hits){ hashSet.add(hit.getSourceAsMap().get(field)); } } } catch (Exception e) { logger.error("查找字段是否存在报错..." + field,e); } return hashSet; } /** * @DESC: 通过id获取ES对应的doc内容 * */ public static Map<String, Object> getRecordById(TransportClient client, String index, String id){ Map<String, Object> rst = new HashMap<>(); SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT) .setQuery(QueryBuilders.termQuery("_id", id)) .execute().actionGet(); SearchHit[] resultHits = searchResponse.getHits().getHits(); for(SearchHit hit : resultHits){ rst = hit.getSourceAsMap(); } return rst; } /** * 获取单例全局 BulkProcessor * * */ public static BulkProcessor getBulkProcessor(){ if(bulkProcessor==null){ synchronized (ESClientUtil.class){ try{ if(bulkProcessor==null){ bulkProcessor = bulkProcessor(getClient(),"",4); } }catch (Exception e){ logger.error("BulkProcessor创建失败...." + bulkProcessor,e); } } } return bulkProcessor; } private static BulkProcessor bulkProcessor(TransportClient client,String taskName,int dbNum){ return BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { try { if(response.hasFailures()){ String failureMessage = response.buildFailureMessage(); String[] info = failureMessage.split(" "); if(info.length>2){ String value=info[0]+":"+info[1]; logger.info("写入es情况: partitionId:"+ 22 +" ,executionId: "+ executionId+" ,数量: " +request.numberOfActions()+" ,写入存在失败:"+value); //任务名@IP地址@executionId@当前时间@当前发送量@成功OR失败 }else{ logger.info("写入es情况: partitionId:"+ 22 +" ,executionId: "+ executionId+" ,数量: " +request.numberOfActions()+" ,写入存在失败:"); } }else{ logger.info("写入es情况: partitionId:"+ 22 +" ,executionId: "+ executionId+" ,数量: " +request.numberOfActions()+" ,写入成功"); } }catch (Exception e){ logger.error("BulkProcessor信息写入Redis异常",e); }finally { } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { failure.printStackTrace(); logger.info("写入es失败: partitionId:"+ 44 +" ,executionId: "+ executionId+" ,数量: " +request.numberOfActions()+" ,错误: "+failure); } }) .setBulkActions(15000) .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(120)) .setConcurrentRequests(5) .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); } }