zoukankan      html  css  js  c++  java
  • 使用 Logstash 同步 MySQL 数据表到 ES 的一点感悟

    针对单个数据表,增量同步大致可以分为如下两种情况:
    1. 该数据表中有一个在记录插入或更新时写入当前时间的时间戳字段,此时监控的就是这个时间戳字段
    具体可以看这个文章:https://www.cnblogs.com/sanduzxcvbnm/p/12858967.html
    示例:
    modification_time就是表中要监控的时间戳字段

    input {
      # Incremental sync of es_table, driven by its modification_time column.
      jdbc {
        # --- driver & connection ---
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # Empty: the driver class is looked up on Logstash's own classpath.
        jdbc_driver_library => ""
        jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "root"

        # --- schedule & query ---
        # Six-field cron expression (leading seconds field): run every 5 seconds.
        schedule => "*/5 * * * * *"
        jdbc_paging_enabled => true
        jdbc_page_size => "1000"
        # :sql_last_value is the unix_ts_in_secs value saved from the previous run.
        # "modification_time < NOW()" leaves rows stamped in the current second for a
        # later run, so a row written later in that same second is not skipped by the
        # strict ">" comparison.
        statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"

        # --- change tracking ---
        use_column_value => true
        tracking_column => "unix_ts_in_secs"
        tracking_column_type => "numeric"

        type => "es_table"
      }
    }
    

    2. 该数据表中没有随记录更新而变化的时间戳字段,此时只能监控表中其他单调递增的字段,比如自增id字段。注意:按id追踪只能同步新插入的记录,对已有记录的修改不会被同步到ES(两种方式都不会同步删除操作)。
    参考一下这个文档:https://www.cnblogs.com/sanduzxcvbnm/p/12858474.html
    db_example是数据库名,es_table是数据表名

    jdbc {
      # Incremental sync of es_table keyed on the id column: each run selects rows
      # whose id is greater than the last id recorded by the previous run.
      # Only newly inserted rows are picked up; changes to existing rows are not.
      jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
      jdbc_user => "root"
      jdbc_password => "xxxxx"
      jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-8.0.20.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "500"
      use_column_value => "true"
      record_last_run => "true"
      tracking_column => "id"
      # File that stores the tracked id value between runs; use a different file for
      # every jdbc input that records its last run.
      last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info"
      clean_run => "false"
      # "order by id asc" is required:
      #  - the plugin saves the tracking column of the LAST row it receives, so the
      #    rows must arrive in ascending id order for the saved value to be the max;
      #  - with paging enabled the query is split into LIMIT/OFFSET pages, which are
      #    only consistent when the result order is deterministic.
      statement => "select * from es_table where id > :sql_last_value order by id asc"
      # Standard five-field cron expression: run once every minute.
      schedule => "* * * * *"
      type => "es_table"
    }
    

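    或者像下面这样,通过parameters直接指定id的起始值。注意:该示例没有设置schedule,语句只会执行一次;而且:id是固定值,不会随同步进度更新,因此只适合一次性导入。另外条件是id > :id,id等于起始值的那一行本身不会被导入。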

        jdbc {
           # One-off import of es_table starting after a fixed id.
           # No schedule is set here, so the statement is executed a single time.
           # :id is bound from the fixed "parameters" map and never advances.
           # The comparison is strictly ">", so the row whose id equals the start value
           # is NOT included.
           jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
           jdbc_driver_library => ""
           jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
           jdbc_user => "root"
           jdbc_password => "root"
           jdbc_validate_connection => true
           statement => "SELECT * FROM es_table WHERE id > :id"
           parameters => { "id" => "1" }
           type => "es_table"
        }
    

    同步多个数据表

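    需要在每个jdbc{}中设置不同的type字段(如上面的示例所示),以便在output中按type把数据写入不同的索引。另外,凡是记录同步进度(sql_last_value)的jdbc输入,都必须通过last_run_metadata_path为各自指定不同的文件,否则多个输入会共用默认文件,互相覆盖对方记录的追踪值。这一点对按id追踪和按时间戳追踪的配置都适用。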

    可以参考如下示例:

    input {
      # Two independent incremental-sync inputs, one per table. Each tags its events
      # with a distinct "type" so the output section can route them to separate indices.
      jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_driver_library => ""
        jdbc_paging_enabled => true
        jdbc_page_size => "1000"
        record_last_run => "true"
        # Each input needs its OWN state file. Without this setting both inputs fall
        # back to the same default last_run_metadata_path and overwrite each other's
        # :sql_last_value. The directory is illustrative; use a path Logstash can write.
        last_run_metadata_path => "/opt/logstash/last_run/es_table_last_run"
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        tracking_column_type => "numeric"
        # Six-field cron expression (leading seconds field): every 5 seconds.
        schedule => "*/5 * * * * *"
        statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
        type => "es_table"
      }

      jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_driver_library => ""
        jdbc_paging_enabled => true
        jdbc_page_size => "1000"
        record_last_run => "true"
        # Separate state file from the es_table input above (see note there).
        last_run_metadata_path => "/opt/logstash/last_run/es_table1_last_run"
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        tracking_column_type => "numeric"
        # Six-field cron expression (leading seconds field): every 5 seconds.
        schedule => "*/5 * * * * *"
        statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table1 WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
        type => "es_table1"
      }
    }
    
    filter {
      # Move @timestamp forward by 8 hours (8*60*60 seconds).
      # NOTE(review): this changes the stored instant, not just how it is displayed.
      # Elasticsearch stores @timestamp as UTC, so a Kibana client displaying in a UTC+8
      # browser timezone will show these events 8h ahead. Consider leaving @timestamp
      # untouched and configuring the display timezone in Kibana instead — confirm intent.
      ruby {
        # @timestamp must hold a LogStash::Timestamp, so the shifted Ruby Time is wrapped.
        code => "shifted = event.get('@timestamp').time + 8*60*60; event.set('@timestamp', LogStash::Timestamp.new(shifted))"
      }
      mutate {
        # Stash the row id in metadata (not indexed) for use as the document id.
        copy => { "id" => "[@metadata][_id]" }
        # Drop fields that should not end up in the indexed document.
        remove_field => ["id", "@version", "unix_ts_in_secs", "timestamp"]
      }
    }
    output {
      # Route events to one index per source table, based on the "type" set by the
      # jdbc input. document_id reuses [@metadata][_id] (expected to hold the row id),
      # so a re-synced row overwrites its existing document instead of duplicating it.
      #
      # The password is no longer hard-coded: "${ES_PWD}" is resolved by Logstash from
      # its secrets keystore (bin/logstash-keystore add ES_PWD) or from an environment
      # variable of the same name.
      if [type] == "es_table" {
        elasticsearch {
          hosts => ["192.168.75.21:9200"]
          index => "es_table_idx"
          document_id => "%{[@metadata][_id]}"
          user => "elastic"
          password => "${ES_PWD}"
        }
      }
      if [type] == "es_table1" {
        elasticsearch {
          hosts => ["192.168.75.21:9200"]
          index => "es_table1_idx"
          document_id => "%{[@metadata][_id]}"
          user => "elastic"
          password => "${ES_PWD}"
        }
      }
    }
    
    
  • 相关阅读:
    java——注解Annotation
    java——maven
    sklearn——回归评估指标
    java——单例模式
    java——极简handler机制
    java——为什么要有接口?和抽象类有什么不一样?
    java——cmd命令编译带包名的源程序
    [loj 2478][luogu P4843]「九省联考 2018」林克卡特树
    「线性基」学习小结
    FOI 冬令营 Day6
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12867452.html
Copyright © 2011-2022 走看看