记得好几年前用es做过标签画像统计,如今再看es时已是很生疏了,再用时已更新到了7.12版本了。以前用TransportClient客户端,现在官方已推荐使用RestHighLevelClient客户端。
这几天用RestHighLevelClient时还是觉得比较方便的。现将一些基本常用功能记录一下。
1.初始化和关闭
/**
 * Build (and cache in the static {@code client} field) a RestHighLevelClient
 * pointing at the given host/port over plain HTTP.
 *
 * @param host Elasticsearch host name or IP
 * @param port Elasticsearch HTTP port (typically 9200)
 * @return the newly created client (also stored in the static field)
 */
public static RestHighLevelClient getClient(String host, int port) {
    LOGGER.info("Init ES!");
    client = new RestHighLevelClient(
            RestClient.builder(new HttpHost(host, port, "http")));
    return client;
}

/**
 * Close the cached client if one was created; safe to call when it is null.
 * A failure to close is logged, not propagated.
 */
public static void closeES() {
    LOGGER.info("ES closed!");
    if (null != client) {
        try {
            client.close();
        } catch (IOException e) {
            // FIX: was e.printStackTrace(); use the class logger this block
            // already relies on (assumes LOGGER.error(String, Throwable) exists,
            // as in slf4j/log4j — TODO confirm against the logger in use).
            LOGGER.error("Failed to close ES client", e);
        }
    }
}
2.创建index以及mapping
public boolean createIndexMapping(RestHighLevelClient client, String indexName){ LOGGER.info("create index and mapping ..."); CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards",1) .put("index.number_of_replicas",0)); try { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties") .startObject("content") .field("type", "text") // 数据类型 .field("index", "true") //默认 .field("analyzer", "ik_max_word") .field("search_analyzer", "ik_smart") .endObject()
.startObject("date") .field("type", "date") // 数据类型 .field("index", "true") //默认 .endObject() .startObject("title") .field("type", "text") // 数据类型 .field("index", "true") //默认 .field("analyzer", "ik_max_word") .field("search_analyzer", "ik_smart") .endObject() .endObject() .endObject(); createIndexRequest.mapping(xContentBuilder); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); return createIndexResponse.isAcknowledged(); } catch (IOException e) { e.printStackTrace(); } return false; }
3.获取一篇doc
/**
 * Fetch one document's source as a map.
 *
 * @param client high-level REST client
 * @param index  index to read from
 * @param id     document id
 * @return the document source, or null when it does not exist or the request fails
 */
