zoukankan      html  css  js  c++  java
  • centos7配置Logstash同步Mysql数据到Elasticsearch

    Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。个人认为这款插件是比较稳定,容易配置的使用Logstash之前,我们得明确自己的需求场景是什么,从哪种类型的数据源同步数据到哪种存储库。Logstash版本迭代较快,每个版本的插件都有点区别,比如6.3版本以后output到没有jdbc的插件,然而你如果想使用output的jdbc插件就需要去安装插件(logstash-output-jdbc),也就是说,如果你想用output的jdbc,你就必须使用6.3以下(最好5.x)的版本。这里以Logstash5.3.1版本为例。

    1.下载安装

    Logstash不支持jdk1.10,建议使用1.8。Logstash版本要与Es版本保持一致。下载地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-3-1

    上传到目录,例如:usr/local/src

    解压  tar -zxvf  logstash-5.3.1.tar.gz

    重命名 mv logstash-5.3.1 logstash

    启动验证:

    cd logstash

    bin/logstash -e 'input{stdin{}} output{stdout{}}',输出如下,代表启动成功:

    关闭

    ps -ef | grep logstash

    kill pid

    2.配置目录

    配置文件目录:logstash/config

    新建配置文件test.conf

    下载mysql-connector-java-5.1.30.jar,下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.30

    logstash目录下创建一个jar目录,用来存放jar文件 

    mkdir jar
    在logstash目录下创建一个sql目录,用来存放查询sql文件test.sql

    mkdir sql

    3.配置文件

    Logstash同步数据方式有全量同步和增量同步两种,不同方式配置文件有细微不同。第一次同步时需要全量的数据,之后则需要定时去同步增量数据,使用logstash需要了解一下事项:

    1.凡是SQL可以实现的logstash均可以实现(本就是通过sql查询数据)

    2.支持每次全量同步或按照特定字段(如自增ID、更新时间)增量同步

    3.同步频率可控,最快同步频率每分钟一次(如果对实效性要求较高,慎用)

    4.不支持被物理删除的数据同步物理删除ES中的数据(可在表设计中增加逻辑删除字段标识数据删除)

    全量同步

    test.conf文件内容如下:

    input {
      jdbc {
          # mysql 数据库链接,test为数据库名
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
          jdbc_user => "root"
          jdbc_password => "root"
    
          # 驱动路径
          jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar"
    
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
    
          #是否分页
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
    
          #直接执行sql语句
          #statement =>"select * from test"
          # 执行的sql 文件路径+名称
          statement_filepath => "/usr/local/src/logstash/sql/test.sql"
    
          #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "* * * * *"
    
          # 索引类型
          #type => "jdbc"
        }
    
    }
    
    output {
      elasticsearch {
            #es的ip和端口
            hosts => ["http://127.0.0.1:9200"]
            #ES索引名称(自己定义的)
            index => "test_logstash"
            #文档类型(自己定义的)
            document_type => "test"
            #设置es中数据的id为数据库中的字段(一般设置为mysql中主键字段)
            document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    
    }

    test.sql文件内容如下:

    #sql查询语句,mysql中怎样写,此处就怎样写
    select
    * from table

    指定配置文件启动Logstash:

    cd logstash

    bin/logstash -f config/test.conf

    会看到mysql中数据会在屏幕显示,如果有报错,一般为配置文件出错。

    后台启动:nohup bin/logstash -f config/test.conf 

    增量同步

    test.conf文件内容如下:

    input {
      jdbc {
          # mysql 数据库链接,test为数据库名
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
          jdbc_user => "root"
          jdbc_password => "root"
    
          # 驱动路径
          jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar"
    
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
    
          #是否分页
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          
          record_last_run => true
          use_column_value => true
          tracking_column => "id"
          last_run_metadata_path => "/usr/local/src/logstash/last_run_record"
    
    
          #直接执行sql语句
          statement =>"statement =>"select * from test where id >:sql_last_value""
          # 执行的sql 文件路径+名称
          #statement_filepath => "/usr/local/src/logstash/sql/test.sql"
    
          #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "* * * * *"
    
          # 索引类型
          #type => "jdbc"
        }
    
    }
    
    output {
      elasticsearch {
            #es的ip和端口
            hosts => ["http://127.0.0.1:9200"]
            #ES索引名称(自己定义的)
            index => "test_logstash"
            #文档类型(自己定义的)
            document_type => "test"
            #设置es中数据的id为数据库中的字段(一般设置为mysql中主键字段)
            document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    
    }

    红色部分是跟全量同步有区别的地方:

    record_last_run:记录最后运行结果

    use_column_value:记录字段

    tracking_column:记录字段名,这里就是id

    last_run_metadata_path:记录数据保存位置

    sql_last_value:下次在执行同步的时候会将这个值,赋给sql_last_value

    启动Logstash后,以后就是根据查询条件增量同步数据了。

    到此,使用Logstash同步Mysql数据到Es就算完成了,各位如果觉得还有点意义,烦请点一下推荐,加个关注,互相交流,如果安装过程有任何问题或者发现错误,都可以留言交流,共同进步! 

  • 相关阅读:
    php源码学习——开篇
    springMvc入门一
    spring 整合 servlet
    java jar包下载地址
    java spring学习
    Servlet 学习
    JRE_HOME environment variable is not defined correctly This environment variableis needed to run this program
    java JBDC操作
    Java 自定义异常
    JAVA的日期类DATE
  • 原文地址:https://www.cnblogs.com/yqzc/p/12343264.html
Copyright © 2011-2022 走看看