zoukankan      html  css  js  c++  java
  • es搜索,统计

      记得好几年前用es做过标签画像统计,如今再看es时已经很生疏了,而且es也已更新到了7.12版本。以前用的是TransportClient客户端,现在官方推出并推荐使用RestHighLevelClient客户端。

    这几天用RestHighLevelClient时还是觉得比较方便的。现将一些基本常用功能记录一下。

    1.初始化和关闭

    /**
         * Creates a RestHighLevelClient for a single node and stores it in the
         * shared {@code client} field (declared outside this snippet) before returning it.
         * The node is addressed with the "http" scheme.
         *
         * @param host host name or address of the Elasticsearch node
         * @param port HTTP port of the Elasticsearch node
         * @return the newly created client
         */
        public static RestHighLevelClient getClient(String host, int port) {
            LOGGER.info("Init ES!");
            HttpHost endpoint = new HttpHost(host, port, "http");
            client = new RestHighLevelClient(RestClient.builder(endpoint));
            return client;
        }
    
        /**
         * Logs "ES closed!" and then closes the shared {@code client} if one is set.
         * An IOException raised by the close call is printed to stderr and not rethrown.
         */
        public static void closeES() {
            LOGGER.info("ES closed!");
            if (client == null) {
                // Nothing was ever initialised, so there is nothing to release.
                return;
            }
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    2.创建index以及mapping

    public boolean createIndexMapping(RestHighLevelClient client, String indexName){
            LOGGER.info("create index and mapping ...");
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
    
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards",1)
                    .put("index.number_of_replicas",0));
    
            try {
                XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties")
                        .startObject("content")
                        .field("type", "text") // 数据类型
                        .field("index", "true") //默认
                        .field("analyzer", "ik_max_word")
                        .field("search_analyzer", "ik_smart")
                        .endObject()

    .startObject("date") .field("type", "date") // 数据类型 .field("index", "true") //默认 .endObject() .startObject("title") .field("type", "text") // 数据类型 .field("index", "true") //默认 .field("analyzer", "ik_max_word") .field("search_analyzer", "ik_smart") .endObject() .endObject() .endObject(); createIndexRequest.mapping(xContentBuilder); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); return createIndexResponse.isAcknowledged(); } catch (IOException e) { e.printStackTrace(); } return false; }

    3.获取一篇doc

    /**
         * Fetches a single document by id and returns its source as a map.
         *
         * @param client client used to send the get request
         * @param index  index to read from
         * @param id     id of the document
         * @return the document source, or {@code null} when the document does not
         *         exist or the request fails with an IOException
         */
        public Map<String, Object> getOneMap(RestHighLevelClient client, String index, String id) {
            GetRequest lookup = new GetRequest(index, id);
            try {
                GetResponse found = client.get(lookup, RequestOptions.DEFAULT);
                return found.isExists() ? found.getSource() : null;
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }

    4.删除多篇

    /**
         * Deletes several documents in one delete-by-query call that is restricted
         * to the given ids.
         *
         * @param client client used to send the request
         * @param index  index to delete from
         * @param ids    ids of the documents to delete
         * @return the result of {@code JSONObject.toJSON} applied to the response;
         *         when the request fails with an IOException, {@code null} is passed
         *         to {@code JSONObject.toJSON} instead
         */
        public Object deleteList(RestHighLevelClient client, String index, List<String> ids) {
            DeleteByQueryRequest request = new DeleteByQueryRequest(index);

            // Restrict the delete to exactly the requested ids.
            IdsQueryBuilder idFilter = new IdsQueryBuilder();
            idFilter.addIds(ids.toArray(new String[0]));
            request.setQuery(idFilter);

            BulkByScrollResponse outcome;
            try {
                outcome = client.deleteByQuery(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
                outcome = null;
            }
            return JSONObject.toJSON(outcome);
        }

    5.批量导入

    /**
         * Indexes a list of documents with a single bulk request (10s timeout).
         * Each document is written under its own id with the fields
         * {@code id}, {@code title} and {@code content}.
         *
         * @param client client used to send the bulk request
         * @param index  index to write to
         * @param list   documents to index
         * @return {@code true} when the bulk call completed and no item reported a
         *         failure; {@code false} when at least one item failed or the call
         *         raised an IOException
         */
        public boolean insertDocByBulk(RestHighLevelClient client, String index, List<Doc> list) {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout("10s");
            for (Doc doc : list) {
                // A fresh IndexRequest (and a fresh source map) is built for every document.
                Map<String, Object> kv = new HashMap<>();
                kv.put("id", doc.getId());
                kv.put("title", doc.getTitle());
                kv.put("content", doc.getContent());

                bulkRequest.add(new IndexRequest(index).id(String.valueOf(doc.getId())).source(kv));
                // Alternative: pass the document as a JSON string, e.g.
                // new IndexRequest(index).id(...).source(JSON.toJSONString(doc), XContentType.JSON)
            }

            try {
                BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                // The status of the bulk response describes the bulk call as a whole,
                // not the individual items, so comparing it with CREATED never matched.
                // Per-item success is what hasFailures() reports.
                return !responses.hasFailures();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return false;
        }

    6.批量更新

    /**
         * Partially updates a list of documents with a single bulk request (10s timeout).
         * Each document is updated by its own id with the fields
         * {@code id}, {@code title} and {@code content}.
         *
         * @param client client used to send the bulk request
         * @param index  index that holds the documents
         * @param list   documents carrying the new field values
         * @return {@code true} when the bulk call completed and no item reported a
         *         failure; {@code false} when at least one item failed or the call
         *         raised an IOException
         */
        public boolean updateDocByBulk(RestHighLevelClient client, String index, List<Doc> list) {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout("10s");
            for (Doc doc : list) {
                // A fresh UpdateRequest (and a fresh field map) is built for every document.
                Map<String, Object> kv = new HashMap<>();
                kv.put("id", doc.getId());
                kv.put("title", doc.getTitle());
                kv.put("content", doc.getContent());

                bulkRequest.add(new UpdateRequest().index(index).id(String.valueOf(doc.getId())).doc(kv));
            }

            try {
                BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                // The status of the bulk response stays OK even when single items fail,
                // so it cannot be used as the success signal; hasFailures() inspects the items.
                return !responses.hasFailures();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return false;
        }

    7.全量搜索

    7.1 利用scroll

    /**
         * Collects every hit of a query by paging through the result with the scroll API.
         * The query is a bool query: a match on {@code field2} is required ("must"),
         * a match on {@code field1} is optional ("should"). Highlighting is requested
         * for {@code field2}.
         *
         * If any request fails with an IOException, the stack trace is printed,
         * paging stops, and the hits gathered so far are returned. The scroll context
         * is released in every case once a scroll id has been obtained.
         *
         * @param client  client used to send the requests
         * @param index   index to search
         * @param field1  field used in the optional ("should") match
         * @param field2  field used in the required ("must") match; also the highlighted field
         * @param query   text to match
         * @param size    number of hits fetched per scroll page
         * @param include source fields to include
         * @param exclude source fields to exclude
         * @return all hits collected, possibly partial when a request failed
         */
        public List<org.elasticsearch.search.SearchHit> searchByQueryScrollAll(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String[] include, String[] exclude) {
            List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList();
            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.scroll(scroll);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // Highlighting: wrap matches of field2 in the given tags.
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.preTags("<a style='color: #e4393c'>");
            highlightBuilder.postTags("</a>");
            highlightBuilder.field(field2);

            // Page size: each round trip returns at most `size` hits.
            searchSourceBuilder.size(size);
            searchSourceBuilder.highlighter(highlightBuilder);
            searchSourceBuilder.query(QueryBuilders.boolQuery()
                    .should(QueryBuilders.matchQuery(field1, query))
                    .must(QueryBuilders.matchQuery(field2, query)));
            searchSourceBuilder.fetchSource(include, exclude);
            searchRequest.source(searchSourceBuilder);

            String scrollId = null;
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();
                org.elasticsearch.search.SearchHit[] searchHits = searchResponse.getHits().getHits();
                while (searchHits != null && searchHits.length > 0) {
                    for (org.elasticsearch.search.SearchHit hit : searchHits) {
                        result.add(hit);
                    }
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    // A failure here leaves the loop through the catch below; reusing the
                    // previous response would re-add the same page forever.
                    searchResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    searchHits = searchResponse.getHits().getHits();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (null != scrollId) {
                    // Release the scroll context once paging is over (or was aborted).
                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                    clearScrollRequest.addScrollId(scrollId);
                    try {
                        client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            return result;
        }

    7.2 利用search_after

    /**
         * Collects every hit of a query by paging with search_after, sorted by
         * {@code _id} descending. The query is a bool query: a match on
         * {@code field2} is required ("must"), a match on {@code field1} is optional
         * ("should"). Highlighting is requested for {@code field2}.
         *
         * When a request fails with an IOException, the stack trace is printed and
         * the hits gathered so far are returned.
         *
         * @param client  client used to send the requests
         * @param index   index to search
         * @param field1  field used in the optional ("should") match
         * @param field2  field used in the required ("must") match; also the highlighted field
         * @param query   text to match
         * @param size    number of hits fetched per page
         * @param include source fields to include
         * @param exclude source fields to exclude
         * @return all hits collected, possibly partial when a request failed
         */
        public List<org.elasticsearch.search.SearchHit> searchByQuerySearchAfter(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String[] include, String[] exclude) {
            List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList();
            SearchRequest request = new SearchRequest(index);
            SearchSourceBuilder source = new SearchSourceBuilder();

            // Highlight matches of field2 with the given tags.
            HighlightBuilder highlighter = new HighlightBuilder()
                    .preTags("<a style='color: #e4393c'>")
                    .postTags("</a>")
                    .field(field2);

            QueryBuilder matchBoth = QueryBuilders.boolQuery()
                    .should(QueryBuilders.matchQuery(field1, query))
                    .must(QueryBuilders.matchQuery(field2, query));

            // The sort key has to be unique per document so that search_after can resume exactly.
            source.query(matchBoth)
                    .fetchSource(include, exclude)
                    .size(size)
                    .sort("_id", SortOrder.DESC)
                    .highlighter(highlighter);
            request.source(source);

            try {
                org.elasticsearch.search.SearchHit[] page =
                        client.search(request, RequestOptions.DEFAULT).getHits().getHits();
                while (page.length != 0) {
                    for (int i = 0; i < page.length; i++) {
                        result.add(page[i]);
                    }
                    // Resume after the sort values of the last hit of this page.
                    source.searchAfter(page[page.length - 1].getSortValues());
                    page = client.search(request, RequestOptions.DEFAULT).getHits().getHits();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        }

    8.统计分析

    /**
         * Counts matching documents per term for each of the given fields.
         * The query is a bool query: a match on {@code field2} is required ("must"),
         * a match on {@code field1} is optional ("should"). No hits or sources are
         * fetched; only the terms aggregations are evaluated.
         *
         * All fields are aggregated in one search request. Every aggregation gets its
         * own name, because sibling aggregations of one request may not share a name.
         *
         * @param client    client used to send the search request
         * @param index     index to search
         * @param query     text to match
         * @param field1    field used in the optional ("should") match
         * @param field2    field used in the required ("must") match
         * @param aggFields fields to run a terms aggregation on
         * @return for each agg field (in the order given) a map from bucket key to
         *         document count; the inner maps are empty when the request fails
         *         with an IOException, and the outer map is empty when no agg field is given
         */
        public static Map<String, Map<String, Long>> countDocByTermsAgg(RestHighLevelClient client, String index, String query, String field1, String field2, String ... aggFields) {
            Map<String, Map<String, Long>> fieldAggMap = CollectionUtil.newLinkedHashMap();
            if (aggFields == null || aggFields.length == 0) {
                return fieldAggMap;
            }

            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.fetchSource(false);
            searchSourceBuilder.size(0);
            QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                    .should(QueryBuilders.matchQuery(field1, query))
                    .must(QueryBuilders.matchQuery(field2, query));
            searchSourceBuilder.query(queryBuilder);

            for (int i = 0; i < aggFields.length; i++) {
                String fieldName = aggFields[i];
                if (fieldAggMap.containsKey(fieldName)) {
                    // Same field listed twice: one aggregation is enough.
                    continue;
                }
                Map<String, Long> bucketsFieldsAgg = CollectionUtil.newLinkedHashMap();
                fieldAggMap.put(fieldName, bucketsFieldsAgg);
                // Position-based name keeps sibling aggregation names unique.
                searchSourceBuilder.aggregation(AggregationBuilders.terms("agg_" + i).field(fieldName));
            }

            SearchRequest request = new SearchRequest(index);
            request.source(searchSourceBuilder);

            Aggregations aggregations;
            try {
                aggregations = client.search(request, RequestOptions.DEFAULT).getAggregations();
            } catch (IOException e) {
                e.printStackTrace();
                return fieldAggMap;
            }
            if (aggregations == null) {
                return fieldAggMap;
            }

            for (int i = 0; i < aggFields.length; i++) {
                Terms terms = aggregations.get("agg_" + i);
                if (terms == null) {
                    // No aggregation was registered at this position (duplicate field).
                    continue;
                }
                Map<String, Long> bucketsFieldsAgg = fieldAggMap.get(aggFields[i]);
                for (Terms.Bucket bucket : terms.getBuckets()) {
                    bucketsFieldsAgg.put(bucket.getKeyAsString(), bucket.getDocCount());
                }
            }
            return fieldAggMap;
        }
  • 相关阅读:
    LeetCode 40. 组合总和 II(Combination Sum II)
    LeetCode 129. 求根到叶子节点数字之和(Sum Root to Leaf Numbers)
    LeetCode 60. 第k个排列(Permutation Sequence)
    LeetCode 47. 全排列 II(Permutations II)
    LeetCode 46. 全排列(Permutations)
    LeetCode 93. 复原IP地址(Restore IP Addresses)
    LeetCode 98. 验证二叉搜索树(Validate Binary Search Tree)
    LeetCode 59. 螺旋矩阵 II(Spiral Matrix II)
    一重指针和二重指针
    指针的意义
  • 原文地址:https://www.cnblogs.com/little-horse/p/14846634.html
Copyright © 2011-2022 走看看