  • Integrating Spring Boot with Elasticsearch 6.4.0

    Environment: Elasticsearch 6.4.0, Spring Boot 1.5.9, MySQL 8.x, Logstash 6.4.0, Kibana 6.4.0, Red Hat 6

    IDE: Eclipse Oxygen.3a Release (4.7.3a)

    Steps:

    1. Create a new Spring Boot project. The process is omitted here; there are plenty of tutorials online to follow.

    2. Add the Elasticsearch 6.4 dependencies

    <!-- elasticsearch begin -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client-sniffer</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <!-- elasticsearch end -->
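
    These dependencies reference the ${elasticsearch.version} Maven property, which is not defined in the snippet above. A minimal definition, assuming it should match the 6.4.0 cluster used throughout this post:

    <properties>
        <elasticsearch.version>6.4.0</elasticsearch.version>
    </properties>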
    

    3. The YAML configuration file. The cluster-nodes value is read by the configuration class in step 5 via @Value, which is why it points at the HTTP port 9200:

    server:
      port: 8090
    
    
    spring:
      application:
        name: Elasticsearch
      data:
        elasticsearch:
          cluster-name: elasticsearch-application
          cluster-nodes: 192.168.177.128:9200
      jackson:
        default-property-inclusion: non_null
    
    logging:
      level:
        root: info
          
    

    4. The test class

    import java.io.IOException;

    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.search.ClearScrollRequest;
    import org.elasticsearch.action.search.ClearScrollResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchScrollRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.MatchQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.query.WildcardQueryBuilder;
    import org.elasticsearch.search.Scroll;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    import com.elasticsearch.Es_App;

    /**
     * @author
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Es_App.class)
    public class test {
        @Autowired
        RestHighLevelClient client;
        // The keep-alive does not need to be long enough to process all of the data;
        // it only has to outlive the processing of the previous batch. Every scroll
        // request that carries the scroll parameter sets a fresh expiry time.
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));

        @Test
        public void getIndexTest() {
            GetRequest request = new GetRequest("db_user", "user", "4");
            try {
                GetResponse response = client.get(request, RequestOptions.DEFAULT);
                System.out.println(response);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Test
        public void wildcard() throws Exception {
            WildcardQueryBuilder mqb = QueryBuilders.wildcardQuery("username", "z*");
            WildcardQueryBuilder mqb1 = QueryBuilders.wildcardQuery("name.keyword", "李*");
            BoolQueryBuilder bq = QueryBuilders.boolQuery();
            bq.should(mqb);
            bq.should(mqb1);

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(300);
            sourceBuilder.query(bq);
            sourceBuilder.fetchSource(new String[] { "username", "name" }, null);
            System.out.println(sourceBuilder.toString());

            SearchRequest searchRequest = new SearchRequest("db_user").types("user");
            searchRequest.source(sourceBuilder);
            searchRequest.scroll(scroll);

            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

            SearchHit[] searchHits = searchResponse.getHits().getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println(searchHit.getSourceAsString());
            }

            // Keep scrolling through the hits until no more are returned
            String scrollId = searchResponse.getScrollId();
            while (searchHits != null && searchHits.length > 0) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                try {
                    searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
                if (searchHits != null && searchHits.length > 0) {
                    for (SearchHit searchHit : searchHits) {
                        System.out.println(searchHit.getSourceAsString());
                    }
                }
            }

            // Clear the scroll context
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId); // setScrollIds() can clear several scroll ids at once
            ClearScrollResponse clearScrollResponse = null;
            try {
                clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            boolean succeeded = clearScrollResponse.isSucceeded();
            System.out.println("succeeded:" + succeeded);
        }

        @Test
        public void test2() {
        }

    }
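
    The test imports com.elasticsearch.Es_App as the @SpringBootTest entry point, but that class is not shown in this post. A minimal sketch of what it presumably looks like (only the package and class name are taken from the import above; the rest is assumed):

    package com.elasticsearch;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class Es_App {
        public static void main(String[] args) {
            SpringApplication.run(Es_App.class, args);
        }
    }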

    About the test class: Spring Boot autowiring in tests requires a recent JUnit. I used JUnit 4.12 while testing, with the following dependency:

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>

    In ES 6, to target a specific sub-field type in a query, use the field.subfield format, e.g. username.keyword.
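
    For example, a wildcard on the analyzed username field matches against analyzed tokens, whereas the keyword sub-field matches the raw stored value. A small self-contained illustration (the field value "zhangsan" is made up):

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.query.TermQueryBuilder;

    public class KeywordQueryExample {
        public static void main(String[] args) {
            // Exact match against the un-analyzed keyword sub-field
            TermQueryBuilder exact = QueryBuilders.termQuery("username.keyword", "zhangsan");
            System.out.println(exact); // prints the JSON body of the query
        }
    }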

    5. The configuration class:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    
    
    
    @Configuration
    public class ElasticsearchConfiguration
            implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
        
        private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
    
        @Value("${spring.data.elasticsearch.cluster-nodes}")
        private String clusterNodes;
    
        private RestHighLevelClient restHighLevelClient;
    
        /**
         * Controls how the bean instance is produced.
         *
         * @return the client built in afterPropertiesSet()
         * @throws Exception
         */
        @Override
        public RestHighLevelClient getObject() throws Exception {
            return restHighLevelClient;
        }
    
        /**
         * Returns the class of the object this factory produces.
         *
         * @return
         */
        @Override
        public Class<?> getObjectType() {
            return RestHighLevelClient.class;
        }
    
        @Override
        public void destroy() throws Exception {
            try {
                if (restHighLevelClient != null) {
                    restHighLevelClient.close();
                }
            } catch (final Exception e) {
                LOG.error("Error closing ElasticSearch client: ", e);
            }
        }
    
        @Override
        public boolean isSingleton() {
            return false;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            restHighLevelClient = buildClient();
        }
    
        private RestHighLevelClient buildClient() {
            try {
                restHighLevelClient = new RestHighLevelClient(
                        RestClient.builder(
                                new HttpHost(
                                        clusterNodes.split(":")[0],
                                        Integer.parseInt(clusterNodes.split(":")[1]),
                                        "http")));
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
            return restHighLevelClient;
        }
    }
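
    Note that isSingleton() returns false, so Spring calls getObject() on every injection, although it always hands back the same underlying client built in afterPropertiesSet(). A more compact alternative (a sketch, not from the original post) is to register the client directly as a @Bean and let Spring close it on shutdown:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class EsClientConfig {

        @Value("${spring.data.elasticsearch.cluster-nodes}")
        private String clusterNodes; // e.g. "192.168.177.128:9200"

        // destroyMethod = "close" makes Spring call RestHighLevelClient.close() on shutdown
        @Bean(destroyMethod = "close")
        public RestHighLevelClient restHighLevelClient() {
            String[] hostAndPort = clusterNodes.split(":");
            return new RestHighLevelClient(RestClient.builder(
                    new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http")));
        }
    }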

    Importing MySQL data into Elasticsearch with Logstash

    Installing Logstash 6.4.0 (Linux):

    1. Download the Linux package and extract it.

    2. Create a mysql folder under the extracted Logstash directory (named mysql here because the data comes from MySQL), and put the JDBC driver jar matching your MySQL version into that folder.

    3. Create the Logstash configuration file for connecting to MySQL, as follows:

    input {
        stdin {
        }
        jdbc {
          # Database host and schema to connect to; sets the encoding, disables SSL, and enables auto-reconnect
          #jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/webject?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
          jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/webject?allowMultiQueries=true&useAffectedRows=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&useUnicode=true&characterEncoding=utf8"
          # Database credentials
          jdbc_user => "root"
          jdbc_password => "123456"
          # Path to the JDBC driver jar; an absolute path is recommended
          jdbc_driver_library => "mysql/mysql-connector-java-8.0.12.jar"
          # Driver class; no need to change this
          jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"

          # Avoid garbled Chinese text
          codec => plain { charset => "UTF-8" }

          # Track progress by a column value instead of by time
          use_column_value => true
          # The column to track
          tracking_column => id
          record_last_run => true
          # File that stores the last sql_last_value; the initial value must be set in this file (see the note after this config)
          last_run_metadata_path => "mysql/station_parameter.txt"

          jdbc_default_timezone => "Asia/Shanghai"

          #statement_filepath => "mysql/jdbc.sql"
          statement => "select * from user where id > :sql_last_value"

          # Whether to clear the last_run_metadata_path record; if true, every run re-queries all records from scratch
          clean_run => false

          # Schedule for re-running the import (cron-like; the first field is minutes)
          schedule => "*/5 * * * *"
          type => "jdbc"
        }
    }
    
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
     
     
    output {
        elasticsearch {
            # The Elasticsearch host to import into
            hosts => "192.168.177.128:9200"
            # The target Elasticsearch index
            index => "db_user"
            # Type name (similar to a table name)
            document_type => "user"
            # Document id (similar to a primary key)
            document_id => "%{id}"
            # ES credentials
            user => elastic
            password => changeme
        }

        stdout {
            # Print each event as a JSON line
            codec => json_lines
        }
    }
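
    The file named in last_run_metadata_path holds the last observed tracking-column value, serialized by Logstash as a one-line YAML document. To start the very first import from id 0, its initial content would presumably be:

    --- 0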

    4. Start Logstash

       Go into the bin directory and run: ./logstash -f mysql/jdbc.conf

     Notes:

       After an import, later runs resume from the last imported position. Since jdbc.conf tracks the id column, the next run starts from the last imported id. To re-import everything from the beginning, delete the station_parameter.txt file in the mysql directory.

       The database connection configuration format may differ between Logstash versions.

     