zoukankan      html  css  js  c++  java
  • springBoot整合elasticsearch6.4.0

    环境:es6.4.0   springBoot1.5.9  mysql8.X    logstash6.4.0   kibana6.4.0   redhat6

    开发工具:eclipse Oxygen.3a Release (4.7.3a)

    开发步骤:

    1.新建springBoot工程,过程略,网上有好多教程大家可以自己尝试

    2.引入elasticsearch6.4依赖

    <!--elasticsearch 开始 -->
     2         <dependency>
     3             <groupId>org.elasticsearch</groupId>
     4             <artifactId>elasticsearch</artifactId>
     5             <version>${elasticsearch.version}</version>
     6         </dependency>
     7         <dependency>
     8             <groupId>org.elasticsearch.client</groupId>
     9             <artifactId>elasticsearch-rest-high-level-client</artifactId>
    10             <version>${elasticsearch.version}</version>
    11         </dependency>
    12         <dependency>
    13             <groupId>org.apache.logging.log4j</groupId>
    14             <artifactId>log4j-api</artifactId>
    15             <version>2.8.2</version>
    16         </dependency>
    17         <dependency>
    18             <groupId>org.apache.logging.log4j</groupId>
    19             <artifactId>log4j-core</artifactId>
    20             <version>2.8.2</version>
    21         </dependency>
    22         <dependency>
    23             <groupId>org.elasticsearch.client</groupId>
    24             <artifactId>elasticsearch-rest-client</artifactId>
    25             <version>${elasticsearch.version}</version>
    26         </dependency>
    27         <dependency>
    28             <groupId>org.elasticsearch.client</groupId>
    29             <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    30             <version>${elasticsearch.version}</version>
    31         </dependency>
    32         <!--elasticsearch 结束 -->
    

    3.yml配置文件

    server:
       port: 8090
    
    
    spring:
      application:
        name: Elasticsearch
      data:
        elasticsearch:
          cluster-name: elasticsearch-application
          cluster-nodes: 192.168.177.128:9200
      jackson:
        default-property-inclusion: non_null
    
    logging:
      level:
        root: info
          
    

    4.测试类

      1 import java.io.IOException;
      2 
      3 import org.elasticsearch.action.get.GetRequest;
      4 import org.elasticsearch.action.get.GetResponse;
      5 import org.elasticsearch.action.search.ClearScrollRequest;
      6 import org.elasticsearch.action.search.ClearScrollResponse;
      7 import org.elasticsearch.action.search.SearchRequest;
      8 import org.elasticsearch.action.search.SearchResponse;
      9 import org.elasticsearch.action.search.SearchScrollRequest;
     10 import org.elasticsearch.client.RequestOptions;
     11 import org.elasticsearch.client.RestHighLevelClient;
     12 import org.elasticsearch.common.unit.TimeValue;
     13 import org.elasticsearch.index.query.BoolQueryBuilder;
     14 import org.elasticsearch.index.query.MatchQueryBuilder;
     15 import org.elasticsearch.index.query.QueryBuilders;
     16 import org.elasticsearch.index.query.WildcardQueryBuilder;
     17 import org.elasticsearch.search.Scroll;
     18 import org.elasticsearch.search.SearchHit;
     19 import org.elasticsearch.search.SearchHits;
     20 import org.elasticsearch.search.builder.SearchSourceBuilder;
     21 import org.junit.Test;
     22 import org.junit.runner.RunWith;
     23 import org.springframework.beans.factory.annotation.Autowired;
     24 import org.springframework.boot.test.context.SpringBootTest;
     25 import org.springframework.test.context.junit4.SpringRunner;
     26 
     27 import com.elasticsearch.Es_App;
     28 
     29 
     30 /**
     31  * @author （原文作者名缺失）
     33  */
     34 @RunWith(SpringRunner.class)
     35 @SpringBootTest(classes = Es_App.class)
     36 public class test {
     37     @Autowired
     38     RestHighLevelClient client;
     39     // 这个时间并不需要长到可以处理所有的数据,仅仅需要足够长来处理前一批次的结果。每个 scroll 请求(包含 scroll 参数)设置了一个新的失效时间。
     40     final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
     41 
     42     @Test
     43     public void getIndexTest() {
     44 
     45         GetRequest request = new GetRequest("db_user", "user", "4");
     46         try {
     47 
     48             GetResponse response = client.get(request);
     49             System.out.println(response);
     50         } catch (IOException e) {
     51 
     52             e.printStackTrace();
     53         }
     54     }
     55     // （原文此处省略了第 56～83 行的示例代码）
     84     @Test
     85     public void wildcard() throws Exception {
     86         WildcardQueryBuilder mqb = QueryBuilders.wildcardQuery("username", "z*");
     87         WildcardQueryBuilder mqb1 = QueryBuilders.wildcardQuery("name.keyword", "李*");
     88         BoolQueryBuilder bq = QueryBuilders.boolQuery();
     89         bq.should(mqb);
     90         bq.should(mqb1);
     91         
     92         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
     93         sourceBuilder.size(300);
     94         sourceBuilder.query(bq);
     95         sourceBuilder.fetchSource(new String[] { "username", "name" }, null);
     96         System.out.println(sourceBuilder.toString());
     97         
     98         SearchRequest searchRequest = new SearchRequest("db_user").types("user");
     99         searchRequest.source(sourceBuilder);
    100         searchRequest.scroll(scroll);
    101         
    102         SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    103 
    104         SearchHit[] searchHits = searchResponse.getHits().getHits();106         for (SearchHit searchHit : searchHits) {
    107             System.out.println(searchHit.getSourceAsString());
    108             //String sourceAsString = searchHit.getSourceAsString();
    109         }
    110 
    111         // 遍历搜索命中的数据,直到没有数据
    112         String scrollId = searchResponse.getScrollId();
    113         while (searchHits != null && searchHits.length > 0) {
    114             SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    115             scrollRequest.scroll(scroll);
    116             try {
    117                 searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
    118             } catch (IOException e) {
    119                 e.printStackTrace();
    120             }
    121             scrollId = searchResponse.getScrollId();
    122             searchHits = searchResponse.getHits().getHits();
    123             if (searchHits != null && searchHits.length > 0) {
    124                125                 for (SearchHit searchHit : searchHits) {
    126                     System.out.println(searchHit.getSourceAsString());
    127 //                    String sourceAsString = searchHit.getSourceAsString();
    128                 }
    129             }
    130         }
    131         // 清除滚屏
    132         ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    133         clearScrollRequest.addScrollId(scrollId);// 也可以选择setScrollIds()将多个scrollId一起使用
    134         ClearScrollResponse clearScrollResponse = null;
    135         try {
    136             clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
    137         } catch (IOException e) {
    138             e.printStackTrace();
    139         }
    140         boolean succeeded = clearScrollResponse.isSucceeded();
    141         System.out.println("succeeded:" + succeeded);
    142     }
    143 
    144     @Test
    145     public void test2() {
            // NOTE(review): intentionally empty placeholder test — no assertions yet.
    146     }
    147 
    148 }

    关于测试类说明:使用springBoot自动注入需要使用高版本的junit,博主在测试时使用的是4.12版本的junit,依赖如下:

    <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>

    es6中如果需要定义查询语句中的字段的类型,可以使用   字段.类型格式定义,如:username.keyword。

    5.配置类:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    
    
    
    @Configuration
    public class ElasticsearchConfiguration
            implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

        private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

        // "host:port" of the ES node, e.g. 192.168.177.128:9200 (from application.yml).
        @Value("${spring.data.elasticsearch.cluster-nodes}")
        private String clusterNodes;

        private RestHighLevelClient restHighLevelClient;

        /**
         * Returns the client instance built in {@link #afterPropertiesSet()}.
         *
         * @return the shared REST high-level client
         * @throws Exception never thrown here; required by the FactoryBean contract
         */
        @Override
        public RestHighLevelClient getObject() throws Exception {
            return restHighLevelClient;
        }

        /**
         * Type exposed to the container so the client can be autowired by type.
         *
         * @return RestHighLevelClient.class
         */
        @Override
        public Class<?> getObjectType() {
            return RestHighLevelClient.class;
        }

        /** Closes the underlying REST client when the context shuts down. */
        @Override
        public void destroy() throws Exception {
            try {
                if (restHighLevelClient != null) {
                    restHighLevelClient.close();
                }
            } catch (final Exception e) {
                LOG.error("Error closing ElasticSearch client: ", e);
            }
        }

        /**
         * FIX: this factory always hands out the single client created in
         * afterPropertiesSet(), so it must declare itself a singleton.
         * Returning false made Spring treat a shared, closeable resource as a
         * prototype bean.
         */
        @Override
        public boolean isSingleton() {
            return true;
        }

        /** Builds the client once, after the @Value properties are injected. */
        @Override
        public void afterPropertiesSet() throws Exception {
            restHighLevelClient = buildClient();
        }

        /**
         * Creates the REST high-level client from the configured "host:port" pair.
         *
         * @return the built client, or null if construction failed (logged)
         */
        private RestHighLevelClient buildClient() {
            try {
                // FIX: split the node string once instead of twice.
                final String[] hostAndPort = clusterNodes.split(":");
                restHighLevelClient = new RestHighLevelClient(
                        RestClient.builder(
                                new HttpHost(
                                        hostAndPort[0],
                                        Integer.parseInt(hostAndPort[1]),
                                        "http")));
            } catch (Exception e) {
                // FIX: log the full stack trace — e.getMessage() alone loses the cause.
                LOG.error("Failed to build Elasticsearch client for nodes [" + clusterNodes + "]", e);
            }
            return restHighLevelClient;
        }
    }

    使用logstash将mysql数据库数据导入到es

    logstash6.4.0安装 (linux):

    1.下载linux版安装包,解压安装文件。

    2. 在logstash解压包下创建mysql文件夹(博主是导入mysql数据库数据,所以文件夹名字为mysql) ,将与mysql版本相对应的驱动jar包放入mysql文件夹中

    3.创建logstash连接mysql数据库配置文件,如下:

    input {
        stdin {
        }
        jdbc {
          # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
          #jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/webject?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
          jdbc_connection_string=> "jdbc:mysql://192.168.177.128:3306/webject?allowMultiQueries=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&useUnicode=true&characterEncoding=utf8"
          # 你的账户密码
          jdbc_user => "root"
          jdbc_password => "123456"
          # 连接数据库的驱动包,建议使用绝对地址
          jdbc_driver_library => "mysql/mysql-connector-java-8.0.12.jar"
          # 这是不用动就好
          jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
    
        #处理中文乱码问题
          codec => plain { charset => "UTF-8"}
    
           #使用其它字段追踪,而不是用时间
          use_column_value => true
           #追踪的字段      
        tracking_column => id      
        record_last_run => true     
        #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值     
        last_run_metadata_path => "mysql/station_parameter.txt"
    
          jdbc_default_timezone => "Asia/Shanghai"
     
          #statement_filepath => "mysql/jdbc.sql"
           statement => "select * from user where id > :sql_last_value"
          
    
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
    
          # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟
          schedule => "*/5 * * * *"
          type => "jdbc"
        }
    }
    
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
     
     
    output {
        elasticsearch {
            # 要导入到的Elasticsearch所在的主机
            hosts => "192.168.177.128:9200"
            # 要导入到的Elasticsearch的索引的名称
            index => "db_user"
            # 类型名称(类似数据库表名)
            document_type => "user"
            # 主键名称(类似数据库主键)
            document_id => "%{id}"
            # es 账号
            user => elastic
            password => changeme
            
        }
    
        stdout {
            # JSON格式输出
            codec => json_lines
        }
        
    }

    4.启动logstash 

       进入bin目录 输入以下启动命令: ./logstash -f mysql/jdbc.conf

     说明:

       导入后,以后导入数据库会从最后的导入位置开始,博主在jdbc.conf中配置的是id,所以下一次导入会从最后的一个id开始导入,如果需要重新开始导入,则删除mysql目录下的station_parameter.txt文件即可。

       不同版本logstash连接数据的配置文件的格式可能有所区别。

     
  • 相关阅读:
    Linux内核网络协议栈优化总纲
    Java实现 蓝桥杯VIP 算法训练 连续正整数的和
    Java实现 蓝桥杯VIP 算法训练 连续正整数的和
    Java实现 蓝桥杯VIP 算法训练 寂寞的数
    Java实现 蓝桥杯VIP 算法训练 寂寞的数
    Java实现 蓝桥杯VIP 算法训练 学做菜
    Java实现 蓝桥杯VIP 算法训练 学做菜
    Java实现 蓝桥杯VIP 算法训练 判断字符位置
    Java实现 蓝桥杯VIP 算法训练 判断字符位置
    Java实现 蓝桥杯VIP 算法训练 链表数据求和操作
  • 原文地址:https://www.cnblogs.com/qinshou/p/12581911.html
Copyright © 2011-2022 走看看