以下示例基于elasticsearch 5.3.0
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.3.0</version>
</dependency>
一、配置和连接
配置
elasticsearch:
  cluster-name: my-eshop
  servers:
    - 127.0.0.1:9300
    - 127.0.0.1:9301
  xpack:
    enable: false
    username: elastic
    password: changeme
/**
 * Creates the {@link TransportClient} used to talk to the Elasticsearch cluster.
 *
 * <p>When X-Pack is enabled in the configuration an X-Pack transport client carrying the
 * {@code xpack.security.user} credentials is built; otherwise a plain pre-built transport
 * client is used. Every configured {@code host:port} entry is then registered as a
 * transport address. Entries whose host cannot be resolved are logged and skipped.
 *
 * @param elasticSearchProperties cluster name, server list and X-Pack settings
 * @return the client with every resolvable server address registered
 * @throws IllegalStateException if no server address is configured
 * @throws IllegalArgumentException if a server entry is not of the form {@code host:port}
 */
@Bean
public TransportClient transportClient(ElasticSearchProperties elasticSearchProperties) {
    // Validate first so that a bad configuration never leaves a started client behind.
    if (CollectionUtils.isEmpty(elasticSearchProperties.getServers())) {
        throw new IllegalStateException("ES集群配置不可为空");
    }
    // Log only non-sensitive values: the properties object's toString() contains the X-Pack password.
    log.info("Connecting to Elasticsearch cluster '{}' via {}",
            elasticSearchProperties.getClusterName(), elasticSearchProperties.getServers());

    Settings.Builder settings = Settings.builder()
            .put("cluster.name", elasticSearchProperties.getClusterName());
    XPackProperties xpack = elasticSearchProperties.getXpack();
    TransportClient client;
    // Null-safe: the xpack section (or its enable flag) may be absent from the configuration.
    if (xpack != null && Boolean.TRUE.equals(xpack.getEnable())) {
        settings.put("xpack.security.user", xpack.getUsername() + ":" + xpack.getPassword());
        client = new PreBuiltXPackTransportClient(settings.build());
    } else {
        client = new PreBuiltTransportClient(settings.build());
    }

    try {
        for (String server : elasticSearchProperties.getServers()) {
            String[] hostAndPort = server.split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid ES server address (expected host:port): " + server);
            }
            int port = Integer.parseInt(hostAndPort[1]);
            try {
                client.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(hostAndPort[0]), port));
            } catch (UnknownHostException e) {
                // Best effort: keep going with the remaining addresses.
                log.error("Cannot resolve ES server address {}, skipping it", server, e);
            }
        }
    } catch (RuntimeException e) {
        // Release the client's resources before propagating a configuration error.
        client.close();
        throw e;
    }
    return client;
}
/**
 * Elasticsearch connection settings bound from the {@code elasticsearch.*} configuration keys.
 */
