1、版本依赖
注意对 transport client不了解先阅读官方文档:
transport client(传送门)
这里需要版本匹配,如失败查看官网或百度。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jiatp</groupId>
<artifactId>springboot-03-rest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-03-rest</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.3.2</version>
</dependency>
<!-- Java Low Level REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.3.2</version>
</dependency>
<!-- Java High Level REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.2</version>
</dependency>
<!-- json转换 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.配置客户端
ElasticsearchConfig.java
package com.jiatp.springboot.config;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class ElasticsearchConfig {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Value("${elasticsearch.schema}")
private String schema;
@Value("${elasticsearch.connectTimeOut}")
private int connectTimeOut;
@Value("${elasticsearch.socketTimeOut}")
private int socketTimeOut;
@Value("${elasticsearch.connectionRequestTimeOut}")
private int connectionRequestTimeOut;
@Value("${elasticsearch.maxConnectNum}")
private int maxConnectNum;
@Value("${elasticsearch.maxConnectPerRoute}")
private int maxConnectPerRoute;
private HttpHost httpHost;
private boolean uniqueConnectTimeConfig = true;
private boolean uniqueConnectNumConfig = true;
private RestClientBuilder builder;
private RestHighLevelClient client;
/**
* 返回一个RestHighLevelClient
*
* @return
*/
@Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClient")
public RestHighLevelClient client() {
httpHost= new HttpHost(host, port, schema);
builder = RestClient.builder(httpHost);
if (uniqueConnectTimeConfig) {
setConnectTimeOutConfig();
}
if (uniqueConnectNumConfig) {
setMutiConnectConfig();
}
client = new RestHighLevelClient(builder);
return client;
}
/**
* 异步httpclient的连接延时配置
*/
public void setConnectTimeOutConfig() {
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
requestConfigBuilder.setConnectTimeout(connectTimeOut);
requestConfigBuilder.setSocketTimeout(socketTimeOut);
requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
return requestConfigBuilder;
}
});
}
/**
* 异步httpclient的连接数配置
*/
public void setMutiConnectConfig() {
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setMaxConnTotal(maxConnectNum);
httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
return httpClientBuilder;
}
});
}
/**
* 关闭连接
*/
public void close() {
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
application.yml
elasticsearch:
host: 192.168.x.x
port: 9200
schema: http
connectTimeOut: 1000
socketTimeOut: 30000
connectionRequestTimeOut: 500
maxConnectNum: 100
maxConnectPerRoute: 100
3、测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot03RestApplicationTests {
@Qualifier(value = "restHighLevelClient")
@Autowired
RestHighLevelClient restHighLevelClient;
String indexName="student";
String esType="msg";
@Test
public void contextLoads() throws IOException{
RestClient restClient = RestClient.builder(
new HttpHost("192.168.56.101", 9200, "http")).build();
//(1) 执行一个基本的方法,验证es集群是否搭建成功
Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
System.out.println(EntityUtils.toString(response.getEntity()));
}
当现实create时则表明没问题。
其它测试:
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot03RestApplicationTests {
@Qualifier(value = "restHighLevelClient")
@Autowired
RestHighLevelClient restHighLevelClient;
String indexName="student";
String esType="msg";
@Test
public void contextLoads() throws IOException{
RestClient restClient = RestClient.builder(
new HttpHost("192.168.56.101", 9200, "http")).build();
//(1) 执行一个基本的方法,验证es集群是否搭建成功
Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
System.out.println(EntityUtils.toString(response.getEntity()));
}
//创建索引
@Test
public void createIndex(){
//index名必须全小写,否则报错
String index ="book";
CreateIndexRequest request = new CreateIndexRequest(index);
try {
CreateIndexResponse indexResponse = restHighLevelClient.indices().create(request);
if (indexResponse.isAcknowledged()) {
System.out.println("创建索引成功");
} else {
System.out.println("创建索引失败");
}
} catch (IOException e) {
e.printStackTrace();
}
}
//检查索引
@Test
public void findIndex()throws Exception{
try {
Response response = restHighLevelClient.getLowLevelClient().performRequest("HEAD", "book");
boolean exist = response.getStatusLine().getReasonPhrase().equals("OK");
System.out.println(exist);
} catch (IOException e) {
e.printStackTrace();
}
}
//插入数据
@Test
public void addData(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", 3);
jsonObject.put("age", 26);
jsonObject.put("name", "wangwu");
jsonObject.put("date", new Date());
IndexRequest indexRequest = new IndexRequest(indexName, esType, "2").source(jsonObject);
try {
IndexResponse indexResponse = restHighLevelClient.index(indexRequest);
System.out.println(indexResponse.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* 使用XContentBuilder添加数据
* */
@Test
public void addData1() throws Exception{
XContentBuilder builder = jsonBuilder();
builder.startObject();
{
builder.field("user", "jiatp");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "3")
.source(builder).routing("my_route");//可以添加指定路由
IndexResponse response = restHighLevelClient.index(indexRequest);
System.out.println(response.status().name());
}
/*
* 使用Object key-pairs对象键
* */
@Test
public void addData2() throws Exception{
IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "3")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
IndexResponse response = restHighLevelClient.index(indexRequest);
System.out.println(response.status().name());
}
//异步方式
@Test
public void testAddAsync() throws InterruptedException {
ActionListener listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
System.out.println("Async:" + indexResponse.status().name());
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// Todo
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// Todo
}
// 处理成功分片小于总分片的情况
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Todo
}
}
@Override
public void onFailure(Exception e) {
System.out.println("AsyncFailure:" + e.getMessage());
e.printStackTrace();
}
};
IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "4")
.source("user", "luxi",
"postDate", new Date(),
"message", "trying out Elasticsearch");
restHighLevelClient.indexAsync(indexRequest, listener); // 异步方式
Thread.sleep(2000);
}
/*
* 查询
*
* */
// 指定routing的数据,查询也要指定
@Test
public void searchRoute()throws Exception{
GetRequest request = new GetRequest("twitter", "t_doc", "3").routing("my_route"); // 指定routing的数据,查询也要指定
GetResponse response = restHighLevelClient.get(request);
System.out.println(response.getSourceAsString());
}
//查询-额外参数 异步获取
@Test
public void getOneOp() throws IOException, InterruptedException {
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse documentFields) {
System.out.println(documentFields.getSourceAsString());
}
@Override
public void onFailure(Exception e) {
System.out.println("Error:" + e.getMessage());
e.printStackTrace();
}
};
GetRequest request = new GetRequest("twitter", "t_doc", "2");
String[] includes = new String[]{"message", "*Date"}; // 包含的字段
String[] excludes = Strings.EMPTY_ARRAY; // 排除的字段
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
restHighLevelClient.getAsync(request,listener);
Thread.sleep(2000);
}
//查询所有
@Test
public void searchAll(){
HttpEntity entity = new NStringEntity(
"{ "query": { "match_all": {}}}",
ContentType.APPLICATION_JSON);
String endPoint = "/" + indexName + "/" + esType + "/_search";
try {
Response response = restHighLevelClient.getLowLevelClient()
.performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
System.out.println(EntityUtils.toString(response.getEntity()));
} catch(IOException e) {
e.printStackTrace();
}
}
//条件查询 姓名:李四
@Test
public void test(){
try {
String endPoint = "/" + indexName + "/" + esType + "/_search";
IndexRequest indexRequest = new IndexRequest();
XContentBuilder builder;
try {
builder = JsonXContent.contentBuilder()
.startObject()
.startObject("query")
.startObject("match")
.field("name.keyword", "lisi")
.endObject()
.endObject()
.endObject();
indexRequest.source(builder);
} catch (IOException e) {
e.printStackTrace();
}
String source = indexRequest.source().utf8ToString();
System.out.println(source);
HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
System.out.println(EntityUtils.toString(response.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
//条件查询 叫kimchy的
@Test
public void testSearch(){
try {
SearchRequest searchRequest = new SearchRequest("twitter");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
sourceBuilder.from(0);
sourceBuilder.size(5);
searchRequest.source(sourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest);
System.out.println("Hits:" + response.getHits().totalHits);
response.getHits().forEach(e -> {
System.out.println(e.getSourceAsString()); });
} catch(IOException e) {
e.printStackTrace();
}
}
/**
* * 查询名字等于 lisi
* 并且年龄在20和40之间
*/
@Test
public void serarchFuhe(){
try {
String endPoint = "/" + indexName + "/" + esType + "/_search";
IndexRequest indexRequest = new IndexRequest();
XContentBuilder builder;
try {
builder = JsonXContent.contentBuilder()
.startObject()
.startObject("query")
.startObject("bool")
.startObject("must")
.startObject("match")
.field("name.keyword", "lisi")
.endObject()
.endObject()
.startObject("filter")
.startObject("range")
.startObject("age")
.field("gte", "20")
.field("lte", "40")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
indexRequest.source(builder);
} catch (IOException e) {
e.printStackTrace();
}
String source = indexRequest.source().utf8ToString();
System.out.println(source);
HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
System.out.println(EntityUtils.toString(response.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 存在即更新【输出:OK】
* OK
* {"C":"Carambola","A":"Apple","B":"Banana"}
* 不存在则创建【输出:CREATED】
* CREATED
* {"C":"Carambola"}
* 开启scriptedUpsert【在文档不存在情况下输出:CREATED】
* {"A" : "Apple","B" : "Banana","C" : "Carambola"}
*/
@Test
public void testUpdate() throws IOException {
UpdateRequest request = new UpdateRequest("twitter", "t_doc", "7")
.script(new Script(ScriptType.INLINE,"painless",
"ctx._source.A='Apple';ctx._source.B='Banana'",Collections.EMPTY_MAP))
// 如果文档不存在,使用upsert方法定义一些内容,这些内容将作为新文档插入
.upsert(jsonBuilder()
.startObject()
.field("C","Carambola")
.endObject());
request.timeout(TimeValue.timeValueSeconds(2)); // 2秒超时
//request.scriptedUpsert(true); // 无论文档是否存在,脚本都必须运行
UpdateResponse update = restHighLevelClient.update(request);
System.out.println(update.status().name());
}
//删除
@Test
public void delete(){
String endPoint = "/" + indexName + "/" + esType + "/_delete_by_query";
/**
* 删除条件
*/
IndexRequest indexRequest = new IndexRequest();
XContentBuilder builder;
try {
builder = JsonXContent.contentBuilder()
.startObject()
.startObject("query")
.startObject("term")
//name中包含deleteText
.field("name.keyword", "wangwu")
.endObject()
.endObject()
.endObject();
indexRequest.source(builder);
} catch (IOException e) {
e.printStackTrace();
}
String source = indexRequest.source().utf8ToString();
HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
try {
Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
System.out.println(EntityUtils.toString(response.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
可看api进行测试,https://blog.csdn.net/jatpen/article/details/102631110