jdbc插件
- 增量同步和全量同步:
- 全量同步:是指将全部数据同步到es(通常是刚建立es,第一次同步时使用)
- 增量同步:是指将后续的更新、插入的记录同步到es
- 增量同步更新的注意点:
- tracking_column的字段必须是递增的字段(如id或者时间字段)
- tracking_column字段是id,则只能增加数据,而无法修改数据
- tracking_column字段是时间字段(如event_date),才可以实现同步修改操作
- 参数说明:
- record_last_run:是否记录上次执行结果;如果为真,将会把上次执行到的tracking_column字段的最后一行的值记录下来,保存到 last_run_metadata_path
- use_column_value:是否需要记录某个column 的值,record_last_run 为真,可以自定义 track _column 名称, 否则默认 track_column 的是 timestamp 的值
- tracking_column:如果 use_column_value 为真,需配置此参数;这个参数就是数据库给出的一个字段名称;该 column 必须是递增的。比如:记录时间的字段或者是自增的id
- last_run_metadata_path:指定文件,来记录上次执行到的tracking_column 字段的最后一行的值;可以在SQL语句中运用该值(WHERE MY_ID > :last_sql_value);last_sql_value 取得就是该文件中的值
- clean_run:是否清除 last_run_metadata_path 的记录;如果为真,那么每次都相当于从头开始查询所有的数据库记录
- lowercase_column_names:是否将 column 名称转小写
- statement:执行SQL语句
- statement_filepath:可以将SQL语句写入某个文件中,statement_filepath就是执行SQL文件的路径
- schedule:设置监听间隔,多久执行一次
- 从左到右的含义:分、时、日、月、年
- schedule => "* * * * *":全部为*默认含义为每分钟都更新
- schedule => "40 12 30 7 2020":年份不能写具体的年份,不然会报错: Error: invalid weekday expression (2020)
- schedule => "40 12 30 7 *"
- 注意点:
- SQL查询语句是一次读取一批数据,tracking_column对应的字段中最后一行的值存储在last_run_metadata_path对应的文件中,而不是tracking_column对应的字段值中最大的数据存储在last_run_metadata_path对应的文件中
- 时区问题:
- UTC:称为通用协调时间;英国伦敦的本地时相同
- UTC + 时区差=本地时间
- 北京时区是东八区,领先UTC 8个小时
- logstash和ES使用的UTC时间
- 全量同步实例:
input { jdbc { # mysql 数据库链接,test为数据库名:"jdbc:mysql://192.168.29.1:3306/test?useUnicode=true&characterEncoding=utf8" jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" #是否分页 jdbc_paging_enabled => "true" jdbc_page_size => "50000" #直接执行sql语句 statement =>"select * from MdL_001" # 执行的sql 文件路径+名称 #statement_filepath => "/opt/data/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 索引类型 type => "jdbc" }
} - 增量同步实例:
input { jdbc { # mysql 数据库链接,test为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" #处理中文乱码问题 codec => plain { charset => "UTF-8"} #使用其它字段追踪,而不是用时间 use_column_value => true #追踪的字段(使用MySQL中的时间字段而不使用自增的id字段;因为自增的id字段无法实现更新问题,因为后面需要更新的id值会小于last_run_metadata_path记录的值而无法实现更新操作) #tracking_column => id tracking_column => event_date record_last_run => true #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 last_run_metadata_path => "/opt/data/station_parameter.txt" #开启分页查询 jdbc_paging_enabled => true jdbc_page_size => 50000 #直接执行sql语句 statement =>"select * from mdl_001 id > :sql_last_value" # 执行的sql 文件路径+名称 #statement_filepath => "/opt/data/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 索引类型 type => "jdbc" } }