@Data
@Component
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchProperties {
    /** Name of the cluster to connect to. */
    private String clusterName;
    /** Server addresses, one {@code host:port} entry per node. */
    private List<String> servers;
    /**
     * X-Pack settings. Pre-initialised so that {@code getXpack()} never returns {@code null}
     * when the {@code xpack} section is omitted from the configuration.
     */
    private XPackProperties xpack = new XPackProperties();
}
@Data
@Component
@ConfigurationProperties(prefix = "elasticsearch.xpack")
public class XPackProperties {
private String username;
private String password;
private Boolean enable = false;
}
二、序列化、反序列化工具类
在正式开始前,需要先创建好序列化、反序列化工具类
import cn.hutool.core.date.DatePattern;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * Factory for Jackson {@link ObjectMapper} instances that read and write
 * {@link LocalDateTime} values as strings formatted with
 * {@link DatePattern#NORM_DATETIME_PATTERN}.
 *
 * @author kdyzm
 * @date 2021/6/21
 */
public class ObjectMapperFactory {

    /**
     * Formatter shared by the serializer and the deserializer. {@link DateTimeFormatter} is
     * immutable and thread-safe, so it is built once instead of on every call.
     */
    private static final DateTimeFormatter DATE_TIME_FORMATTER =
            DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN);

    /**
     * Creates a new mapper with the {@link LocalDateTime} serializer and deserializer registered.
     *
     * @return a freshly created, independently configurable {@link ObjectMapper}
     */
    public static ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        JavaTimeModule timeModule = new JavaTimeModule();
        timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
        timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
        objectMapper.registerModule(timeModule);
        return objectMapper;
    }

    /**
     * Writes a {@link LocalDateTime} as a JSON string in the shared date-time format.
     */
    static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
        @Override
        public void serialize(LocalDateTime localDateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
            jsonGenerator.writeString(localDateTime.format(DATE_TIME_FORMATTER));
        }
    }

    /**
     * Parses a JSON string in the shared date-time format into a {@link LocalDateTime}.
     */
    static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
            String timestamp = jsonParser.getValueAsString();
            return LocalDateTime.parse(timestamp, DATE_TIME_FORMATTER);
        }
    }
}
三、API-数据保存
1. 保存一条数据
// Index one document: serialize the entity to JSON bytes, then send it to
// test-index/test using RefreshPolicy.IMMEDIATE.
try {
    byte[] json = ObjectMapperFactory.getObjectMapper().writeValueAsBytes(entity);
    transportClient.prepareIndex("test-index", "test")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .setSource(json, XContentType.JSON)
            .get();
} catch (JsonProcessingException e) {
    log.error("", e);
}
2. 批量保存数据
// Index many documents with one bulk request; entities that cannot be serialized are
// logged and left out of the request.
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
entities.forEach(item -> {
    try {
        bulkRequestBuilder.add(transportClient.prepareIndex("test-index", "test")
                .setSource(objectMapper.writeValueAsBytes(item), XContentType.JSON));
    } catch (JsonProcessingException e) {
        log.error("Failed to serialize entity for bulk indexing, skipping it", e);
    }
});
// A bulk request without any action fails validation, so only send it when there is work to do.
if (bulkRequestBuilder.numberOfActions() > 0) {
    org.elasticsearch.action.bulk.BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
    // Individual items can fail even though the bulk call itself returns normally.
    if (bulkResponse.hasFailures()) {
        log.error("Bulk indexing into test-index had failures: {}", bulkResponse.buildFailureMessage());
    }
}
四、API-数据删除
1.删除一条数据
// Delete-by-query on test-index: removes the documents whose "code" term equals the given code.
DeleteByQueryAction.INSTANCE
        .newRequestBuilder(transportClient)
        .source("test-index")
        .filter(QueryBuilders.termQuery("code", code))
        .get();
2.批量删除数据
/**
 * Deletes, via delete-by-query, every document in the RLZY_SERVICE index whose
 * {@code code} field matches one of the given codes.
 *
 * @param codes codes of the documents to delete; a {@code null} or empty list is a no-op
 */
public void deleteBatch(List<String> codes) {
    if (codes == null || codes.isEmpty()) {
        // Nothing to delete: skip the request instead of building a query with no terms.
        return;
    }
    DeleteByQueryAction.INSTANCE
            .newRequestBuilder(transportClient)
            .filter(QueryBuilders.termsQuery("code", codes))
            .source(EsIndexEnum.RLZY_SERVICE.indexName())
            .get();
}
五、API-数据更新
1.更新一条数据
// Partial update of one document addressed by index, type and id; the new field values
// are sent as a JSON document and the write uses RefreshPolicy.IMMEDIATE.
transportClient.prepareUpdate(EsIndexEnum.RLZY_SERVICE.indexName(), "data", id)
        .setDoc(objectMapper.writeValueAsBytes(oldValue), XContentType.JSON)
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .get();
2.批量更新某个字段
// Update-by-query on test-index: set "show" to false on every document whose "code"
// is one of the given codes.
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
updateByQuery.source("test-index")
        .filter(QueryBuilders.termsQuery("code", codes))
        // Assign a boolean literal (not the quoted string 'false') so the field keeps a
        // boolean value in _source.
        .script(new Script("ctx._source['show'] = false"));
BulkByScrollResponse response = updateByQuery.get();
long updated = response.getUpdated();
log.info("批量修改了{}条数据", updated);
六、API-数据查询
1.查询一条数据
/**
 * Looks up the single document in test_index whose {@code code} field equals the given value.
 *
 * <p>NOTE(review): {@code Class<T>} below is the article's placeholder for the concrete
 * entity class token (for example {@code MyEntity.class}); substitute it when copying.
 *
 * @param code value matched against the {@code code} field with a term query
 * @return the deserialized document, or {@code null} when nothing matched or the document
 *         source could not be parsed
 * @throws RuntimeException if more than one document matches
 */
