zoukankan      html  css  js  c++  java
  • 3.3_springBoot2.1.x检索之RestHighLevelClient方式

    1、版本依赖

    注意:如果对 transport client 不了解,请先阅读官方文档:

    transport client(传送门)

    这里需要版本匹配,如失败查看官网或百度。

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      Maven build for the Spring Boot 2.1.x + Elasticsearch REST client demo.
      All Elasticsearch artifacts (including the transitive org.elasticsearch:elasticsearch
      core jar) must resolve to the same version as the cluster.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.jiatp</groupId>
        <artifactId>springboot-03-rest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot-03-rest</name>
        <description>Demo project for Spring Boot</description>

        <properties>
            <java.version>1.8</java.version>
            <!--
              Overrides the Elasticsearch version managed by spring-boot-dependencies.
              Setting only <version> on the client artifacts is not enough: the transitive
              org.elasticsearch:elasticsearch core jar would still be resolved to the version
              managed by the Boot parent, mixing incompatible client and core classes.
            -->
            <elasticsearch.version>6.3.2</elasticsearch.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <!-- Java Low Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <!-- Java High Level REST Client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <!-- JSON conversion -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    </project>
    

    2、配置客户端

    ElasticsearchConfig.java
    package com.jiatp.springboot.config;
    
    
    import org.apache.http.HttpHost;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.config.RequestConfig.Builder;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.beans.factory.annotation.Autowire;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.io.IOException;
    
    /**
     * Spring configuration that builds the application's {@link RestHighLevelClient}
     * from the {@code elasticsearch.*} properties.
     */
    @Configuration
    public class ElasticsearchConfig {

        @Value("${elasticsearch.host}")
        private String host;
        @Value("${elasticsearch.port}")
        private int port;
        @Value("${elasticsearch.schema}")
        private String schema;
        @Value("${elasticsearch.connectTimeOut}")
        private int connectTimeOut;
        @Value("${elasticsearch.socketTimeOut}")
        private int socketTimeOut;
        @Value("${elasticsearch.connectionRequestTimeOut}")
        private int connectionRequestTimeOut;
        @Value("${elasticsearch.maxConnectNum}")
        private int maxConnectNum;
        @Value("${elasticsearch.maxConnectPerRoute}")
        private int maxConnectPerRoute;
        private HttpHost httpHost;
        /** When {@code true}, {@link #client()} installs the timeout callback. */
        private boolean uniqueConnectTimeConfig = true;
        /** When {@code true}, {@link #client()} installs the connection-pool callback. */
        private boolean uniqueConnectNumConfig = true;
        private RestClientBuilder builder;
        private RestHighLevelClient client;

        /**
         * Creates the {@link RestHighLevelClient} bean.
         *
         * <p>The deprecated {@code autowire} attribute was dropped: it only triggers
         * setter injection on the returned bean, which this client does not use.
         * {@code destroyMethod} is spelled out so the client is closed on context shutdown.
         *
         * @return the client connected to the configured host, port and scheme
         */
        @Bean(name = "restHighLevelClient", destroyMethod = "close")
        public RestHighLevelClient client() {
            httpHost = new HttpHost(host, port, schema);
            builder = RestClient.builder(httpHost);
            if (uniqueConnectTimeConfig) {
                setConnectTimeOutConfig();
            }
            if (uniqueConnectNumConfig) {
                setMutiConnectConfig();
            }
            client = new RestHighLevelClient(builder);
            return client;
        }

        /**
         * Installs the connect, socket and connection-request timeouts on the builder.
         *
         * @throws IllegalStateException if called before {@link #client()} created the builder
         */
        public void setConnectTimeOutConfig() {
            requireBuilder().setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(connectTimeOut)
                    .setSocketTimeout(socketTimeOut)
                    .setConnectionRequestTimeout(connectionRequestTimeOut));
        }

        /**
         * Installs the total and per-route connection limits on the builder.
         *
         * @throws IllegalStateException if called before {@link #client()} created the builder
         */
        public void setMutiConnectConfig() {
            requireBuilder().setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                    .setMaxConnTotal(maxConnectNum)
                    .setMaxConnPerRoute(maxConnectPerRoute));
        }

        /**
         * Closes the client created by {@link #client()}, if any.
         * An {@link IOException} raised while closing is printed and not rethrown.
         */
        public void close() {
            if (client == null) {
                return;
            }
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Returns the builder, failing with a clear message instead of a NullPointerException. */
        private RestClientBuilder requireBuilder() {
            if (builder == null) {
                throw new IllegalStateException("RestClientBuilder is not initialized; call client() first");
            }
            return builder;
        }

    }
    

    application.yml

    # Connection settings injected into ElasticsearchConfig via @Value("${elasticsearch.*}").
    elasticsearch:
      # Host, port and scheme are passed to HttpHost; 192.168.x.x is a placeholder to replace.
      host: 192.168.x.x
      port: 9200
      schema: http
      # Passed to RequestConfig.Builder setConnectTimeout / setSocketTimeout /
      # setConnectionRequestTimeout (Apache HttpClient interprets these as milliseconds).
      connectTimeOut: 1000
      socketTimeOut: 30000
      connectionRequestTimeOut: 500
      # Passed to HttpAsyncClientBuilder setMaxConnTotal / setMaxConnPerRoute.
      maxConnectNum: 100
      maxConnectPerRoute: 100
    

    3、测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot03RestApplicationTests {
    
        @Qualifier(value = "restHighLevelClient")
        @Autowired
        RestHighLevelClient restHighLevelClient;
    
        String indexName="student";
        String esType="msg";
    
    
        /**
         * Smoke test: sends {@code GET /?pretty=true} to the node and prints the response body.
         *
         * @throws IOException if the request fails or the response body cannot be read
         */
        @Test
        public void contextLoads() throws IOException {
            // RestClient is Closeable; try-with-resources releases it even when the request throws.
            try (RestClient restClient = RestClient.builder(
                    new HttpHost("192.168.56.101", 9200, "http")).build()) {
                // (1) Run a basic request to verify that the cluster is reachable.
                Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }

    当显示 create 时则表明没问题。

    其它测试:

    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot03RestApplicationTests {
    
        @Qualifier(value = "restHighLevelClient")
        @Autowired
        RestHighLevelClient restHighLevelClient;
    
        String indexName="student";
        String esType="msg";
    
    
        /**
         * Smoke test: sends {@code GET /?pretty=true} to the node and prints the response body.
         *
         * @throws IOException if the request fails or the response body cannot be read
         */
        @Test
        public void contextLoads() throws IOException {
            // RestClient is Closeable; try-with-resources releases it even when the request throws.
            try (RestClient restClient = RestClient.builder(
                    new HttpHost("192.168.56.101", 9200, "http")).build()) {
                // (1) Run a basic request to verify that the cluster is reachable.
                Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    
        /**
         * Creates the {@code book} index and prints whether the request was acknowledged.
         *
         * @throws IOException if the request fails; propagated so the test fails
         *                     instead of passing after a printed stack trace
         */
        @Test
        public void createIndex() throws IOException {
            // Index names must be all lowercase, otherwise the request is rejected.
            String index = "book";
            CreateIndexRequest request = new CreateIndexRequest(index);
            CreateIndexResponse indexResponse = restHighLevelClient.indices().create(request);
            if (indexResponse.isAcknowledged()) {
                System.out.println("创建索引成功");
            } else {
                System.out.println("创建索引失败");
            }
        }
    
        /**
         * Checks whether the {@code book} index exists via a {@code HEAD} request
         * and prints the result.
         *
         * @throws Exception if the request fails; propagated so the test fails
         */
        @Test
        public void findIndex() throws Exception {
            // The endpoint carries a leading slash like every other endpoint in this class.
            Response response = restHighLevelClient.getLowLevelClient().performRequest("HEAD", "/book");
            // Decide on the numeric status code; the reason phrase is free text and
            // is not guaranteed to be exactly "OK".
            boolean exist = response.getStatusLine().getStatusCode() == 200;
            System.out.println(exist);
        }
        /**
         * Indexes one document built from a {@code JSONObject} into
         * {@code indexName}/{@code esType} under id {@code "2"} and prints the returned id.
         *
         * @throws IOException if the index request fails; propagated so the test fails
         *                     instead of swallowing every exception
         */
        @Test
        public void addData() throws IOException {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", 3);
            jsonObject.put("age", 26);
            jsonObject.put("name", "wangwu");
            jsonObject.put("date", new Date());
            IndexRequest indexRequest = new IndexRequest(indexName, esType, "2").source(jsonObject);

            IndexResponse indexResponse = restHighLevelClient.index(indexRequest);
            System.out.println(indexResponse.getId());
        }
        /**
         * Indexes a document assembled with an {@code XContentBuilder} into
         * {@code twitter/t_doc} under id {@code "3"} with an explicit routing value,
         * then prints the response status name.
         */
        @Test
        public void addData1() throws Exception {
            XContentBuilder document = jsonBuilder()
                    .startObject()
                    .field("user", "jiatp")
                    .timeField("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                    .endObject();

            IndexRequest request = new IndexRequest("twitter", "t_doc", "3");
            request.source(document);
            request.routing("my_route"); // an explicit routing value may be set

            String statusName = restHighLevelClient.index(request).status().name();
            System.out.println(statusName);
        }
        /**
         * Indexes a document given as alternating field-name/value arguments into
         * {@code twitter/t_doc} under id {@code "3"} and prints the response status name.
         */
        @Test
        public void addData2() throws Exception {
            IndexRequest request = new IndexRequest("twitter", "t_doc", "3");
            request.source(
                    "user", "kimchy",
                    "postDate", new Date(),
                    "message", "trying out Elasticsearch");

            String statusName = restHighLevelClient.index(request).status().name();
            System.out.println(statusName);
        }
        /**
         * Indexes a document asynchronously and waits up to two seconds for the callback.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        @Test
        public void testAddAsync() throws InterruptedException {
            // Released by either callback, so the test stops waiting as soon as the
            // request completes instead of always sleeping a fixed two seconds.
            final java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

            // Parameterized type instead of the raw ActionListener.
            ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    try {
                        System.out.println("Async:" + indexResponse.status().name());
                        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                            // Todo
                        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                            // Todo
                        }
                        // Handle the case where fewer shards succeeded than the total.
                        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                            // Todo
                        }
                    } finally {
                        finished.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        System.out.println("AsyncFailure:" + e.getMessage());
                        e.printStackTrace();
                    } finally {
                        finished.countDown();
                    }
                }
            };

            IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "4")
                    .source("user", "luxi",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");

            restHighLevelClient.indexAsync(indexRequest, listener);  // asynchronous call
            finished.await(2, java.util.concurrent.TimeUnit.SECONDS);
        }
    
    
        /**
         * Fetches document {@code twitter/t_doc/3} and prints its source.
         * A document indexed with a routing value must be fetched with the same routing value.
         */
        @Test
        public void searchRoute() throws Exception {
            GetRequest getRequest = new GetRequest("twitter", "t_doc", "3");
            getRequest.routing("my_route");

            String source = restHighLevelClient.get(getRequest).getSourceAsString();
            System.out.println(source);
        }
        /**
         * Fetches document {@code twitter/t_doc/2} asynchronously with source filtering
         * and waits up to two seconds for the callback.
         *
         * @throws IOException declared by the original signature and kept for compatibility
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        @Test
        public void  getOneOp() throws IOException, InterruptedException {
            // Released by either callback, so the test stops waiting as soon as the
            // request completes instead of always sleeping a fixed two seconds.
            final java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse documentFields) {
                    try {
                        System.out.println(documentFields.getSourceAsString());
                    } finally {
                        finished.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        System.out.println("Error:" + e.getMessage());
                        e.printStackTrace();
                    } finally {
                        finished.countDown();
                    }
                }
            };

            GetRequest request = new GetRequest("twitter", "t_doc", "2");
            String[] includes = new String[]{"message", "*Date"};   // fields to include
            String[] excludes = Strings.EMPTY_ARRAY;                 // fields to exclude
            FetchSourceContext fetchSourceContext =
                    new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);
            restHighLevelClient.getAsync(request, listener);
            finished.await(2, java.util.concurrent.TimeUnit.SECONDS);
        }
    
        //查询所有
        @Test
        public void searchAll(){
            HttpEntity entity = new NStringEntity(
                    "{ "query": { "match_all": {}}}",
                    ContentType.APPLICATION_JSON);
            String endPoint = "/" + indexName + "/" + esType + "/_search";
            try {
                Response response = restHighLevelClient.getLowLevelClient()
                        .performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
                System.out.println(EntityUtils.toString(response.getEntity()));
            } catch(IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
    
        /**
         * Conditional search: posts a {@code match} query for {@code name.keyword = "lisi"}
         * to {@code indexName}/{@code esType} and prints the query and the raw response.
         *
         * @throws IOException if building the query or executing the request fails;
         *                     propagated so the test fails
         */
        @Test
        public void test() throws IOException {
            String endPoint = "/" + indexName + "/" + esType + "/_search";

            XContentBuilder builder = JsonXContent.contentBuilder()
                    .startObject()
                    .startObject("query")
                    .startObject("match")
                    .field("name.keyword", "lisi")
                    .endObject()
                    .endObject()
                    .endObject();

            // Serialize the builder directly. The previous detour through an IndexRequest
            // hit a NullPointerException whenever a builder failure had been swallowed.
            String source = Strings.toString(builder);
            System.out.println(source);

            HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);

            Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    
        /**
         * Conditional search: runs a term query for {@code user = "kimchy"} on the
         * {@code twitter} index (first 5 hits) and prints the total and each hit's source.
         *
         * @throws IOException if the search request fails; propagated so the test fails
         */
        @Test
        public void testSearch() throws IOException {
            SearchRequest searchRequest = new SearchRequest("twitter");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
            sourceBuilder.from(0);
            sourceBuilder.size(5);
            searchRequest.source(sourceBuilder);

            SearchResponse response = restHighLevelClient.search(searchRequest);
            System.out.println("Hits:" + response.getHits().totalHits);
            response.getHits().forEach(hit -> System.out.println(hit.getSourceAsString()));
        }
    
           /**
            * * 查询名字等于 lisi
            * 并且年龄在20和40之间
            */
        @Test
        public void serarchFuhe(){
            try {
                String endPoint = "/" + indexName + "/" + esType + "/_search";
    
                IndexRequest indexRequest = new IndexRequest();
                XContentBuilder builder;
                try {
    
                    builder = JsonXContent.contentBuilder()
                            .startObject()
                            .startObject("query")
                            .startObject("bool")
                            .startObject("must")
                            .startObject("match")
                            .field("name.keyword", "lisi")
                            .endObject()
                            .endObject()
                            .startObject("filter")
                            .startObject("range")
                            .startObject("age")
                            .field("gte", "20")
                            .field("lte", "40")
                            .endObject()
                            .endObject()
                            .endObject()
                            .endObject()
                            .endObject()
                            .endObject();
                    indexRequest.source(builder);
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
               String source = indexRequest.source().utf8ToString();
                System.out.println(source);
    
                HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
    
                Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
                System.out.println(EntityUtils.toString(response.getEntity()));
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
        /**
         * Scripted update with upsert on {@code twitter/t_doc/7}; prints the response status name.
         *
         * <p>Outputs noted by the original author:
         * <ul>
         *   <li>document exists, so it is updated: prints {@code OK}, document becomes
         *       {@code {"C":"Carambola","A":"Apple","B":"Banana"}}</li>
         *   <li>document missing, so it is created from the upsert body: prints {@code CREATED},
         *       document is {@code {"C":"Carambola"}}</li>
         *   <li>with {@code scriptedUpsert} enabled and the document missing: prints {@code CREATED},
         *       document is {@code {"A":"Apple","B":"Banana","C":"Carambola"}}</li>
         * </ul>
         *
         * @throws IOException if building the upsert body or executing the request fails
         */
        @Test
        public void testUpdate() throws IOException {
            UpdateRequest request = new UpdateRequest("twitter", "t_doc", "7")
                    // Typed emptyMap() instead of the raw Collections.EMPTY_MAP constant.
                    .script(new Script(ScriptType.INLINE, "painless",
                            "ctx._source.A='Apple';ctx._source.B='Banana'",
                            Collections.<String, Object>emptyMap()))
                    // If the document does not exist, the upsert content is inserted as the new document.
                    .upsert(jsonBuilder()
                            .startObject()
                            .field("C", "Carambola")
                            .endObject());
            request.timeout(TimeValue.timeValueSeconds(2)); // 2-second timeout
            //request.scriptedUpsert(true);   // run the script whether or not the document exists
            UpdateResponse update = restHighLevelClient.update(request);
            System.out.println(update.status().name());
        }
    
    
        /**
         * Deletes, via {@code _delete_by_query}, every document in
         * {@code indexName}/{@code esType} whose {@code name.keyword} is {@code "wangwu"},
         * and prints the raw response body.
         *
         * @throws IOException if building the query or executing the request fails;
         *                     propagated so the test fails
         */
        @Test
        public void delete() throws IOException {
            String endPoint = "/" + indexName + "/" + esType + "/_delete_by_query";

            // Delete condition: term query, i.e. an exact match on name.keyword.
            XContentBuilder builder = JsonXContent.contentBuilder()
                    .startObject()
                    .startObject("query")
                    .startObject("term")
                    .field("name.keyword", "wangwu")
                    .endObject()
                    .endObject()
                    .endObject();

            // Serialize the builder directly. The previous detour through an IndexRequest
            // hit a NullPointerException whenever a builder failure had been swallowed.
            String source = Strings.toString(builder);

            HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
            Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
       
    
    }

    可看api进行测试,https://blog.csdn.net/jatpen/article/details/102631110

    或者查看官方文档(注意:该链接为 7.4 版本的文档,本文使用的是 6.3.2 客户端,部分 API 签名有所不同):https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.4/java-rest-high-supported-apis.html

  • 相关阅读:
    redis哨兵高可用
    数据库主从搭建
    docker 补充
    docker 进阶操作
    docker 简介
    数据可视化(Matplotlib)
    数据操作
    pandas练习
    Pandas简介
    python mysql utf-8 latin
  • 原文地址:https://www.cnblogs.com/jatpeo/p/11767472.html
Copyright © 2011-2022 走看看