zoukankan      html  css  js  c++  java
  • ElasticSearch工具类

      1 import com.alibaba.fastjson.JSON;
      2 import lombok.extern.slf4j.Slf4j;
      3 import org.apache.commons.collections4.CollectionUtils;
      4 import org.apache.commons.collections4.MapUtils;
      5 import org.apache.commons.lang.ArrayUtils;
      6 import org.apache.commons.lang.StringUtils;
      7 import org.apache.poi.ss.formula.functions.T;
      8 import org.elasticsearch.action.bulk.BulkRequest;
      9 import org.elasticsearch.action.bulk.BulkResponse;
     10 import org.elasticsearch.action.get.GetRequest;
     11 import org.elasticsearch.action.get.GetResponse;
     12 import org.elasticsearch.action.index.IndexRequest;
     13 import org.elasticsearch.action.search.SearchRequest;
     14 import org.elasticsearch.action.search.SearchResponse;
     15 import org.elasticsearch.action.search.SearchScrollRequest;
     16 import org.elasticsearch.action.update.UpdateRequest;
     17 import org.elasticsearch.action.update.UpdateResponse;
     18 import org.elasticsearch.client.RequestOptions;
     19 import org.elasticsearch.client.RestHighLevelClient;
     20 import org.elasticsearch.client.indices.GetIndexRequest;
     21 import org.elasticsearch.common.unit.TimeValue;
     22 import org.elasticsearch.common.xcontent.XContentType;
     23 import org.elasticsearch.index.query.BoolQueryBuilder;
     24 import org.elasticsearch.index.query.QueryBuilders;
     25 import org.elasticsearch.search.Scroll;
     26 import org.elasticsearch.search.SearchHit;
     27 import org.elasticsearch.search.aggregations.Aggregation;
     28 import org.elasticsearch.search.aggregations.AggregationBuilder;
     29 import org.elasticsearch.search.aggregations.AggregationBuilders;
     30 import org.elasticsearch.search.aggregations.BucketOrder;
     31 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
     32 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
     33 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
     34 import org.elasticsearch.search.aggregations.metrics.Cardinality;
     35 import org.elasticsearch.search.builder.SearchSourceBuilder;
     36 import org.springframework.beans.factory.annotation.Autowired;
     37 import org.springframework.beans.factory.annotation.Qualifier;
     38 import org.springframework.stereotype.Component;
     39 import org.trimps.ticp.fuzhou.telecom.vo.pagination.BasePageCondition;
     40 
     41 import java.io.IOException;
     42 import java.util.ArrayList;
     43 import java.util.HashMap;
     44 import java.util.List;
     45 import java.util.Map;
     46 
     47 @Component
     48 @Slf4j
     49 public class ESUtils {
     50 
     51     @Autowired
     52     @Qualifier("highLevelClient")
     53     RestHighLevelClient client;
     54 
     55     public long countLogs(String esIndex, SearchSourceBuilder builder) {
     56         SearchRequest searchRequest = new SearchRequest();
     57         searchRequest.indices(esIndex);
     58         searchRequest.source(builder);
     59         try {
     60             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
     61             return result.getHits().getTotalHits().value;
     62         } catch (Exception e) {
     63             log.error("error query data", e);
     64             return 0;
     65         }
     66     }
     67 
     68     /**
     69      * 根据id精确查询
     70      *
     71      * @param esIndex
     72      * @param id
     73      * @return
     74      */
     75     public Map<String, Object> getOne(String esIndex, String id) {
     76         GetRequest getRequest = new GetRequest(esIndex, id);
     77         try {
     78             GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
     79             if (getResponse.getSource() == null) {
     80                 return null;
     81             }
     82             return getResponse.getSourceAsMap();
     83         } catch (Exception e) {
     84             log.warn("getOne from ES Exception : ", e);
     85             return null;
     86         }
     87     }
     88 
     89     public SearchHit[] queryDocsByIds(String esIndex, List<String> ids) {
     90         SearchSourceBuilder builder = new SearchSourceBuilder();
     91         SearchRequest searchRequest = new SearchRequest();
     92         builder.query(QueryBuilders.termsQuery("_id", ids));
     93         searchRequest.indices(esIndex);
     94         searchRequest.source(builder);
     95         try {
     96             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
     97             return result.getHits().getHits();
     98         } catch (Exception e) {
     99             log.error("Batch query docs by ids failed!", e);
    100         }
    101         return null;
    102     }
    103 
    104     public void insertOrUpdateOne(String index, String id, String data) {
    105         IndexRequest request = new IndexRequest(index);
    106         request.id(id);
    107         request.source(data, XContentType.JSON);
    108         try {
    109             client.index(request, RequestOptions.DEFAULT);
    110         } catch (Exception e) {
    111             log.error("error save info", e);
    112         }
    113     }
    114 
    115     /**
    116      * 修改文档-只需要给要修改的字段
    117      */
    118     public void updateOne(String index, String id, Map<String, Object> map) {
    119         UpdateRequest updateRequest = new UpdateRequest(index, id);
    120         updateRequest.doc(map);
    121         try {
    122             UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
    123             log.info("updateOne result:{}", JSON.toJSONString(updateResponse));
    124         } catch (Exception e) {
    125             log.warn("updateOne to ES Exception : ", e);
    126             throw new RuntimeException(e);
    127         }
    128     }
    129 
    130     /**
    131      * 批量插入文档
    132      *
    133      * @param index     ES索引
    134      * @param documents 待提交的批量文档
    135      * @param uuidKey   文档中ID字段对应的key值
    136      */
    137     public BulkResponse insertDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) {
    138         BulkResponse response = null;
    139         if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) {
    140             log.warn("Es index is blank or documents is empty.");
    141             return response;
    142         }
    143 
    144         try {
    145             int size = documents.size();
    146             BulkRequest bulkRequest = new BulkRequest();
    147             for (int i = 0; i < size; i++) {
    148                 Map<String, Object> document = documents.get(i);
    149                 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) {
    150                     continue;
    151                 }
    152                 bulkRequest.add(new IndexRequest(index).opType("create").id(document.get(uuidKey).toString()).source(document));
    153             }
    154             response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    155         } catch (Exception e) {
    156             log.error("Insert documents to es as batch failed!", e);
    157         }
    158         return response;
    159     }
    160 
    161 
    162     /**
    163      * 批量更新文档
    164      *
    165      * @param index     ES索引
    166      * @param documents 待提交的批量文档
    167      * @param uuidKey   文档中ID字段对应的key值
    168      */
    169     public BulkResponse updateDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) {
    170         BulkResponse response = null;
    171         if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) {
    172             log.warn("Es index is blank or documents is empty.");
    173             return response;
    174         }
    175 
    176         try {
    177             int size = documents.size();
    178             BulkRequest bulkRequest = new BulkRequest();
    179             for (int i = 0; i < size; i++) {
    180                 Map<String, Object> document = documents.get(i);
    181                 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) {
    182                     continue;
    183                 }
    184                 bulkRequest.add(new UpdateRequest(index, document.get(uuidKey).toString()).doc(document));
    185             }
    186             response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    187         } catch (Exception e) {
    188             log.error("Update documents to es as batch failed!", e);
    189         }
    190         return response;
    191     }
    192 
    193     public SearchResponse queryData2(SearchSourceBuilder sourceBuilder, String... esIndex) {
    194         SearchRequest searchRequest = new SearchRequest();
    195         searchRequest.indices(esIndex);
    196         searchRequest.source(sourceBuilder);
    197         try {
    198             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
    199             return result;
    200         } catch (Exception e) {
    201             log.error("error query info", e);
    202         }
    203         return null;
    204     }
    205 
    206     /**
    207      * 判断索引名是否存在
    208      *
    209      * @param indexName
    210      * @param
    211      * @return
    212      */
    213     public boolean isExistsIndex(String indexName) {
    214         GetIndexRequest request = new GetIndexRequest(indexName);
    215         try {
    216             boolean response = client.indices().exists(request, RequestOptions.DEFAULT);
    217             return response;
    218         } catch (IOException e) {
    219             log.error("error", e);
    220             return false;
    221         }
    222 
    223     }
    224 
    225     public long counts4Index(String esIndex) {
    226         SearchRequest searchRequest = new SearchRequest();
    227         searchRequest.indices(esIndex);
    228         try {
    229             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
    230             return result.getHits().getTotalHits().value;
    231         } catch (Exception e) {
    232             log.error("error query data", e);
    233             return 0;
    234         }
    235     }
    236 
    237     public long countDistinctField(String esIndex, String countField, SearchSourceBuilder sourceBuilder) {
    238         long count = 0;
    239         if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) {
    240             return count;
    241         }
    242 
    243         SearchRequest searchRequest = new SearchRequest();
    244         searchRequest.indices(esIndex);
    String identifier = UUID.randomUUID().toString();
    245 AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality(identifier).field(countField); 246 sourceBuilder.aggregation(aggregationBuilder); 247 sourceBuilder.size(0); 248 searchRequest.source(sourceBuilder); 249 try { 250 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 251 Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField); 252 long total_value = 0; 253 for (Histogram.Bucket t : histogram.getBuckets()) { 254 Cardinality cardinality = t.getAggregations().get(identifier); 255 long value = cardinality.getValue(); 256 total_value = total_value + value; 257 } 258 return total_value; 259 } catch (Exception e) { 260 log.error("Count field failed!", e); 261 } 262 return 0; 263 } 264 265 public long countDistinctField(String esIndex, String countField) { 266 long count = 0; 267 if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) { 268 return count; 269 } 270 271 SearchRequest searchRequest = new SearchRequest(); 272 searchRequest.indices(esIndex); 273 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 274 AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("field_count").field(countField); 275 sourceBuilder.aggregation(aggregationBuilder); 276 sourceBuilder.size(0); 277 searchRequest.source(sourceBuilder); 278 try { 279 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 280 Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField); 281 long total_value = 0; 282 for (Histogram.Bucket t : histogram.getBuckets()) { 283 Cardinality cardinality = t.getAggregations().get("field_count"); 284 long value = cardinality.getValue(); 285 total_value = total_value + value; 286 } 287 return total_value; 288 } catch (Exception e) { 289 log.error("Count field failed!", e); 290 } 291 return 0; 292 } 293 294 public boolean deleteData(String esIndexName, String esId) { 295 org.elasticsearch.action.delete.DeleteRequest deleteRequest = new org.elasticsearch.action.delete.DeleteRequest(esIndexName, esId); 296 org.elasticsearch.action.delete.DeleteResponse deleteResponse = null; 297 try { 298 deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); 299 } catch (Exception e) { 300 log.error("error is " + e); 301 return false; 302 303 } 304 return true; 305 } 306 307 /** 308 * 分组统计 309 * 310 * @param esIndex 索引 311 * @param groupFiled 被统计字段 312 */ 313 public Map<String, Long> groupCount(String esIndex, String groupFiled) { 314 Map<String, Long> statistics = new HashMap<>(); 315 SearchRequest searchRequest = new SearchRequest(); 316 searchRequest.indices(esIndex); 317 318 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 319 try { 320 sourceBuilder.size(0); 321 searchRequest.source(sourceBuilder); 322 323 TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled); 324 sourceBuilder.aggregation(aggregationBuilder); 325 326 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 327 Map<String, Aggregation> aggregationMap = result.getAggregations().asMap(); 328 ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount"); 329 List buckets = grageTerms.getBuckets(); 330 331 for (Object object : buckets) { 332 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object; 333 String name = obj.getKeyAsString(); 334 long count = obj.getDocCount(); 335 statistics.putIfAbsent(name, count); 336 } 337 } catch (Exception e) { 338 log.error("Group count failed!", e); 339 } 340 return statistics; 341 } 342 343 /** 344 * 分组统计 345 * 346 * @param esIndex 索引 347 * @param groupFiled 被统计字段 348 * @param boolQueryBuilder 查询条件 349 */ 350 public Map<String, Long> groupCount(String esIndex, String groupFiled, BoolQueryBuilder boolQueryBuilder) { 351 Map<String, Long> statistics = new HashMap<>(); 352 SearchRequest searchRequest = new SearchRequest(); 353 searchRequest.indices(esIndex); 354 355 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 356 try { 357 sourceBuilder.size(0); 358 searchRequest.source(sourceBuilder); 359 sourceBuilder.query(boolQueryBuilder); 360 361 TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled); 362 aggregationBuilder.order(BucketOrder.count(false)); 363 aggregationBuilder.size(10000); 364 sourceBuilder.aggregation(aggregationBuilder); 365 log.info("sourceBuilder:" + sourceBuilder.toString()); 366 367 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 368 Map<String, Aggregation> aggregationMap = result.getAggregations().asMap(); 369 ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount"); 370 List buckets = grageTerms.getBuckets(); 371 372 for (Object object : buckets) { 373 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object; 374 String name = obj.getKeyAsString(); 375 long count = obj.getDocCount(); 376 statistics.putIfAbsent(name, count); 377 } 378 } catch (Exception e) { 379 log.error("Group count failed!", e); 380 } 381 return statistics; 382 } 383 384 public SearchHit[] queryDataBig(String esIndex, SearchSourceBuilder sourceBuilder) { 385 final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1)); 386 SearchRequest searchRequest = new SearchRequest(); 387 searchRequest.indices(esIndex); 388 searchRequest.source(sourceBuilder); 389 searchRequest.scroll(scroll); 390 try { 391 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 392 String scrollId = result.getScrollId(); 393 SearchHit[] searchHits2 = result.getHits().getHits(); 394 List<SearchHit> searchHitList = new ArrayList<>(); 395 for (int i = 0; i < searchHits2.length; i++) { 396 searchHitList.add(searchHits2[i]); 397 } 398 if (result.getHits().getTotalHits().value > 100000) { 399 long count = searchHits2.length; 400 while (count < 100000) { 401 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 402 scrollRequest.scroll(scroll); 403 try { 404 result = client.scroll(scrollRequest, RequestOptions.DEFAULT); 405 } catch (Exception e) { 406 log.error("error is " + e); 407 } 408 scrollId = result.getScrollId(); 409 searchHits2 = result.getHits().getHits(); 410 for (int i = 0; i < searchHits2.length; i++) { 411 searchHitList.add(searchHits2[i]); 412 } 413 long size = searchHits2.length; 414 count = count + size; 415 416 } 417 SearchHit[] searchHits = new SearchHit[searchHitList.size()]; 418 searchHitList.toArray(searchHits); 419 return searchHits; 420 } else { 421 while (searchHits2 != null && searchHits2.length > 0) { 422 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 423 scrollRequest.scroll(scroll); 424 try { 425 result = client.scroll(scrollRequest, RequestOptions.DEFAULT); 426 } catch (Exception e) { 427 log.error("error is " + e); 428 } 429 scrollId = result.getScrollId(); 430 searchHits2 = result.getHits().getHits(); 431 for (int i = 0; i < searchHits2.length; i++) { 432 searchHitList.add(searchHits2[i]); 433 } 434 } 435 SearchHit[] searchHits = new SearchHit[searchHitList.size()]; 436 searchHitList.toArray(searchHits); 437 return searchHits; 438 } 439 440 } catch (Exception e) { 441 log.error("error query info", e); 442 } 443 return null; 444 } 445 }
  • 相关阅读:
    野生前端的数据结构基础练习(3)——链表
    野生前端的数据结构基础练习(3)——链表
    野生前端的数据结构基础练习(3)——链表
    Spring MVC之LocaleResolver详解
    Winfrom 屏蔽Alt+F4
    最简单的单例模式
    Eclipse的优化
    Eclipse的优化
    用PULL解析器解析XML文件
    用PULL解析器解析XML文件
  • 原文地址:https://www.cnblogs.com/seufelix/p/13097762.html
Copyright © 2011-2022 走看看