zoukankan      html  css  js  c++  java
  • elasticsearch -- Logstash实现mysql同步数据到elasticsearch

    配置

    1. 安装插件
      由于这里是从mysql同步数据到elasticsearch,所以需要安装jdbc的入插件和elasticsearch的出插件:logstash-input-jdbc、logstash-output-elasticsearch
      安装效果图如下所示:

    2. 下载mysql连接库
      由于logstash是ruby开发的,所以这里要下载mysql的连接库jar包,从官网下载,我这里下载的是:mysql-connector-java-5.1.46.jar
      将下载好的mysql-connector-java-5.1.46.jar,放至/usr/local/logstash/config/目录下。
    3. 修改配置文件
      在config目录下,创建配置文件(logstash-mysql-es.conf):
      input {
        jdbc {
          # mysql相关jdbc配置
          jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
          jdbc_user => "root"
          jdbc_password => "123456"
      
          # jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
          jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"
          # the name of the driver class for mysql
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => true
          jdbc_page_size => "50000"
      
          jdbc_default_timezone =>"Asia/Shanghai"
      
          # mysql文件, 也可以直接写SQL语句在此处,如下:
          # statement => "select * from t_order where update_time >= :sql_last_value"
          statement_filepath => "./config/jdbc.sql"
      
          # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
          schedule => "* * * * *"
          #type => "jdbc"
      
          # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
          #record_last_run => true
      
          # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
          use_column_value => true
      
          # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
          tracking_column => "update_time"
          
          tracking_column_type => "timestamp"
      
          last_run_metadata_path => "./logstash_capital_bill_last_id"
      
          # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
          clean_run => false
      
          #是否将 字段(column) 名称转小写
          lowercase_column_names => false
        }
      }
      
      output {
        elasticsearch {
          hosts => "10.112.76.31:9200"
          index => "mysql_order"
          document_id => "%{id}"
          template_overwrite => true
        }
      
        # 这里输出调试,正式运行时可以注释掉
        stdout {
            codec => json_lines
        } 
      }
      这里有几个注意点:
      (1)jdbc_driver_library
              mysql-connector-java-5.1.46.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。如果配置不对,将会报“can ”错误。
      (2)sql_last_value
              标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
      (3)statementstatement_filepath
              statement:执行同步的sql语句,可以同步部分数据。
              statement_filepath:存储执行同步的sql语句。不和statement同时使用。
      (4)schedule
              定时器,表示每隔多长时间同步一次数据。格式类似crontab。
      (5)tracking_columntracking_column_type
              tracking_column:表示表中哪一列用于判断logstash同步的位置信息。与sql_last_value比较判断是否需要同步这条数据。
              tracking_column_type:racking_column指定列的类型。支持两种类型:numeric(默认)、timestamp。注意:如果列是时间字段(比如updateTime),一定要指定这个类型为timestamp。我就踩了这个大坑。。。一直同步不成功!!!
      (6)last_run_metadata_path
              存储sql_last_value值的文件名称及位置。
      (7)document_id
              生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。比如同步订单数据,可以使用订单号。

    启动

    在根目录下,执行命令:

    nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out &

    效果图如下:

    同步

    完成了一条数据的同步

  • 相关阅读:
    博客园
    未释放的已删除文件
    ssh连接缓慢
    剑指 Offer 38. 字符串的排列
    剑指 Offer 37. 序列化二叉树
    剑指 Offer 50. 第一个只出现一次的字符
    剑指 Offer 36. 二叉搜索树与双向链表
    剑指 Offer 35. 复杂链表的复制
    剑指 Offer 34. 二叉树中和为某一值的路径
    剑指 Offer 33. 二叉搜索树的后序遍历序列
  • 原文地址:https://www.cnblogs.com/onekey/p/10256688.html
Copyright © 2011-2022 走看看