zoukankan      html  css  js  c++  java
  • Spring Boot 整合 Elasticsearch

    先来几个测试方法, pom在最下面:

    完整代码 : https://github.com/lifan12589/infinite-possibilities/tree/master/springboot_stu/es-Api

    创建索引:

        /**
         * Creates the {@code test_index} index with 3 shards and 2 replicas and prints
         * whether the cluster acknowledged the request.
         */
        @Test
        @SneakyThrows
        public void createIndex() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                CreateIndexRequest request = new CreateIndexRequest("test_index");
                request.settings(Settings.builder()
                        .put("index.number_of_shards", 3)   // 3 shards
                        .put("index.number_of_replicas", 2) // 2 replicas
                );

                CreateIndexResponse createIndexResponse =
                        client.indices().create(request, RequestOptions.DEFAULT);
                if (createIndexResponse.isAcknowledged()) {
                    System.out.println("创建index成功!");
                } else {
                    System.out.println("创建index失败!");
                }
            }
        }

    查询索引名称:

        /**
         * Looks up indices matching the pattern {@code se*} and prints each index name.
         */
        @Test
        @SneakyThrows
        public void getIndex() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Query index names that start with "se".
                GetIndexRequest request = new GetIndexRequest("se*");
                GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
                for (String indexName : response.getIndices()) {
                    System.out.println("index name:" + indexName);
                }
            }
        }

    删除索引:

        /**
         * Deletes the {@code test_index} index and prints whether the cluster
         * acknowledged the deletion.
         */
        @Test
        @SneakyThrows
        public void delIndex() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Name of the index to delete.
                DeleteIndexRequest request = new DeleteIndexRequest("test_index");
                AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
                if (response.isAcknowledged()) {
                    System.out.println("删除index成功!");
                } else {
                    System.out.println("删除index失败!");
                }
            }
        }

    插入数据:

    /**
         * Loads all products through {@code productInfoMapper} and indexes the first one
         * into {@code index_product}, using the product id as the document id.
         * Does nothing (apart from printing a notice) when the query returns no rows.
         */
        @Test
        @SneakyThrows
        public void insertData() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Load the rows from the database.
                List<ProductInfo> list = productInfoMapper.selectAll();
                System.out.println("list : "+list);

                // Guard: list.get(0) would throw on an empty result set.
                if (list == null || list.isEmpty()) {
                    System.out.println("list is empty, nothing to index");
                    return;
                }

                // Index the document. If the index does not exist it is created automatically
                // from the matching template. There is no need for one index per day; for
                // flexible management the suggested minimum is one per month (yyyyMM).
                IndexRequest request = new IndexRequest("index_product");
                ProductInfo product = list.get(0);
                Gson gson = new Gson();
                // Prefer not to set a custom id: it slows down indexing.
                request.id(product.getId().toString());
                request.source(gson.toJson(product), XContentType.JSON);

                IndexResponse response = client.index(request, RequestOptions.DEFAULT);
                System.out.println(response);
            }
        }

    批量插入:

    /**
         * Loads all products through {@code productInfoMapper} and indexes them into
         * {@code index_product} with a single bulk request (ids are auto-generated).
         * Prints each document as JSON, the number of bulk items, and any bulk failures.
         */
        @Test
        @SneakyThrows
        public void batchInsertData() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Bulk insert of the database rows; bulk update and delete work the same way.
                BulkRequest request = new BulkRequest("index_product");
                Gson gson = new Gson();
                List<ProductInfo> lists = productInfoMapper.selectAll();

                // Guard: a bulk request without any action is rejected by request validation.
                if (lists == null || lists.isEmpty()) {
                    System.out.println("数量:" + 0);
                    return;
                }

                for (ProductInfo product : lists) {
                    String json = gson.toJson(product);
                    System.out.println(json);
                    request.add(new IndexRequest().source(json, XContentType.JSON));
                }

                BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
                System.out.println("数量:" + response.getItems().length);
                // A bulk call can succeed as a whole while individual items fail.
                if (response.hasFailures()) {
                    System.out.println(response.buildFailureMessage());
                }
            }
        }

    根据 id 查询:

    /**
         * Fetches one document from {@code index_product} by id, restricting the returned
         * source to a few fields, and prints the response.
         */
        @Test
        @SneakyThrows
        public void getById() {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Note: this lookup goes through an alias.
                GetRequest request = new GetRequest("index_product", "mQYKzncBSw_3e2MkFkmH");

                // Only fetch specific fields; leave this unset to fetch the whole source.
                String[] includes = {"itemname", "itemcode","inputDate"};
                String[] excludes = {"desc"};
                request.fetchSourceContext(new FetchSourceContext(true, includes, excludes));

                GetResponse response = client.get(request, RequestOptions.DEFAULT);
                System.out.println(response);
            }
        }

    根据 id 查询多条数据:

     /**
         * Fetches several documents from {@code index_product} by id in one multi-get
         * request and prints the source of each; failed items print their failure message.
         *
         * @throws IOException if the request to the cluster fails
         */
        @Test
        public void multiGetById() throws IOException {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {

                // Look up multiple documents by id.
                MultiGetRequest request = new MultiGetRequest();
                request.add("index_product", "8Sj00XcB2CbVnAcWca2M");
                request.add("index_product", "9Sj00XcB2CbVnAcWca2M");
                // Equivalent form: request.add(new MultiGetRequest.Item(index, id)).

                MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
                for (MultiGetItemResponse itemResponse : response) {
                    // getResponse() is null for a failed item, so check for failure first.
                    if (itemResponse.isFailed()) {
                        System.out.println(itemResponse.getId() + " : " + itemResponse.getFailure().getMessage());
                        continue;
                    }
                    System.out.println(itemResponse.getResponse().getSourceAsString());
                }
            }
        }

    根据 id 删除:

      /**
         * Deletes the document with id {@code 6} from the {@code ac-new} index and
         * prints the response.
         *
         * @throws IOException if the request to the cluster fails
         */
        @Test
        public void delById() throws IOException {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {
                DeleteRequest request = new DeleteRequest("ac-new", "6");
                DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
                System.out.println(response);
            }
        }

    根据某个属性进行批量更改:

    /**
         * Runs an update-by-query on {@code index_product}: documents whose
         * {@code itemname.keyword} matches the given value get {@code '#'} appended to
         * {@code itemname} by a painless script, limited to 8 documents.
         *
         * @throws IOException if the request to the cluster fails
         */
        @Test
        public void updateByQuery() throws IOException {
            // try-with-resources closes the client even when the request throws.
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(
                            new HttpHost("localhost", 9200, "http"),
                            new HttpHost("localhost", 9201, "http"),
                            new HttpHost("localhost", 9202, "http")))) {
                UpdateByQueryRequest request = new UpdateByQueryRequest("index_product");
                // By default a version conflict aborts the UpdateByQueryRequest; to continue
                // on conflicts instead, call request.setConflicts("proceed").

                // Update condition; the ".keyword" sub-field gives an exact, un-analyzed match.
                request.setQuery(QueryBuilders.matchQuery("itemname.keyword","具体属性值"));
                // Cap the number of documents updated.
                request.setMaxDocs(8);
                request.setScript(
                        new Script(ScriptType.INLINE, "painless", "ctx._source.itemname+='#';", Collections.emptyMap()));

                BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
                System.out.println(response);
            }
        }

    嗅探器:

    /**
         * Builds a low-level REST client with a sniff-on-failure listener, attaches a
         * {@link Sniffer} to it, and then closes both (sniffer first, client second).
         *
         * @throws IOException if closing the client fails
         */
        @Test
        public void sniffer() throws IOException {

            // Listener that triggers a sniff round when a node fails.
            SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

            // The outer resource is closed last, so the client outlives the sniffer.
            try (RestClient restClient = RestClient.builder(
                    new HttpHost("localhost",9200,"http"),
                    new HttpHost("localhost",9201,"http"),
                    new HttpHost("localhost",9202,"http"))
                    .setFailureListener(sniffOnFailureListener)
                    .build()) {

                // Sniffed nodes are addressed with the HTTPS scheme.
                // NOTE(review): the seed hosts above use plain "http" - confirm HTTPS is intended.
                NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
                        restClient,
                        ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
                        ElasticsearchNodesSniffer.Scheme.HTTPS
                );

                // Bind the sniffer to restClient; the inner resource is closed first.
                try (Sniffer sniffer = Sniffer.builder(restClient)
                        .setSniffIntervalMillis(5000)            // interval between two regular sniff rounds (ms)
                        .setSniffAfterFailureDelayMillis(30000)  // delay before the sniff scheduled after a failure (ms)
                        .setNodesSniffer(nodesSniffer)
                        .build()) {
                    sniffOnFailureListener.setSniffer(sniffer);
                }
            }
        }

    查看节点信息:

     /**
         * Prints the nodes known to the shared {@code ESClient} low-level client three
         * times, sleeping 60 seconds between scans, then closes the shared client.
         */
        @Test
        @SneakyThrows
        public void snifferT(){
            try {
                RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient();

                for (Node node : client.getLowLevelClient().getNodes()) {
                    System.out.println("初始化节点 : " + node);
                }

                Thread.sleep(60000);
                System.out.println("准备二次扫描:");
                for (Node node : client.getLowLevelClient().getNodes()) {
                    System.out.println("二次扫描 : "+node);
                }

                Thread.sleep(60000);
                System.out.println("准备三次扫描:");
                for (Node node : client.getLowLevelClient().getNodes()) {
                    System.out.println("三次扫描 : "+node);
                }
            } finally {
                // Release the shared client even if a sleep is interrupted or a scan throws.
                ESClient.getEsClient().closeClient();
            }
        }

    查询数据库数据,插入ES:

    /**
         * Ensures the {@code bulk_index} index exists (3 primary shards, 2 replicas each),
         * then loads all rows through {@code bulkInfoMapper} and bulk-indexes them, using
         * the row position as the document id and rewriting {@code inputDate} as
         * {@code yyyy-MM-dd}. The shared client is always closed at the end.
         */
        @Test
        @SneakyThrows
        public void bulkInit(){
            try {
                RestHighLevelClient client = ESClient.getEsClient().getHighLevelClient();
                GetIndexRequest request = new GetIndexRequest("bulk_index");
                boolean exists = client.indices().exists(request,RequestOptions.DEFAULT);

                if(!exists) {
                    CreateIndexRequest createIndexRequest = new CreateIndexRequest("bulk_index");
                    createIndexRequest.settings(Settings.builder()
                            .put("index.number_of_shards", 3)     // 3 primary shards
                            .put("index.number_of_replicas", 2)); // 2 replicas per primary shard

                    CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest,RequestOptions.DEFAULT);
                    // The printed "code" is simply the response object's hashCode().
                    int code = createIndexResponse.hashCode();
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    System.out.println("创建索引 ACK : "+acknowledged+"   索引 code : "+code);
                }

                List<BulkInfo> list = bulkInfoMapper.selectAll();
                // Guard: a bulk request without any action is rejected by request validation.
                if (list == null || list.isEmpty()) {
                    System.out.println("插入条数 : "+0);
                    return;
                }

                BulkRequest bulkRequest = new BulkRequest("bulk_index");
                Gson gson = new Gson();
                // Created once and reused; the formatter is only used on this thread.
                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
                for (int i=0;i<list.size();i++){
                    BulkInfo row = list.get(i);
                    JSONObject json = new JSONObject(gson.toJson(row));

                    // Replace the serialized date with a yyyy-MM-dd string when a date is present.
                    if (row.getInputDate() != null) {
                        Date date = new Date(row.getInputDate().getTime());
                        json.put("inputDate",sdf.format(date));
                    }

                    bulkRequest.add(new IndexRequest().id(Integer.toString(i)).source(json.toString(),XContentType.JSON));
                }

                BulkResponse response = client.bulk(bulkRequest,RequestOptions.DEFAULT);
                System.out.println("插入条数 : "+response.getItems().length);
                // A bulk call can succeed as a whole while individual items fail.
                if (response.hasFailures()) {
                    System.out.println(response.buildFailureMessage());
                }
            } finally {
                // Release the shared client even when a request throws.
                ESClient.getEsClient().closeClient();
            }
        }

    ESClient:

    package com.infinitePossibilities.util;
    
    import org.apache.http.Header;
    import org.apache.http.HttpHost;
    import org.apache.http.message.BasicHeader;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.sniff.Sniffer;
    import java.io.IOException;
    
    /**
     * Lazily-initialised singleton holder for a {@link RestHighLevelClient} with an
     * attached {@link Sniffer}, connecting to the comma-separated hosts in {@code host}.
     *
     * <p>The shared client and sniffer are created on the first call to
     * {@link #getHighLevelClient()} and released by {@link #closeClient()}; after a
     * close, the next {@link #getHighLevelClient()} call builds a fresh client.
     */
    public class ESClient {

        // volatile: required so the unsynchronised first null-check sees a fully built instance.
        private static volatile ESClient esClient;
        private String host = "localhost:9200,localhost:9201,localhost:9202";
        private RestClientBuilder builder;
        private static Sniffer sniffer;
        private static volatile RestHighLevelClient highLevelClient;

        private ESClient(){

        }

        /**
         * Returns the singleton, creating it and initialising its builder on first use.
         *
         * @return the shared {@code ESClient} instance
         */
        public static ESClient getEsClient(){

            if(esClient == null){

                synchronized (ESClient.class){
                    if(esClient == null){
                        // Initialise fully before publishing, so no other thread can
                        // observe an instance whose builder is still null.
                        ESClient instance = new ESClient();
                        instance.initBuilder();
                        esClient = instance;
                    }
                }
            }
            return esClient;
        }

        /**
         * Parses {@code host} ("name:port" entries separated by commas) into HTTP hosts
         * and creates the client builder with a default JSON content-type header.
         *
         * @return the builder, which is also stored on this instance
         */
        public RestClientBuilder initBuilder(){

            String [] hosts = host.split(",");

            HttpHost[] httpHosts = new HttpHost[hosts.length];
            for(int i = 0;i<hosts.length;i++){
                String [] hostAndPort = hosts[i].split(":");
                httpHosts[i] = new HttpHost(hostAndPort[0],Integer.parseInt(hostAndPort[1]),"http");
            }

            // Set the default request headers on the builder.
            builder = RestClient.builder(httpHosts);
            Header[] defaultHeader = new Header[]{
                    new BasicHeader("Content-type","application/json")
            };
            builder.setDefaultHeaders(defaultHeader);

            return builder;
        }

        /**
         * Returns the shared high-level client, creating it together with its sniffer
         * on first use or after {@link #closeClient()}.
         *
         * @return the shared {@link RestHighLevelClient}
         */
        public RestHighLevelClient getHighLevelClient(){

            if(highLevelClient == null){
                synchronized (ESClient.class){
                    if(highLevelClient == null){
                        RestHighLevelClient created = new RestHighLevelClient(builder);
                        // Start the sniffer before publishing the client reference.
                        sniffer = Sniffer.builder(created.getLowLevelClient())
                                .setSniffIntervalMillis(5000)            // interval between two regular sniff rounds (ms)
                                .setSniffAfterFailureDelayMillis(15000)  // delay before the sniff scheduled after a failure (ms)
                                .build();
                        highLevelClient = created;
                    }
                }
            }
            return highLevelClient;
        }

        /**
         * Closes the sniffer and then the client, if a client exists. The cached
         * references are cleared so a closed client is never handed out again.
         * An {@link IOException} on close is printed, not rethrown (best effort).
         */
        public void closeClient(){

            synchronized (ESClient.class){
                if(null!=highLevelClient){

                    try {
                        if(null!=sniffer){
                            sniffer.close();
                        }
                        highLevelClient.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        sniffer = null;
                        highLevelClient = null;
                    }
                }
            }
        }

    }

    pom :

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the "es" demo module: Spring Boot 2.3.1 parent, MyBatis + MySQL/Druid
         for database access, and the Elasticsearch 7.6.2 client libraries. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.infinitePossibilities</groupId>
        <artifactId>es</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>es</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Test starter; the JUnit vintage engine is excluded. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- NOTE(review): this Maven plugin is declared as a regular dependency (type maven-plugin);
                 plugins are normally declared under build/plugins. Confirm this is intentional. -->
            <dependency>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.4.0</version>
                <type>maven-plugin</type>
            </dependency>
    
            <!-- MyBatis-Plus starter (disabled)
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>3.3.1</version>
            </dependency>    -->
    
            <!-- MySQL: MyBatis-related dependencies -->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>2.0.0</version>
            </dependency>
    
            <!-- Elasticsearch clients, all pinned to 7.6.2: transport client, high-level REST client,
                 REST client sniffer and low-level REST client. -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>7.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client-sniffer</artifactId>
                <version>7.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>7.6.2</version>
            </dependency>
    
            <!-- MySQL JDBC driver (no explicit version is set here) -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <!-- MySQL: Alibaba Druid database connection pool -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.12</version>
            </dependency>
    
            <!-- Lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.6</version>
                <optional>true</optional>
            </dependency>
            <!-- JSON libraries: Gson and fastjson -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.5</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.60</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
  • 相关阅读:
    防采集策略『blueidea』
    关于进程和线程『整理』
    数据采集『blueidea』
    搜索引擎营销的一些策略『来源:点石互动搜索引擎优化博』
    AJAX之通讯技术简介
    使用AJAX技术构建更优秀的Web应用程序
    AJAX相关JS代码片段和浏览器模型『』
    RDLC报表:每页显示N条记录
    ObjectMapper .NET
    How to Hash Data with Salt
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14823257.html
Copyright © 2011-2022 走看看