  • Using the Java High Level REST Client with Elasticsearch

    Overview

    Having covered the basic concepts and usage of Elasticsearch, we now look at how to use its Java API.
    This article assumes you already have a fairly complete grasp of the basic concepts of ES.

    Clients

    You can do a lot with the Java client:

    • Perform standard index, get, delete, update, and search operations.
    • Perform administrative tasks on a running cluster.

    However, as the official documentation shows, there are currently at least three Java clients:

    1. Transport Client
    2. Java High Level REST Client
    3. Java Low Level REST Client

    The reasons for this confusing situation are:

    • For a long time ES had no official Java client, and since Java could talk to the ES API directly without much effort, the TransportClient was built first. Its drawback is obvious: instead of a RESTful interface, it transfers data over a binary protocol.

    • Later, ES released the official Java Low Level REST Client, which supports REST and works well. But its drawback is also clear: migrating TransportClient code to the Low Level REST Client takes considerable effort, so the official documentation provides a whole set of migration guides for reference.

    • Now ES offers the Java High Level REST Client. It wraps the Java Low Level REST Client, and its API accepts the same request parameters and returns the same response objects as the TransportClient, which makes code migration easy while also being RESTful; it combines the strengths of both earlier clients. It does have one drawback: versioning. ES releases minor versions very frequently, and ideally the client version should match the ES version (at least the major version). If the minor versions differ, basic operations will probably work, but newer APIs will not be supported.

    • The Java High Level REST Client is strongly recommended for ES 5 and later. The examples below use Elasticsearch 6.3.2 and are built on JDK 1.8 + Spring Boot + the Java High Level REST Client + Maven.

    Prerequisites:

    • JDK 1.8
    • Elasticsearch 6.3.2 (other versions untested; full compatibility is not guaranteed)
    • Maven
    • Spring Boot
    • 1. Maven dependencies:
            <!--elasticsearch base-->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.3.2</version>
            </dependency>
            <!-- Java Low Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>6.3.2</version>
            </dependency>
            <!-- Java High Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>6.3.2</version>
            </dependency>
    • 2. Wiring up the rest-high-level-client
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
        private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

        @Value("${spring.data.elasticsearch.host}")
        private String host;
        @Value("${spring.data.elasticsearch.port}")
        private int port;
        @Value("${spring.data.elasticsearch.username}")
        private String username;
        @Value("${spring.data.elasticsearch.password}")
        private String password;

        private RestHighLevelClient restHighLevelClient;

        @Override
        public void destroy() throws Exception {
            try {
                LOGGER.info("Closing elasticSearch client");
                if (restHighLevelClient != null) {
                    restHighLevelClient.close();
                }
            } catch (final Exception e) {
                LOGGER.error("Error closing ElasticSearch client: ", e);
            }
        }

        @Override
        public RestHighLevelClient getObject() throws Exception {
            return restHighLevelClient;
        }

        @Override
        public Class<RestHighLevelClient> getObjectType() {
            return RestHighLevelClient.class;
        }

        @Override
        public boolean isSingleton() {
            return false;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            buildClient();
        }

        protected void buildClient() {
            // basic-auth credentials picked up from the Spring properties above
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                    });

            restHighLevelClient = new RestHighLevelClient(builder);
        }

    }
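
    The @Value placeholders above expect the following keys in application.properties. A minimal sketch; the host, port, and credentials shown are placeholders for your own environment:

    spring.data.elasticsearch.host=localhost
    spring.data.elasticsearch.port=9200
    spring.data.elasticsearch.username=elastic
    spring.data.elasticsearch.password=changeme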
    • 3. Index API
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("user", "laimailai");
    jsonMap.put("postDate", new Date());
    jsonMap.put("message", "trying out Elasticsearch");
    IndexRequest indexRequest = new IndexRequest("index", "type", "1")
            .source(jsonMap);
    IndexResponse indexResponse = client.index(indexRequest);
    • 4. Get API
    GetRequest getRequest = new GetRequest(
            "index",
            "type",
            "1");
    GetResponse getResponse = client.get(getRequest);
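
    A hedged sketch of reading the result; isExists(), getSourceAsString(), and getSourceAsMap() are standard GetResponse accessors:

    if (getResponse.isExists()) {
        String sourceAsString = getResponse.getSourceAsString(); // the document as a JSON string
        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // the document as a Map
    } else {
        // no document exists under that id
    }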
    • 5. Update API
    UpdateRequest request = new UpdateRequest(
            "index",
            "type",
            "1");
    request.doc(XContentType.JSON, "field", "value"); // a partial document (or script) is required; the field/value here are illustrative
    UpdateResponse updateResponse = client.update(request);
    • 6. Delete API
    DeleteRequest request = new DeleteRequest(
            "index",
            "type",
            "1");
    DeleteResponse deleteResponse = client.delete(request);
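
    Unlike update, deleting a document that does not exist will not throw an exception. A hedged sketch of detecting that case from the response (DocWriteResponse.Result is the standard result enum):

    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        // no document existed under that id
    }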
    • 7. Bulk API

    As explained in an earlier post, the bulk endpoint batches index/update/delete operations:
    with this API, a single bulk request carries a whole batch of them.

    //1. build a bulk request containing several operations
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("index", "type", "1")
            .source(XContentType.JSON, "field", "foo"));
    request.add(new IndexRequest("index", "type", "2")
            .source(XContentType.JSON, "field", "bar"));
    request.add(new IndexRequest("index", "type", "3")
            .source(XContentType.JSON, "field", "baz"));

    // synchronous execution
    BulkResponse bulkResponse = client.bulk(request);

    // asynchronous execution
    client.bulkAsync(request, new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            // handle the response here
        }

        @Override
        public void onFailure(Exception e) {
            // handle the failure here
        }
    });
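
    A bulk request is not all-or-nothing: individual operations can fail while the rest succeed. A hedged sketch of checking the synchronous response item by item:

    // BulkResponse is iterable over its per-operation results
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            // getFailureMessage() explains why this single operation failed
            System.err.println("operation on id " + item.getId() + " failed: " + item.getFailureMessage());
        }
    }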
    • 8. BulkProcessor (important!)

    BulkProcessor simplifies the bulk API and makes batching transparent.
    It is built from three components:

    1. RestHighLevelClient: executes the bulk requests and fetches the responses.
    2. BulkProcessor.Listener: called before and after each bulk request, and when a bulk request fails.
    3. ThreadPool: bulk requests run on this pool, so no single request blocks the others and new requests are accepted while earlier ones are still executing.

    Example:

    @Service
    public class ElasticSearchUtil {
        private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);

        @Autowired
        private RestHighLevelClient restHighLevelClient;

        private BulkProcessor bulkProcessor;

        @PostConstruct
        public void init() {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // called before each bulk request is sent; numberOfActions() is the size of this batch
                    int numberOfActions = request.numberOfActions();
                    LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // called after each bulk request completes; check here whether any item failed
                    if (response.hasFailures()) {
                        LOGGER.error("Bulk [{}] executed with failures, response = {}", executionId, response.buildFailureMessage());
                    } else {
                        LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                    }
                    BulkItemResponse[] responses = response.getItems();
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // called when the bulk request itself fails (e.g. a connection error)
                    LOGGER.error("Failed to execute bulk", failure);
                }
            };

            // build() constructs the BulkProcessor; under the hood it uses the asynchronous bulk API
            this.bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
                    // flush a bulk request every 1000 operations
                    .setBulkActions(1000)
                    // flush once the buffered operations reach 5 MB
                    .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                    // number of concurrent flushes: 0 means flushes run synchronously, 1 allows one flush in flight
                    .setConcurrentRequests(0)
                    // flush at least once every second regardless of volume
                    .setFlushInterval(TimeValue.timeValueSeconds(1L))
                    // on rejection, retry up to 5 times with a constant 1 s backoff
                    .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                    .build();
        }

        @PreDestroy
        public void destroy() {
            try {
                bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("Failed to close bulkProcessor", e);
            }
            LOGGER.info("bulkProcessor closed!");
        }

        /**
         * Update a document (buffered through the BulkProcessor).
         *
         * @param request the update request
         */
        public void update(UpdateRequest request) {
            this.bulkProcessor.add(request);
        }

        /**
         * Index a document (buffered through the BulkProcessor).
         *
         * @param request the index request
         */
        public void insert(IndexRequest request) {
            this.bulkProcessor.add(request);
        }
    }

    BulkProcessor usage example:

    // create three index requests
    IndexRequest one = new IndexRequest("posts", "doc", "1")
            .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
    IndexRequest two = new IndexRequest("posts", "doc", "2")
            .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
    IndexRequest three = new IndexRequest("posts", "doc", "3")
            .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
    // add them to the BulkProcessor configured above
    bulkProcessor.add(one);
    bulkProcessor.add(two);
    bulkProcessor.add(three);
    // add many requests here...

    // the BulkProcessor must be closed for the buffered operations to take effect.
    // There are two ways to close it:
    // 1. close() shuts it down immediately
    bulkProcessor.close();
    // 2. awaitClose() waits up to the given timeout for all in-flight bulk requests to finish;
    //    it returns true if everything completed in time, false otherwise
    try {
        boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    • 9. Upsert API

    update -- throws an exception when the document id does not exist:

    UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
    UpdateResponse response = restHighLevelClient.update(request);
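
    A hedged sketch of handling that failure: the missing document surfaces as an ElasticsearchException carrying a 404 status (the id used here is illustrative):

    try {
        UpdateRequest request = new UpdateRequest(index, type, "missing-id").doc(jsonMap);
        restHighLevelClient.update(request);
    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.NOT_FOUND) {
            // document_missing_exception: no document with that id exists yet
        }
    }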

    upsert -- inserts the document when the id does not exist:

    UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
    UpdateResponse response = restHighLevelClient.update(request);
    • 10. Search API

    The Search API covers both document queries and aggregations.
    Its basic form:

    SearchRequest searchRequest = new SearchRequest(); // no arguments: search across all indices
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // most query parameters go on the SearchSourceBuilder
    searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all query

    // alternatively, target a specific index and type:
    SearchRequest searchRequest = new SearchRequest("posts"); // restrict to the posts index
    searchRequest.types("doc"); // restrict to the doc type

    Using SearchSourceBuilder

    Most query options can be controlled through SearchSourceBuilder.
    A simple example:

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // created with default options
    sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // set the query
    sourceBuilder.from(0); // start from the first result
    sourceBuilder.size(5); // return 5 results per page
    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // set a timeout

    Once the SearchSourceBuilder is configured, pass it into the SearchRequest:

    SearchRequest searchRequest = new SearchRequest();
    searchRequest.source(sourceBuilder);

    // match-all search across all indices
    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    searchRequest.source(searchSourceBuilder);
    // to restrict the search to a single index instead:
    // SearchRequest searchRequest = new SearchRequest("index");
    // search on several conditions combined in a bool query
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    for (String id : ids) {
        TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
        boolQueryBuilder.should(termQueryBuilder);
    }
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(boolQueryBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest);
    return response;
    • 11. Search Scroll API
    // paginate through a large result set with scroll
    final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
    SearchRequest searchRequest = new SearchRequest("posts");
    searchRequest.scroll(scroll);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
    searchRequest.source(searchSourceBuilder);

    SearchResponse searchResponse = client.search(searchRequest);
    String scrollId = searchResponse.getScrollId();
    SearchHit[] searchHits = searchResponse.getHits().getHits();

    while (searchHits != null && searchHits.length > 0) {
        // process the current page of searchHits here, then fetch the next page
        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
        scrollRequest.scroll(scroll);
        searchResponse = client.searchScroll(scrollRequest);
        scrollId = searchResponse.getScrollId();
        searchHits = searchResponse.getHits().getHits();
    }

    // release the scroll context once done
    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    clearScrollRequest.addScrollId(scrollId);
    ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
    boolean succeeded = clearScrollResponse.isSucceeded();
    • 12. Sorting

    A SearchSourceBuilder can take one or more SortBuilders.
    There are four specialized sort implementations (the geo distance and script variants are sketched after the snippet below):

      • field
      • score
      • geo distance
      • script
    sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by score, descending
    sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then by id, ascending
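
    The snippet above covers the score and field sorts. A hedged sketch of the other two; the field names "location" and "rating" are illustrative and assume a geo_point mapping and a numeric field respectively:

    // geo distance sort: order documents by distance from a point in the "location" geo_point field
    sourceBuilder.sort(SortBuilders.geoDistanceSort("location", 40.7143528, -74.0059731)
            .order(SortOrder.ASC)
            .unit(DistanceUnit.KILOMETERS));

    // script sort: order documents by a value computed in a Painless script
    sourceBuilder.sort(SortBuilders.scriptSort(
            new Script("doc['rating'].value * 2"),
            ScriptSortBuilder.ScriptSortType.NUMBER));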
    • 13. Source filtering

    By default, a search request returns the contents of the document _source. As with the REST API, you can override this behavior; for example, you can turn off _source retrieval entirely:

    sourceBuilder.fetchSource(false);

    The method also accepts arrays of one or more wildcard patterns, for finer-grained control over which fields are included or excluded:

    String[] includeFields = new String[] {"title", "user", "innerObject.*"};
    String[] excludeFields = new String[] {"_type"};
    sourceBuilder.fetchSource(includeFields, excludeFields);
    • 14. Aggregations

    Build an appropriate AggregationBuilder and pass it into the SearchSourceBuilder to form an aggregation request.
    In an earlier post we imported one thousand account documents with the following command:

    curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"

    We then showed how to group them with an aggregation request:

    GET /bank/_search?pretty
    {
      "size": 0,
      "aggs": {
        "group_by_state": {
          "terms": {
            "field": "state.keyword"
          }
        }
      }
    }

    Grouping those documents by the state field yields this response:

    {
      "took": 2,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 999,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "group_by_state": {
          "doc_count_error_upper_bound": 20,
          "sum_other_doc_count": 770,
          "buckets": [
            {
              "key": "ID",
              "doc_count": 27
            },
            {
              "key": "TX",
              "doc_count": 27
            },
            {
              "key": "AL",
              "doc_count": 25
            },
            {
              "key": "MD",
              "doc_count": 25
            },
            {
              "key": "TN",
              "doc_count": 23
            },
            {
              "key": "MA",
              "doc_count": 21
            },
            {
              "key": "NC",
              "doc_count": 21
            },
            {
              "key": "ND",
              "doc_count": 21
            },
            {
              "key": "MO",
              "doc_count": 20
            },
            {
              "key": "AK",
              "doc_count": 19
            }
          ]
        }
      }
    }

    Java implementation:

    @Test
    public void test2() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
        SearchRequest searchRequest = new SearchRequest("bank");
        searchRequest.types("account");
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
                .field("state.keyword");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggregation);
        searchSourceBuilder.size(0);
        searchRequest.source(searchSourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest);
            System.out.println(searchResponse.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    Search response

    The SearchResponse returned by a search is the same object the TransportClient API returns, carrying both metadata and the document data.
    First, the metadata on the response object matters: it tells you the outcome of the query, how shards were used, the HTTP status code, and so on:

    RestStatus status = searchResponse.status();
    TimeValue took = searchResponse.getTook();
    Boolean terminatedEarly = searchResponse.isTerminatedEarly();
    boolean timedOut = searchResponse.isTimedOut();

    Second, the response contains information about the shards involved and lets you handle shard failures:

    int totalShards = searchResponse.getTotalShards();
    int successfulShards = searchResponse.getSuccessfulShards();
    int failedShards = searchResponse.getFailedShards();
    for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
        // failures should be handled here
    }

    Retrieving SearchHits

    To get at the document data, first obtain the SearchHits object from the response:

    SearchHits hits = searchResponse.getHits();

    Retrieving the documents:

    @Test
    public void test2() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
        SearchRequest searchRequest = new SearchRequest("bank");
        searchRequest.types("account");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchRequest.source(searchSourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest);
            SearchHits searchHits = searchResponse.getHits();
            SearchHit[] searchHit = searchHits.getHits();
            for (SearchHit hit : searchHit) {
                System.out.println(hit.getSourceAsString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    Each hit's source can also be converted to other types as needed:

    String sourceAsString = hit.getSourceAsString();
    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
    String documentTitle = (String) sourceAsMap.get("title");
    List<Object> users = (List<Object>) sourceAsMap.get("user");
    Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

    Retrieving aggregation results

    Aggregation results are read from the SearchResponse: get the root Aggregations object, then look up each aggregation by name. We reuse the group_by_state query and response shown in section 14 above.


    Java implementation:

    @Test
    public void test2() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
        SearchRequest searchRequest = new SearchRequest("bank");
        searchRequest.types("account");
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
                .field("state.keyword");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(aggregation);
        searchSourceBuilder.size(0);
        searchRequest.source(searchSourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest);
            Aggregations aggs = searchResponse.getAggregations();
            Terms byStateAggs = aggs.get("group_by_state");
            Terms.Bucket b = byStateAggs.getBucketByKey("ID"); // fetch only the bucket whose key is "ID"
            System.out.println(b.getKeyAsString() + "," + b.getDocCount());
            System.out.println("!!!");
            List<? extends Bucket> aggList = byStateAggs.getBuckets(); // iterate over every bucket
            for (Bucket bucket : aggList) {
                System.out.println("key:" + bucket.getKeyAsString() + ",docCount:" + bucket.getDocCount());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    Reference: https://www.jianshu.com/p/5cb91ed22956

    Reference: https://my.oschina.net/u/3795437/blog/2253366

  • Original post: https://www.cnblogs.com/fnlingnzb-learner/p/10750868.html