zoukankan      html  css  js  c++  java
  • elasticsearch -- Logstash实现mysql同步数据到elasticsearch

    配置

    1. 安装插件
      由于这里是从mysql同步数据到elasticsearch,所以需要安装jdbc的入插件和elasticsearch的出插件:logstash-input-jdbc、logstash-output-elasticsearch
      安装效果图如下所示:

    2. 下载mysql连接库
      由于logstash是ruby开发的,所以这里要下载mysql的连接库jar包,从官网下载,我这里下载的是:mysql-connector-java-5.1.46.jar
      将下载好的mysql-connector-java-5.1.46.jar,放至/usr/local/logstash/config/目录下。
    3. 修改配置文件
      在config目录下,创建配置文件(logstash-mysql-es.conf):
      input {
        jdbc {
          # mysql相关jdbc配置
          jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
          jdbc_user => "root"
          jdbc_password => "123456"
      
          # jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
          jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"
          # the name of the driver class for mysql
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => true
          jdbc_page_size => "50000"
      
          jdbc_default_timezone =>"Asia/Shanghai"
      
          # mysql文件, 也可以直接写SQL语句在此处,如下:
          # statement => "select * from t_order where update_time >= :sql_last_value"
          statement_filepath => "./config/jdbc.sql"
      
          # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
          schedule => "* * * * *"
          #type => "jdbc"
      
          # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
          #record_last_run => true
      
          # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
          use_column_value => true
      
          # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
          tracking_column => "update_time"
          
          tracking_column_type => "timestamp"
      
          last_run_metadata_path => "./logstash_capital_bill_last_id"
      
          # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
          clean_run => false
      
          #是否将 字段(column) 名称转小写
          lowercase_column_names => false
        }
      }
      
      output {
        elasticsearch {
          hosts => "10.112.76.31:9200"
          index => "mysql_order"
          document_id => "%{id}"
          template_overwrite => true
        }
      
        # 这里输出调试,正式运行时可以注释掉
        stdout {
            codec => json_lines
        } 
      }
      这里有几个注意点:
      (1)jdbc_driver_library
              mysql-connector-java-5.1.46.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。如果配置不对,将会报“can ”错误。
      (2)sql_last_value
              标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
      (3)statementstatement_filepath
              statement:执行同步的sql语句,可以同步部分数据。
              statement_filepath:存储执行同步的sql语句。不和statement同时使用。
      (4)schedule
              定时器,表示每隔多长时间同步一次数据。格式类似crontab。
      (5)tracking_columntracking_column_type
              tracking_column:表示表中哪一列用于判断logstash同步的位置信息。与sql_last_value比较判断是否需要同步这条数据。
              tracking_column_type:racking_column指定列的类型。支持两种类型:numeric(默认)、timestamp。注意:如果列是时间字段(比如updateTime),一定要指定这个类型为timestamp。我就踩了这个大坑。。。一直同步不成功!!!
      (6)last_run_metadata_path
              存储sql_last_value值的文件名称及位置。
      (7)document_id
              生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。比如同步订单数据,可以使用订单号。

    启动

    在根目录下,执行命令:

    nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out &

    效果图如下:

    同步

    完成了一条数据的同步

  • 相关阅读:
    BZOJ1146:[CTSC2008]网络管理
    AtCoder Grand Contest 004 C:AND Grid
    BZOJ3295:[CQOI2011]动态逆序对
    AtCoder Regular Contest 070F:Honest Or Unkind
    BZOJ3110:[ZJOI2013]K大数查询
    BZOJ3196:二逼平衡树
    浅谈splay
    BZOJ3938:Robot
    浅谈标记永久化
    AtCoder Regular Contest 068E:Snuke Line
  • 原文地址:https://www.cnblogs.com/onekey/p/10256688.html
Copyright © 2011-2022 走看看