public Map<String, Object> getOneMap(RestHighLevelClient client, String index, String id) {
    try {
        GetResponse response = client.get(new GetRequest(index, id), RequestOptions.DEFAULT);
        return response.isExists() ? response.getSource() : null;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
4.删除多篇
/** * 批量删除 * @param client * @param index * @param ids * @return */ public Object deleteList(RestHighLevelClient client, String index, List<String> ids) { //构建批量删除请求 DeleteByQueryRequest request = new DeleteByQueryRequest(index); IdsQueryBuilder queryBuilder = new IdsQueryBuilder(); for(String id: ids) { queryBuilder.addIds(id); } // 匹配所有 request.setQuery(queryBuilder); BulkByScrollResponse response = null; try { response = client.deleteByQuery(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } return JSONObject.toJSON(response); }
5.批量导入
/** * 批量导入 * @param client * @param index * @param list * @return */ public boolean insertDocByBulk(RestHighLevelClient client, String index, List<Doc> list) { //批量插入请求 BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("10s"); for(int i = 0; i < list.size(); i++) { Doc doc = list.get(i); //这里必须每次都使用new IndexRequest(index,type),不然只会插入最后一条记录(这样插入不会覆盖已经存在的Id,也就是不能更新) Map<String, Object> kv = new HashMap<>(); kv.put("id", doc.getId()); kv.put("title", doc.getTitle()); kv.put("content", doc.getContent()); bulkRequest.add(new IndexRequest(index).id(String.valueOf(doc.getId())).source(kv)); //或者 //bulkRequest.add(new IndexRequest(index).id(item.getID()).source(JSON.toJSONString(doc), XContentType.JSON)); } try { // 客户端返回 BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT); // responses.hasFailures(); // 是否失败,false表示成功! if(RestStatus.CREATED == responses.status()) { return true; } } catch (IOException e) { e.printStackTrace(); } return false; }
6.批量更新
/** * 批量update * @param client * @param index * @param list * @return */ public boolean updateDocByBulk(RestHighLevelClient client, String index, List<Doc> list) { //批量插入请求 BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("10s"); for(int i = 0; i < list.size(); i++) { Doc doc = list.get(i); //这里必须每次都使用new IndexRequest(index,type),不然只会插入最后一条记录(这样插入不会覆盖已经存在的Id,也就是不能更新) Map<String, Object> kv = new HashMap<>(); kv.put("id", doc.getId()); kv.put("title", doc.getTitle()); kv.put("content", doc.getContent()); bulkRequest.add(new UpdateRequest().index(index).id(String.valueOf(doc.getId())).doc(kv)); } try { // 客户端返回 BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT); // responses.hasFailures(); // 是否失败,false表示成功! if(RestStatus.OK == responses.status()) { return true; } } catch (IOException e) { e.printStackTrace(); } return false; }
7.全量搜索
7.1 利用scroll
/** * 全量搜索 * @param client * @param index * @param field1 * @param field2 * @param query * @param size * @param include * @param exclude * @return */ public List<org.elasticsearch.search.SearchHit> searchByQueryScrollAll(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String[] include, String[] exclude) { List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList(); final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); SearchRequest searchRequest = new SearchRequest(index); searchRequest.scroll(scroll); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 高亮显示 HighlightBuilder highlightBuilder = new HighlightBuilder(); // 高亮标签 highlightBuilder.preTags("<a style='color: #e4393c'>"); highlightBuilder.postTags("</a>"); // 高亮字段 highlightBuilder.field(field2); //设置最多一次能够取出(size)笔数据,从第(size + 1)笔数据开始,将开启滚动查询。 (滚动查询也属于这一次查询,只不过因为一次查不完,分多次查) searchSourceBuilder.size(size); //searchSourceBuilder.sort("_score", SortOrder.DESC); //socre相同,则按时间降序排序 //searchSourceBuilder.sort("publish_date", SortOrder.DESC); //高亮显示添加到构造器(不需要高亮显示则不添加) searchSourceBuilder.highlighter(highlightBuilder); // 多字段联合查询 //searchSourceBuilder.query(QueryBuilders.multiMatchQuery(query, field1, field2)); searchSourceBuilder.query(QueryBuilders.boolQuery() .should(QueryBuilders.matchQuery(field1, query)) .must(QueryBuilders.matchQuery(field2, query))); searchSourceBuilder.fetchSource(include, exclude); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = null; try { searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } String scrollId = searchResponse.getScrollId(); org.elasticsearch.search.SearchHit[] searchHits = searchResponse.getHits().getHits(); while (searchHits != null && searchHits.length > 0) { for(org.elasticsearch.search.SearchHit hit: searchHits) { //String highlightText = 
hit.getHighlightFields().get(field2).getFragments()[0].toString(); result.add(hit); } SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId); searchScrollRequest.scroll(scroll); try { searchResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } scrollId = searchResponse.getScrollId(); searchHits = searchResponse.getHits().getHits(); } if(null != scrollId) { ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); // 滚动完成后清除滚动上下文 ClearScrollResponse clearScrollResponse = null; try { clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } //清除滚动是否成功 boolean succeeded = clearScrollResponse.isSucceeded(); } return result; }
7.2 利用search_after
/**
 * Full search via search_after pagination: pages through every hit matching
 * the query, resuming each page from the previous page's last sort values.
 *
 * @param client  high-level REST client
 * @param index   index to search
 * @param field1  field matched with should (optional match)
 * @param field2  field matched with must (required match; also highlighted)
 * @param query   query text applied to both fields
 * @param size    page size per request
 * @param include source fields to include
 * @param exclude source fields to exclude
 * @return all matching hits; empty when nothing matches or a request fails
 */
public List<org.elasticsearch.search.SearchHit> searchByQuerySearchAfter(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String[] include, String[] exclude) {
    List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList();
    SearchRequest request = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Highlighting setup
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    // Highlight wrapper tags
    highlightBuilder.preTags("<a style='color: #e4393c'>");
    highlightBuilder.postTags("</a>");
    // Field to highlight
    highlightBuilder.field(field2);
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .should(QueryBuilders.matchQuery(field1, query))
            .must(QueryBuilders.matchQuery(field2, query));
    // searchSourceBuilder.query(QueryBuilders.matchQuery(field, query));
    searchSourceBuilder.query(queryBuilder);
    searchSourceBuilder.fetchSource(include, exclude);
    // Number of hits per page
    searchSourceBuilder.size(size);
    // search_after requires sorting on a unique, non-repeating field
    searchSourceBuilder.sort("_id", SortOrder.DESC);
    //searchSourceBuilder.sort("_score", SortOrder.DESC);
    // when scores tie, fall back to date descending
    //searchSourceBuilder.sort("publish_date", SortOrder.DESC);
    // Attach highlighting to the builder (omit if highlighting is not needed)
    searchSourceBuilder.highlighter(highlightBuilder);
    // Attach the source builder to the search request
    request.source(searchSourceBuilder);
    // Execute and page until a response comes back empty
    SearchResponse response = null;
    try {
        response = client.search(request, RequestOptions.DEFAULT);
        // Hits of the current page
        org.elasticsearch.search.SearchHit[] hits = response.getHits().getHits();
        while (hits.length > 0) {
            for (org.elasticsearch.search.SearchHit hit : hits) {
                result.add(hit);
            }
            // Resume the next page after the last hit's sort values
            org.elasticsearch.search.SearchHit last = hits[hits.length - 1];
            searchSourceBuilder.searchAfter(last.getSortValues());
            response = client.search(request, RequestOptions.DEFAULT);
            hits = response.getHits().getHits();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return result;
}
8.统计分析
/**
 * Multi-field statistics: for each field in {@code aggFields}, runs a terms
 * aggregation over the documents matching the query and collects
 * bucket-key → doc-count maps.
 *
 * @param client    high-level REST client
 * @param index     index to search
 * @param query     query text applied to field1 (should) and field2 (must)
 * @param field1    field matched with should (optional match)
 * @param field2    field matched with must (required match)
 * @param aggFields fields to aggregate on
 * @return field name → (bucket key → doc count); a field whose request fails is skipped
 */
public static Map<String, Map<String, Long>> countDocByTermsAgg(RestHighLevelClient client, String index, String query, String field1, String field2, String... aggFields) {
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .should(QueryBuilders.matchQuery(field1, query))
            .must(QueryBuilders.matchQuery(field2, query));
    Map<String, Map<String, Long>> fieldAggMap = CollectionUtil.newLinkedHashMap();
    for (String fieldName : aggFields) {
        // FIX: the original reused one SearchSourceBuilder across iterations,
        // adding another terms aggregation named "agg_name" each time —
        // Elasticsearch rejects duplicate sibling aggregation names, so every
        // field after the first failed. Build a fresh source per field.
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.fetchSource(false);
        searchSourceBuilder.size(0); // aggregation only — no hits needed
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.aggregation(AggregationBuilders.terms("agg_name").field(fieldName));
        SearchRequest request = new SearchRequest(index);
        request.source(searchSourceBuilder);
        SearchResponse response = null;
        try {
            response = client.search(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // FIX: the original dereferenced a null response after an IOException.
        if (null == response) {
            continue;
        }
        Terms byTopicAggregation = response.getAggregations().get("agg_name");
        Map<String, Long> bucketsFieldsAgg = CollectionUtil.newLinkedHashMap();
        for (Terms.Bucket b : byTopicAggregation.getBuckets()) {
            bucketsFieldsAgg.put(b.getKeyAsString(), b.getDocCount());
        }
        fieldAggMap.put(fieldName, bucketsFieldsAgg);
    }
    return fieldAggMap;
}