public T queryOneByBaseId(String code) {
    SearchResponse searchResponse = transportClient.prepareSearch("test_index")
            .setQuery(QueryBuilders.termQuery("code", code))
            .setFetchSource(true)
            .execute()
            .actionGet();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    if (total <= 0) {
        log.info("未查询到 code={} 的记录", code);
        return null;
    }
    if (total > 1) {
        throw new RuntimeException("code=" + code + " 存在多条记录");
    }
    String sourceAsString = hits.getAt(0).getSourceAsString();
    try {
        return ObjectMapperFactory.getObjectMapper().readValue(sourceAsString, Class<T>);
    } catch (IOException e) {
        // Jackson's JsonProcessingException is an IOException, so one handler covers both.
        log.error("Failed to deserialize document with code={}", code, e);
    }
    return null;
}
2.多条件查询
/**
 * Runs a paged search on test_index built from the optional conditions in {@code req}.
 *
 * <p>A non-empty title adds a match on {@code title}; {@code showAll == false} restricts
 * the result to documents whose {@code show} field is true. When neither condition applies,
 * all documents are matched and sorted by {@code create_date} descending. Hits whose source
 * cannot be deserialized are logged and left out of the result.
 *
 * <p>NOTE(review): {@code T.class} below is the article's placeholder for the concrete
 * entity class token; substitute it when copying.
 *
 * @param pageNum  1-based page number
 * @param pageSize number of hits per page
 * @param showAll  when {@code false}, only documents with {@code show == true} are returned
 * @param req      carrier of the search conditions
 * @return the deserialized hits of the requested page; empty when nothing matched
 */
public List<T> queryByCondition(int pageNum, int pageSize, boolean showAll, T req) {
    BoolQueryBuilder mixQuery = QueryBuilders.boolQuery();
    boolean queryAll = true;
    if (!StringUtils.isEmpty(req.getTitle())) {
        mixQuery.must(QueryBuilders.multiMatchQuery(req.getTitle(), "title"));
        queryAll = false;
    }
    if (!showAll) {
        // The first argument is the field NAME; filter on the "show" field being true.
        mixQuery.must(QueryBuilders.termQuery("show", true));
        queryAll = false;
    }
    FieldSortBuilder sortBuilder = null;
    if (queryAll) {
        mixQuery.must(QueryBuilders.matchAllQuery());
        // unmappedType takes a field TYPE used for indices where the sort field has no mapping.
        sortBuilder = SortBuilders.fieldSort("create_date").order(SortOrder.DESC).unmappedType("date");
    }
    SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("test_index")
            .setQuery(mixQuery)
            .setFrom((pageNum - 1) * pageSize)
            .setSize(pageSize)
            .setFetchSource(true);
    if (Objects.nonNull(sortBuilder)) {
        searchRequestBuilder.addSort(sortBuilder);
    }
    SearchResponse searchResponse = searchRequestBuilder
            .execute()
            .actionGet();
    SearchHits hits = searchResponse.getHits();
    if (hits.getTotalHits() <= 0) {
        return new ArrayList<>();
    }
    List<T> data = new ArrayList<>();
    ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
    hits.forEach(item -> {
        try {
            data.add(objectMapper.readValue(item.getSourceAsString(), T.class));
        } catch (IOException e) {
            // Skip the broken hit rather than handing a null element to the caller.
            log.error("Failed to deserialize search hit id={}", item.getId(), e);
        }
    });
    return data;
}
3.分组统计查询
// Count documents per distinct "inf_group" value (terms aggregation) among the documents
// whose "show" term is "true", and turn every bucket into one result row.
// NOTE(review): T is the article's placeholder for the concrete result type.
List<T> list = new ArrayList<>();
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("inf_group_statistic").field("inf_group").size(10000);
SearchRequestBuilder aggregationBuilders = transportClient.prepareSearch(SearchModuleEnum.IMFORMATION_MODULE.index().split(","))
        .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("show", "true")))
        .addAggregation(aggregationBuilder)
        // Only the aggregation is read below, so do not fetch any hits.
        .setSize(0)
        .setFetchSource(false);
