先来几个测试方法, pom在最下面:
完整代码 : https://github.com/lifan12589/infinite-possibilities/tree/master/springboot_stu/es-Api
创建索引:
@Test @SneakyThrows public void createIndex() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); CreateIndexRequest request = new CreateIndexRequest("test_index"); request.settings(Settings.builder() .put("index.number_of_shards", 3)//3个分片 .put("index.number_of_replicas", 2)//2个副本 ); CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT); if (createIndexResponse.isAcknowledged()) { System.out.println("创建index成功!"); } else { System.out.println("创建index失败!"); } client.close(); }
查询索引名称:
@Test @SneakyThrows public void getIndex() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); GetIndexRequest request = new GetIndexRequest("se*");//查询se开头的索引名称 GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); String[] indices = response.getIndices(); for (String indexName : indices) { System.out.println("index name:" + indexName); } client.close(); }
删除索引:
@Test @SneakyThrows public void delIndex() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); DeleteIndexRequest request = new DeleteIndexRequest("test_index");//要删除的索引名称 AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); if (response.isAcknowledged()) { System.out.println("删除index成功!"); } else { System.out.println("删除index失败!"); } client.close(); }
插入数据:
@Test @SneakyThrows public void insertData() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); //从数据库查询 List<ProductInfo> list = productInfoMapper.selectAll(); System.out.println("list : "+list); //插入数据,index不存在则自动根据匹配到的template创建。index没必要每天创建一个,如果是为了灵活管理,最低建议每月一个 yyyyMM。 IndexRequest request = new IndexRequest("index_product"); //最好不要自定义id 会影响插入速度。 ProductInfo product = list.get(0); Gson gson = new Gson(); request.id(product.getId().toString()); request.source(gson.toJson(product) , XContentType.JSON); IndexResponse response = client.index(request, RequestOptions.DEFAULT); System.out.println(response); client.close(); }
批量插入:
@Test @SneakyThrows public void batchInsertData() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); //查询数据库 批量插入数据,更新和删除同理 BulkRequest request = new BulkRequest("index_product"); Gson gson = new Gson(); //从数据库查询 List<ProductInfo> lists = productInfoMapper.selectAll(); for (ProductInfo list: lists) { System.out.println(gson.toJson(list)); request.add(new IndexRequest().source(gson.toJson(list) , XContentType.JSON)); } BulkResponse response = client.bulk(request, RequestOptions.DEFAULT); System.out.println("数量:" + response.getItems().length); client.close(); }
根据 id 查询:
@Test @SneakyThrows public void getById() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); //注意 这里查询使用的是别名。 GetRequest request = new GetRequest("index_product", "mQYKzncBSw_3e2MkFkmH"); //只查询特定字段。如果需要查询所有字段则不设置该项。 String[] includes = {"itemname", "itemcode","inputDate"}; String[] excludes = {"desc"}; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext); GetResponse response = client.get(request, RequestOptions.DEFAULT); System.out.println(response); client.close(); }
根据 id 查询多条数据:
@Test public void multiGetById() throws IOException { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); //多个结果 根据id查询 MultiGetRequest request = new MultiGetRequest(); request.add("index_product", "8Sj00XcB2CbVnAcWca2M"); request.add("index_product", "9Sj00XcB2CbVnAcWca2M"); //两种写法 // request.add(new MultiGetRequest.Item( // "test_index", // "ewbmzXcBSw_3e2MkIkk-")); MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT); for (MultiGetItemResponse itemResponse : response) { System.out.println(itemResponse.getResponse().getSourceAsString()); } client.close(); }
根据 id 删除:
/**
 * Deletes the document with id {@code 6} from the {@code ac-new} index and prints
 * the response.
 *
 * @throws IOException if the request to the cluster fails
 */
