zoukankan      html  css  js  c++  java
  • 使用Java High Level REST Client操作elasticsearch

    说明

    在明确了ES的基本概念和使用方法后,我们来学习如何使用ES的Java API.
    本文假设你已经对ES的基本概念已经有了一个比较全面的认识。

    客户端

    你可以用Java客户端做很多事情:

    • 执行标准的index,get,delete,update,search等操作。
    • 在正在运行的集群上执行管理任务。

    但是,通过官方文档可以得知,现在存在至少三种Java客户端。

    1. Transport Client
    2. Java High Level REST Client
    3. Java Low Level Rest Client

    造成这种混乱的原因是:

    • 长久以来,ES并没有官方的Java客户端,并且Java自身是可以简单支持ES的API的,于是就先做成了TransportClient。但是TransportClient的缺点是显而易见的,它没有使用RESTful风格的接口,而是二进制的方式传输数据。

    • 之后ES官方推出了Java Low Level REST Client,它支持RESTful,用起来也不错。但是缺点也很明显,因为TransportClient的使用者把代码迁移到Low Level REST Client的工作量比较大。官方文档专门为迁移代码出了一堆文档来提供参考。

    • 现在ES官方推出Java High Level REST Client,它是基于Java Low Level REST Client的封装,并且API接收参数和返回值和TransportClient是一样的,使得代码迁移变得容易并且支持了RESTful的风格,兼容了这两种客户端的优点。当然缺点是存在的,就是版本的问题。ES的小版本更新非常频繁,在最理想的情况下,客户端的版本要和ES的版本一致(至少主版本号一致),次版本号不一致的话,基本操作也许可以,但是新API就不支持了。

    • 强烈建议ES5及其以后的版本使用Java High Level REST Client。笔者这里使用的是ES5.6.3,下面的文章将基于JDK1.8+Spring Boot+ES5.6.3 Java High Level REST Client+Maven进行示例。

    前置条件:

    • JDK1.8
    • elasticsearch 6.3.2(其他版本未做测试,不保证完全兼容)
    • maven 
    • spring boot
    • 1.maven依赖:
            <!--elasticsearch base-->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.3.2</version>
            </dependency>
            <!-- Java Low Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>6.3.2</version>
            </dependency>
            <!-- Java High Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>6.3.2</version>
            </dependency>
    • 2.接入rest-higl-level-client
     1 import org.apache.http.HttpHost;
     2 import org.apache.http.auth.AuthScope;
     3 import org.apache.http.auth.UsernamePasswordCredentials;
     4 import org.apache.http.client.CredentialsProvider;
     5 import org.apache.http.impl.client.BasicCredentialsProvider;
     6 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
     7 import org.elasticsearch.client.RestClient;
     8 import org.elasticsearch.client.RestClientBuilder;
     9 import org.elasticsearch.client.RestHighLevelClient;
    10 import org.slf4j.Logger;
    11 import org.slf4j.LoggerFactory;
    12 import org.springframework.beans.factory.DisposableBean;
    13 import org.springframework.beans.factory.FactoryBean;
    14 import org.springframework.beans.factory.InitializingBean;
    15 import org.springframework.beans.factory.annotation.Value;
    16 import org.springframework.context.annotation.Configuration;
    17 
    18 @Configuration
    19 public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
    20     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
    21 
    22     @Value("${spring.data.elasticsearch.host}")
    23     private String host;
    24     @Value("${spring.data.elasticsearch.port}")
    25     private int port;
    26     @Value("${spring.data.elasticsearch.username}")
    27     private String username;
    28     @Value("${spring.data.elasticsearch.password}")
    29     private String password;
    30 
    31     private RestHighLevelClient restHighLevelClient;
    32 
    33     @Override
    34     public void destroy() throws Exception {
    35         try {
    36             LOGGER.info("Closing elasticSearch client");
    37             if (restHighLevelClient != null) {
    38                 restHighLevelClient.close();
    39             }
    40         } catch (final Exception e) {
    41             LOGGER.error("Error closing ElasticSearch client: ", e);
    42         }
    43     }
    44 
    45     @Override
    46     public RestHighLevelClient getObject() throws Exception {
    47         return restHighLevelClient;
    48     }
    49 
    50     @Override
    51     public Class<RestHighLevelClient> getObjectType() {
    52         return RestHighLevelClient.class;
    53     }
    54 
    55     @Override
    56     public boolean isSingleton() {
    57         return false;
    58     }
    59 
    60     @Override
    61     public void afterPropertiesSet() throws Exception {
    62         buildClient();
    63     }
    64 
    65     protected void buildClient() {
    66         final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    67         credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    68         RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
    69                 .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    70                     @Override
    71                     public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
    72                         return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    73                     }
    74                 });
    75 
    76         restHighLevelClient = new RestHighLevelClient(builder);
    77     }
    78 
    79 }
    • 3.index api
    1 Map<String, Object> jsonMap = new HashMap<>();
    2 jsonMap.put("user", "laimailai");
    3 jsonMap.put("postDate", new Date());
    4 jsonMap.put("message", "trying out Elasticsearch");
    5 IndexRequest indexRequest = new IndexRequest("index", "type", "1")
    6         .source(jsonMap);
    7 IndexResponse indexResponse = client.index(request);
    • 4.get api
    1 GetRequest getRequest = new GetRequest(
    2         "index",
    3         "type",
    4         "1");
    5 GetResponse getResponse = client.get(request);
    • 5.update api
    1 UpdateRequest request = new UpdateRequest(
    2         "index",
    3         "type",
    4         "1");
    5 UpdateResponse updateResponse = client.update(request);
    • 6.delete api
    1 DeleteRequest request = new DeleteRequest(
    2         "index",    
    3         "type",     
    4         "1");
    • 7.bulk api

    之前的文档说明过,bulk接口是批量index/update/delete操作
    在API中,只需要一个bulk request就可以完成一批请求。

     1 //1.bulk
     2 BulkRequest request = new BulkRequest();
     3 request.add(new IndexRequest("index", "type", "1")
     4         .source(XContentType.JSON, "field", "foo"));
     5 request.add(new IndexRequest("index", "type", "2")
     6         .source(XContentType.JSON, "field", "bar"));
     7 request.add(new IndexRequest("index", "type", "3")
     8         .source(XContentType.JSON, "field", "baz"));
     9 
    10 //同步
    11 BulkResponse bulkResponse = client.bulk(request);
    12 
    13 //异步
    14 client.bulkAsync(request, new ActionListener<BulkResponse>() {
    15     @Override
    16     public void onResponse(BulkResponse bulkResponse) {
    17 
    18     }
    19 
    20     @Override
    21     public void onFailure(Exception e) {
    22 
    23     }
    24 });
    • 8.bulkprocessor 划重点!!!

    BulkProcessor 简化bulk API的使用,并且使整个批量操作透明化。
    BulkProcessor 的执行需要三部分组成:

    1. RestHighLevelClient :执行bulk请求并拿到响应对象。
    2. BulkProcessor.Listener:在执行bulk request之前、之后和当bulk response发生错误时调用。
    3. ThreadPool:bulk request在这个线程池中执行操作,这使得每个请求不会被挡住,在其他请求正在执行时,也可以接收新的请求。

    示例代码:

     1 @Service
     2 public class ElasticSearchUtil {
     3     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);
     4 
     5     @Autowired
     6     private RestHighLevelClient restHighLevelClient;
     7 
     8     private BulkProcessor bulkProcessor;
     9 
    10     @PostConstruct
    11     public void init() {
    12         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    13             @Override
    14             public void beforeBulk(long executionId, BulkRequest request) {
    15                 //重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数
    16                 int numberOfActions = request.numberOfActions();
    17                 LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    18             }
    19 
    20             @Override
    21             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    22                 //重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生。
    23                 if (response.hasFailures()) {
    24                     LOGGER.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
    25                 } else {
    26                     LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
    27                 }
    28                 BulkItemResponse[] responses = response.getItems();
    29             }
    30 
    31             @Override
    32             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    33                 //重写方法,如果发生错误就会调用。
    34                 LOGGER.error("Failed to execute bulk", failure);
    35             }
    36         };
    37 
    38         //在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作
    39         BulkProcessor bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
    40                 // 1000条数据请求执行一次bulk
    41                 .setBulkActions(1000)
    42                 // 5mb的数据刷新一次bulk
    43                 .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
    44                 // 并发请求数量, 0不并发, 1并发允许执行
    45                 .setConcurrentRequests(0)
    46                 // 固定1s必须刷新一次
    47                 .setFlushInterval(TimeValue.timeValueSeconds(1L))
    48                 // 重试5次,间隔1s
    49                 .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
    50                 .build();
    51         this.bulkProcessor = bulkProcessor;
    52     }
    53 
    54     @PreDestroy
    55     public void destroy() {
    56         try {
    57             bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
    58         } catch (InterruptedException e) {
    59             LOGGER.error("Failed to close bulkProcessor", e);
    60         }
    61         LOGGER.info("bulkProcessor closed!");
    62     }
    63 
    64     /**
    65      * 修改
    66      *
    67      * @param request
    68      * @throws IOException
    69      */
    70     public void update(UpdateRequest request) {
    71         this.bulkProcessor.add(request);
    72     }
    73 
    74     /**
    75      * 新增
    76      *
    77      * @param request
    78      */
    79     public void insert(IndexRequest request) {
    80         this.bulkProcessor.add(request);
    81     }
    82 }

    bulkProcessor使用用例:

     1         //新建三个 index 请求
     2         IndexRequest one = new IndexRequest("posts", "doc", "1").
     3                 source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
     4         IndexRequest two = new IndexRequest("posts", "doc", "2")
     5                 .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
     6         IndexRequest three = new IndexRequest("posts", "doc", "3")
     7                 .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
     8         //新的三条index请求加入到上面配置好的bulkProcessor里面。
     9         bulkProcessor.add(one);
    10         bulkProcessor.add(two);
    11         bulkProcessor.add(three);
    12         // add many request here.
    13         //bulkProcess必须被关闭才能使上面添加的操作生效
    14         bulkProcessor.close(); //立即关闭
    15         //关闭bulkProcess的两种方法:
    16         try {
    17             //2.调用awaitClose.
    18             //简单来说,就是在规定的时间内,是否所有批量操作完成。全部完成,返回true,未完成返//回false
    19             
    20             boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
    21             
    22         } catch (InterruptedException e) {
    23             // TODO Auto-generated catch block
    24             e.printStackTrace();
    25         }
    • 9.upsert api

    update --当id不存在时将会抛出异常:

    1 UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
    2 UpdateResponse response = restHighLevelClient.update(request);

    upsert--id不存在时就插入:

    1 UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
    2 UpdateResponse response = restHighLevelClient.update(request);
    • 10.search api

    Search API提供了对文档的查询和聚合的查询。
    它的基本形式:

    1 SearchRequest searchRequest = new SearchRequest();  //构造search request .在这里无参,查询全部索引
    2 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//大多数查询参数要写在searchSourceBuilder里 
    3 searchSourceBuilder.query(QueryBuilders.matchAllQuery());//增加match_all的条件。
    1 SearchRequest searchRequest = new SearchRequest("posts"); //指定posts索引
    2 searchRequest.types("doc"); //指定doc类型

    使用SearchSourceBuilder

    大多数的查询控制都可以使用SearchSourceBuilder实现。
    举一个简单例子:

    1 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); //构造一个默认配置的对象
    2 sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); //设置查询
    3 sourceBuilder.from(0); //设置从哪里开始
    4 sourceBuilder.size(5); //每页5条
    5 sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); //设置超时时间

    配置好searchSourceBuilder后,将它传入searchRequest里:

    1 SearchRequest searchRequest = new SearchRequest();
    2 searchRequest.source(sourceBuilder);
    1 //全量搜索
    2 SearchRequest searchRequest = new SearchRequest();
    3 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    4 searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    5 searchRequest.source(searchSourceBuilder);
    6 SearchRequest searchRequest = new SearchRequest("index");
     1 //根据多个条件搜索
     2 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
     3 for (String id: ids) {
     4     TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
     5     boolQueryBuilder.should(termQueryBuilder);
     6 }
     7 SearchRequest searchRequest = new SearchRequest(index);
     8 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
     9 searchSourceBuilder.query(boolQueryBuilder);
    10 searchRequest.source(searchSourceBuilder);
    11 SearchResponse response = null;
    12     response = restHighLevelClient.search(searchRequest);
    13 return response;
    • 11.search scroll api
     1 //scroll 分页搜索
     2 final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
     3 SearchRequest searchRequest = new SearchRequest("posts");
     4 searchRequest.scroll(scroll);
     5 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
     6 searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
     7 searchRequest.source(searchSourceBuilder);
     8 
     9 SearchResponse searchResponse = client.search(searchRequest);
    10 String scrollId = searchResponse.getScrollId();
    11 SearchHit[] searchHits = searchResponse.getHits().getHits();
    12 
    13 while (searchHits != null && searchHits.length > 0) {
    14     SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    15     scrollRequest.scroll(scroll);
    16     searchResponse = client.searchScroll(scrollRequest);
    17     scrollId = searchResponse.getScrollId();
    18     searchHits = searchResponse.getHits().getHits();
    19 
    20 }
    21 
    22 ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    23 clearScrollRequest.addScrollId(scrollId);
    24 ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
    25 boolean succeeded = clearScrollResponse.isSucceeded();
    • 12.排序

    SearchSourceBuilder可以添加一种或多种SortBuilder。
    有四种特殊的排序实现:

      • field
      • score
      • GeoDistance
      • scriptSortBuilder
    1 sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); //按照score倒序排列
    2 sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));  //并且按照id正序排列
    • 13.过滤

    默认情况下,searchRequest返回文档内容,与REST API一样,这里你可以重写search行为。例如,你可以完全关闭"_source"检索。

    1 sourceBuilder.fetchSource(false);

    该方法还接受一个或多个通配符模式的数组,以更细粒度地控制包含或排除哪些字段。

    1 String[] includeFields = new String[] {"title", "user", "innerObject.*"};
    2 String[] excludeFields = new String[] {"_type"};
    3 sourceBuilder.fetchSource(includeFields, excludeFields);
    • 14.聚合

    通过配置适当的 AggregationBuilder ,再将它传入SearchSourceBuilder里,就可以完成聚合请求了。
    之前的文档里面,我们通过下面这条命令,导入了一千条account信息:

    curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"

    随后,我们介绍了如何通过聚合请求进行分组:

    GET /bank/_search?pretty
    {
      "size": 0,
      "aggs": {
        "group_by_state": {
          "terms": {
            "field": "state.keyword"
          }
        }
      }
    }

    我们将这一千条数据根据state字段分组,得到响应:

    {
      "took": 2,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 999,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "group_by_state": {
          "doc_count_error_upper_bound": 20,
          "sum_other_doc_count": 770,
          "buckets": [
            {
              "key": "ID",
              "doc_count": 27
            },
            {
              "key": "TX",
              "doc_count": 27
            },
            {
              "key": "AL",
              "doc_count": 25
            },
            {
              "key": "MD",
              "doc_count": 25
            },
            {
              "key": "TN",
              "doc_count": 23
            },
            {
              "key": "MA",
              "doc_count": 21
            },
            {
              "key": "NC",
              "doc_count": 21
            },
            {
              "key": "ND",
              "doc_count": 21
            },
            {
              "key": "MO",
              "doc_count": 20
            },
            {
              "key": "AK",
              "doc_count": 19
            }
          ]
        }
      }
    }

    Java实现:

     1    @Test
     2     public void test2(){
     3         RestClient lowLevelRestClient = RestClient.builder(
     4                 new HttpHost("172.16.73.50", 9200, "http")).build();
     5         RestHighLevelClient client =
     6                 new RestHighLevelClient(lowLevelRestClient);
     7         SearchRequest searchRequest = new SearchRequest("bank");
     8         searchRequest.types("account");
     9         TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
    10                 .field("state.keyword");
    11         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    12         searchSourceBuilder.aggregation(aggregation);
    13         searchSourceBuilder.size(0);
    14         searchRequest.source(searchSourceBuilder);
    15         try {
    16             SearchResponse searchResponse = client.search(searchRequest);
    17             System.out.println(searchResponse.toString());
    18         } catch (IOException e) {
    19             e.printStackTrace();
    20         }
    21         
    22     }

    Search response

    Search response返回对象与其在API里的一样,返回一些元数据和文档数据。
    首先,返回对象里的数据十分重要,因为这是查询的返回结果、使用分片情况、文档数据,HTTP状态码等

    1 RestStatus status = searchResponse.status();
    2 TimeValue took = searchResponse.getTook();
    3 Boolean terminatedEarly = searchResponse.isTerminatedEarly();
    4 boolean timedOut = searchResponse.isTimedOut();

    其次,返回对象里面包含关于分片的信息和分片失败的处理:

    1 int totalShards = searchResponse.getTotalShards();
    2 int successfulShards = searchResponse.getSuccessfulShards();
    3 int failedShards = searchResponse.getFailedShards();
    4 for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    5     // failures should be handled here
    6 }

    取回searchHit

    为了取回文档数据,我们要从search response的返回对象里先得到searchHit对象:

    1 SearchHits hits = searchResponse.getHits();

    取回文档数据:

     1     @Test
     2     public void test2(){
     3         RestClient lowLevelRestClient = RestClient.builder(
     4                 new HttpHost("172.16.73.50", 9200, "http")).build();
     5         RestHighLevelClient client =
     6                 new RestHighLevelClient(lowLevelRestClient);
     7         SearchRequest searchRequest = new SearchRequest("bank");
     8         searchRequest.types("account");
     9         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    10         searchRequest.source(searchSourceBuilder);
    11         try {
    12             SearchResponse searchResponse = client.search(searchRequest);
    13             SearchHits searchHits = searchResponse.getHits();
    14             SearchHit[] searchHit = searchHits.getHits();
    15             for (SearchHit hit : searchHit) {
    16                 System.out.println(hit.getSourceAsString());
    17             }
    18         } catch (IOException e) {
    19             e.printStackTrace();
    20         }
    21         
    22     }

    根据需要,还可以转换成其他数据类型:

    1 String sourceAsString = hit.getSourceAsString();
    2 Map<String, Object> sourceAsMap = hit.getSourceAsMap();
    3 String documentTitle = (String) sourceAsMap.get("title");
    4 List<Object> users = (List<Object>) sourceAsMap.get("user");
    5 Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

    取回聚合数据

    聚合数据可以通过SearchResponse返回对象,取到它的根节点,然后再根据名称取到聚合数据。

    GET /bank/_search?pretty
    {
      "size": 0,
      "aggs": {
        "group_by_state": {
          "terms": {
            "field": "state.keyword"
          }
        }
      }
    }

    响应:

    {
      "took": 2,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 999,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "group_by_state": {
          "doc_count_error_upper_bound": 20,
          "sum_other_doc_count": 770,
          "buckets": [
            {
              "key": "ID",
              "doc_count": 27
            },
            {
              "key": "TX",
              "doc_count": 27
            },
            {
              "key": "AL",
              "doc_count": 25
            },
            {
              "key": "MD",
              "doc_count": 25
            },
            {
              "key": "TN",
              "doc_count": 23
            },
            {
              "key": "MA",
              "doc_count": 21
            },
            {
              "key": "NC",
              "doc_count": 21
            },
            {
              "key": "ND",
              "doc_count": 21
            },
            {
              "key": "MO",
              "doc_count": 20
            },
            {
              "key": "AK",
              "doc_count": 19
            }
          ]
        }
      }
    }

    Java实现:

     1     @Test
     2     public void test2(){
     3         RestClient lowLevelRestClient = RestClient.builder(
     4                 new HttpHost("172.16.73.50", 9200, "http")).build();
     5         RestHighLevelClient client =
     6                 new RestHighLevelClient(lowLevelRestClient);
     7         SearchRequest searchRequest = new SearchRequest("bank");
     8         searchRequest.types("account");
     9         TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
    10                 .field("state.keyword");
    11         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    12         searchSourceBuilder.aggregation(aggregation);
    13         searchSourceBuilder.size(0);
    14         searchRequest.source(searchSourceBuilder);
    15         try {
    16             SearchResponse searchResponse = client.search(searchRequest);
    17             Aggregations aggs = searchResponse.getAggregations();
    18             Terms byStateAggs = aggs.get("group_by_state");
    19             Terms.Bucket b = byStateAggs.getBucketByKey("ID"); //只取key是ID的bucket
    20             System.out.println(b.getKeyAsString()+","+b.getDocCount());
    21             System.out.println("!!!");
    22             List<? extends Bucket> aggList = byStateAggs.getBuckets();//获取bucket数组里所有数据
    23             for (Bucket bucket : aggList) {
    24                 System.out.println("key:"+bucket.getKeyAsString()+",docCount:"+bucket.getDocCount());;
    25             }
    26         } catch (IOException e) {
    27             e.printStackTrace();
    28         }
    29     }

    参考:https://www.jianshu.com/p/5cb91ed22956

    参考:https://my.oschina.net/u/3795437/blog/2253366

  • 相关阅读:
    CF1539 VP 记录
    CF1529 VP 记录
    CF875C National Property 题解
    CF1545 比赛记录
    CF 1550 比赛记录
    CF1539E Game with Cards 题解
    CF1202F You Are Given Some Letters... 题解
    vmware Linux虚拟机挂载共享文件夹
    利用SOLR搭建企业搜索平台 之九(solr的查询语法)
    利用SOLR搭建企业搜索平台 之四(MultiCore)
  • 原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/10750868.html
Copyright © 2011-2022 走看看