SearchResponse searchResponse1 = aggregationBuilders.get();
Terms terms = searchResponse1.getAggregations().get("inf_group_statistic");
List<Terms.Bucket> buckets = terms.getBuckets();
for (Terms.Bucket bucket : buckets) {
    String key = bucket.getKeyAsString();
    long docCount = bucket.getDocCount();
    T res = new T();
    res.setIndustry(key);
    res.setNum((int) docCount);
    list.add(res);
}
其最终形成的查询和下面的形式差不多
POST {{es-host}}/test_index/data/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"show": true
}
},
{
"exists": {
"field": "event_classification"
}
}
]
}
},
"size": 0,
"aggs": {
"group_by_tags": {
"terms": {
"size": 1000,
"field": "event_classification"
}
}
}
}
4.统计总数量和总金额
// Sum the "money" field over every document matching mixQuery and report it with two
// decimals, together with the total number of matching documents.
SearchRequestBuilder aggregationBuilders = transportClient.prepareSearch(
                EsIndexEnum.POLICY_HALL.indexName())
        .setQuery(mixQuery)
        .addAggregation(AggregationBuilders.sum("matchMoney").field("money"))
        .setFetchSource(false);
SearchResponse searchResponse1 = aggregationBuilders.get();
Sum matchMoney = searchResponse1.getAggregations().get("matchMoney");
// RoundingMode replaces the deprecated BigDecimal.ROUND_HALF_UP int constant.
result.setMatchMoney(BigDecimal.valueOf(matchMoney.getValue())
        .setScale(2, java.math.RoundingMode.HALF_UP)
        .toPlainString());
result.setMatchNum((int) hits.getTotalHits());
5.高亮显示
// Ask Elasticsearch to wrap the matched fragments of title / second_title / content in a
// <span class="searchHighlight"> element.
HighlightBuilder highlightBuilder = new HighlightBuilder()
        .preTags("<span class=\"searchHighlight\">")
        .postTags("</span>")
        .field("title")
        .field("second_title")
        .field("content");
SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch(
                SearchModuleEnum.get(req.getModuleType()).index().split(","))
        .setQuery(mixQuery)
        .highlighter(highlightBuilder)
        .setFrom((req.getPageNum() - 1) * req.getPageSize())
        .setSize(req.getPageSize())
        .setFetchSource(true);
SearchResponse searchResponse = searchRequestBuilder
        .execute()
        .actionGet();
SearchHits hits = searchResponse.getHits();
hits.forEach(item -> {
    // Index the hit came from and its raw JSON source.
    String index = item.getIndex();
    String sourceAsString = item.getSourceAsString();
    // A field is present in this map only when it produced highlighted fragments for the hit.
    Map<String, HighlightField> highlightFieldMap = item.highlightFields();

    // Concatenate the highlighted fragments of each field; the builder stays empty when
    // the field was not highlighted.
    StringBuilder title = new StringBuilder();
    HighlightField titleHighlight = highlightFieldMap.get("title");
    if (Objects.nonNull(titleHighlight)) {
        for (Text fragment : titleHighlight.fragments()) {
            title.append(fragment);
        }
    }

    StringBuilder secondTitle = new StringBuilder();
    HighlightField secondTitleHighlight = highlightFieldMap.get("second_title");
    if (Objects.nonNull(secondTitleHighlight)) {
        for (Text fragment : secondTitleHighlight.fragments()) {
            secondTitle.append(fragment);
        }
    }

    StringBuilder content = new StringBuilder();
    HighlightField contentHighlight = highlightFieldMap.get("content");
    if (Objects.nonNull(contentHighlight)) {
        for (Text fragment : contentHighlight.fragments()) {
            content.append(fragment);
        }
    }
});
6.高权重查询
比如title查询的权重是content的两倍
// Weighted multi-field match on the search key: title is boosted by 2, second_title by 1.2,
// and content uses AbstractQueryBuilder.DEFAULT_BOOST.
MultiMatchQueryBuilder query = QueryBuilders.multiMatchQuery(req.getSearchKey());
query.field("title", 2.0f);
query.field("second_title", 1.2f);
query.field("content", AbstractQueryBuilder.DEFAULT_BOOST);
mixQuery.must(query);