@Test
public void delById() throws IOException {
    // try-with-resources releases the client even when the delete call throws.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(
                    new HttpHost("localhost", 9200, "http"),
                    new HttpHost("localhost", 9201, "http"),
                    new HttpHost("localhost", 9202, "http")))) {
        DeleteRequest request = new DeleteRequest("ac-new", "6");
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
根据某个属性进行批量更改:
@Test public void updateByQuery() throws IOException { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http"), new HttpHost("localhost", 9202, "http"))); UpdateByQueryRequest request = new UpdateByQueryRequest("index_product"); //默认情况下,版本冲突会中止 UpdateByQueryRequest 进程,但是你可以用以下命令来代替 //设置版本冲突继续 // request.setConflicts("proceed"); //设置更新条件 加 keyword 可以精确匹配,不会被分词 request.setQuery(QueryBuilders.matchQuery("itemname.keyword","具体属性值")); // //限制更新条数 request.setMaxDocs(8); request.setScript( new Script(ScriptType.INLINE, "painless", "ctx._source.itemname+='#';", Collections.emptyMap())); BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT); System.out.println(response); client.close(); }
嗅探器:
@Test public void sniffer() throws IOException { // region 监听器 SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener(); //获取客户端 RestClient restClient = RestClient.builder( new HttpHost("localhost",9200,"http"), new HttpHost("localhost",9201,"http"), new HttpHost("localhost",9202,"http")) .setFailureListener(sniffOnFailureListener) .build(); //设置 https NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer( restClient, ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT, ElasticsearchNodesSniffer.Scheme.HTTPS ); //为 restClient 绑定 嗅探器 Sniffer sniffer = Sniffer.builder(restClient) .setSniffIntervalMillis(5000) //设置连续两次普通嗅探执行之间的间隔(以毫秒为单位) .setSniffAfterFailureDelayMillis(30000) //设置失败后计划执行嗅探的延迟(以毫秒为单位) .setNodesSniffer(nodesSniffer) .build(); sniffOnFailureListener.setSniffer(sniffer); //先关嗅探器, 后关客户端 sniffer.close(); restClient.close(); }
查看节点信息:
/**
 * Prints the node list known to the shared {@code ESClient} three times, pausing
 * 60 seconds between rounds, then closes the shared client.
 */
@Test
@SneakyThrows
public void snifferT() {
    RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient();
    try {
        // Per-round line prefix, and the banner printed before rounds two and three.
        String[] prefixes = {"初始化节点 : ", "二次扫描 : ", "三次扫描 : "};
        String[] banners = {null, "准备二次扫描:", "准备三次扫描:"};

        for (int round = 0; round < prefixes.length; round++) {
            if (round > 0) {
                // Wait before re-reading the node list.
                Thread.sleep(60000);
                System.out.println(banners[round]);
            }
            for (Node node : client.getLowLevelClient().getNodes()) {
                System.out.println(prefixes[round] + node);
            }
        }
    } finally {
        // Close the shared client even when a sleep is interrupted or a read fails.
        ESClient.getEsClient().closeClient();
    }
}
查询数据库数据,插入ES:
@Test @SneakyThrows public void bulkInit(){ RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient(); GetIndexRequest request = new GetIndexRequest("bulk_index"); Boolean exists = client.indices().exists(request,RequestOptions.DEFAULT); if(!exists) { CreateIndexRequest createIndexRequest = new CreateIndexRequest("bulk_index"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) //3个 主分片 .put("index.number_of_replicas", 2)); //2个副本 (每个主分片对应2个副本) CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest,RequestOptions.DEFAULT); int i = createIndexResponse.hashCode(); boolean acknowledged = createIndexResponse.isAcknowledged(); System.out.println("创建索引 ACK : "+acknowledged+" 索引 code : "+i); } List<BulkInfo> list = bulkInfoMapper.selectAll(); BulkRequest bulkRequest = new BulkRequest("bulk_index"); Gson gson = new Gson(); for (int i=0;i<list.size();i++){ //数据库时间转换 Date date = new Date(list.get(i).getInputDate().getTime()); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd"); String sd = sdf.format(date); //替换时间字段 String gsons = gson.toJson(list.get(i)); JSONObject json = new JSONObject(gsons); json.put("inputDate",sd); bulkRequest.add(new IndexRequest().id(Integer.toString(i)).source(json.toString(),XContentType.JSON)); } BulkResponse response = client.bulk(bulkRequest,RequestOptions.DEFAULT); System.out.println("插入条数 : "+response.getItems().length); ESClient.getEsClient().closeClient(); }
ESClient:
package com.infinitePossibilities.util;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.Sniffer;

import java.io.IOException;

/**
 * Process-wide holder for a single {@link RestHighLevelClient} with an attached
 * {@link Sniffer}.
 *
 * <p>Obtain the holder with {@link #getEsClient()}, the client with
 * {@link #getHighLevelClient()}, and release both client and sniffer with
 * {@link #closeClient()}. After {@code closeClient()} the next call to
 * {@code getHighLevelClient()} builds a fresh client, so callers never receive an
 * already-closed instance.
 *
 * <p>Creation and shutdown are serialized on the {@code ESClient} class lock.
 */
public class ESClient {

    // volatile: read outside the lock by the lazy-initialization fast path.
    private static volatile ESClient esClient;

    /** Comma-separated {@code host:port} list of the seed nodes. */
    private String host = "localhost:9200,localhost:9201,localhost:9202";

    private RestClientBuilder builder;

    // Written under the class lock, always before highLevelClient is published.
    private static Sniffer sniffer;

    // volatile: read outside the lock by the lazy-initialization fast path.
    private static volatile RestHighLevelClient highLevelClient;

    private ESClient() {
    }

    /**
     * Returns the shared holder, creating and configuring it on first use.
     *
     * @return the singleton {@code ESClient}
     */
    public static ESClient getEsClient() {
        ESClient instance = esClient;
        if (instance == null) {
            synchronized (ESClient.class) {
                instance = esClient;
                if (instance == null) {
                    instance = new ESClient();
                    instance.initBuilder();
                    // Publish only after the builder is ready, so no thread can
                    // observe a holder whose builder is still null.
                    esClient = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Parses {@link #host} into seed nodes and prepares the client builder with a
     * default JSON content-type header.
     *
     * @return the configured builder (also stored in this holder)
     */
    public RestClientBuilder initBuilder() {
        String[] hosts = host.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            String[] hostAndPort = hosts[i].split(":");
            httpHosts[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
        }

        builder = RestClient.builder(httpHosts);
        // Default header sent with every request.
        Header[] defaultHeader = new Header[]{
                new BasicHeader("Content-type", "application/json")
        };
        builder.setDefaultHeaders(defaultHeader);
        return builder;
    }

    /**
     * Returns the shared high-level client, building it (and starting its sniffer)
     * on first use or after a previous {@link #closeClient()}.
     *
     * @return the shared, open {@code RestHighLevelClient}
     */
    public RestHighLevelClient getHighLevelClient() {
        RestHighLevelClient client = highLevelClient;
        if (client == null) {
            synchronized (ESClient.class) {
                client = highLevelClient;
                if (client == null) {
                    client = new RestHighLevelClient(builder);
                    // Start the sniffer before publishing the client so that a
                    // published client always has its sniffer in place.
                    sniffer = Sniffer.builder(client.getLowLevelClient())
                            .setSniffIntervalMillis(5000)            // interval between two ordinary sniff rounds (ms)
                            .setSniffAfterFailureDelayMillis(15000)  // delay before the sniff scheduled after a failure (ms)
                            .build();
                    highLevelClient = client;
                }
            }
        }
        return client;
    }

    /**
     * Closes the sniffer and then the client, if a client is currently open.
     * An {@link IOException} raised while closing is printed, not propagated.
     * The cached references are always cleared so the next
     * {@link #getHighLevelClient()} call creates a new client.
     */
    public void closeClient() {
        synchronized (ESClient.class) {
            if (highLevelClient == null) {
                return;
            }
            try {
                try {
                    // Sniffer first, client second.
                    sniffer.close();
                } finally {
                    highLevelClient.close();
                }
            } catch (IOException e) {
                // Best-effort shutdown: report and continue, as before.
                e.printStackTrace();
            } finally {
                sniffer = null;
                highLevelClient = null;
            }
        }
    }
}
pom :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.infinitePossibilities</groupId>
    <artifactId>es</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>es</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- NOTE(review): this is a Maven plugin declared as a dependency; it would
             normally live under <build><plugins>. Left in place; confirm before moving. -->
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-maven-plugin</artifactId>
            <version>1.4.0</version>
            <type>maven-plugin</type>
        </dependency>

        <!-- MyBatis-Plus starter (disabled)
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1</version>
        </dependency>
        -->

        <!-- MyBatis integration for MySQL access -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- ES transport client -->
        <!-- NOTE(review): the sample code shown with this POM only uses the REST
             clients below; confirm whether the transport client is still needed. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client-sniffer</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.6.2</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.12</version>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>

        <!-- Raised from 1.2.60: earlier 1.2.x releases are affected by known
             deserialization vulnerabilities; 1.2.83 is the patched 1.x line. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>