zoukankan      html  css  js  c++  java
  • Elasticsearch 同步mysql数据

    将mysqL 数据同步到elsearch中(同步而非导入,当数据变化时el数据跟着变化),有两种情况,单表数据和联表数据。

    0.在bin目录放jdbc驱动jar包

    1.logstash 的配置尤为重要

    参考:

    https://blog.csdn.net/qq_16436555/article/details/9136071

       input { jdbc {
         #数据库地址,用户,密码 jdbc_connection_string
    => "jdbc:mysql://ip:3306/test" jdbc_user => "root" jdbc_password => "root"
         #驱动位置配置
    #此处的路径最好是绝对路径,行对路径取决与允许命令的目录 jdbc_driver_library => "sync-conf/mysql-connector-java-6.0.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver"
         #开启分页查询
    jdbc_paging_enabled => "true" jdbc_page_size => "50000"
     

          #决定增量同步的关键

          #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录

          clean_run => false
         #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称
       ,使用其它字段追踪,而不是用时间,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.   use_column_value => true    #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的    tracking_column => create_time
    #只有两种选择 numeric,timestamp
         tracking_column_type=>"timestamp"   #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中    record_last_run => true #在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是该文件中的值 last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" #是否将字段名称转小写。 #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true, #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分 lowercase_column_names => false #你的SQL的位置,当然,你的SQL也可以直接写在这里。 #statement => SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value order by creat_time asc
       statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" #sql 文件执行路径,最好绝对路径。 
    #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
    #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。 }
        # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
        schedule => "* * * * *"
    #用于区分输入的数据源,可以在output中区分输出到哪里 type => "user" } } output { #设置窗口日志输出 stdout { codec => json_lines }
    if[type] == "user"{
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            #注意index的值不支持大写字母
            index => "user"
            #document_type自行设置,不设置时,默认为doc
            #document_type => ""
            #此处的值来自查询sql中的列名称,根据需要自行配置
            document_id => "%{id}"
        }

    }
    }

    重点:

    1.tracking_column 不配置是默认根据timestamp追踪,tracking_column 配了相应的use_column_value => true

    2.设置上次追踪的位置记录,就开启record_last_run => true,

    在增量同步时,根据tracking 的变化会更新el数据,所以在sql语句中要添加 :sql_last_value order by tracking_column asc 

    这样会往 last_run_metadata_path 文件里写最新的racking_column值

      
  • 相关阅读:
    怎样使用Secure CRT查看vcenter和esxi主机的日志文件(转)
    Linux下如何查看系统启动时间和运行时间
    Java使用线程并发库模拟弹夹装弹以及发射子弹的过程
    使用Java线程并发库实现两个线程交替打印的线程题
    Android Exception Type "share_dialog_title" is not translated in en, zh-rTW strings
    Java JDK1.5、1.6、1.7新特性整理
    Java 中long类型转换成为int类型时可能会出错的地方
    Java 将任意数组的任意两个位置的数据进行交换
    Java设置以及获取JavaBean私有属性进阶
    Java使用PropertyDescriptor获取实体类中私有属性的值,并给私有属性赋值
  • 原文地址:https://www.cnblogs.com/lingli-meng/p/11572693.html
Copyright © 2011-2022 走看看