- 业务场景:
- 表一是事件表,表二是规则表
- 产生一条数据,会同时写入事件表和对应的规则表
- 每个事件关联的风险规则可能有多条
- 修改:修改事件A中的某些字段的数据,规则表中事件A对应的规则字段不发生改变
- 业务需求:需要将MySQL中的两张表同步到ES中(新增、修改、删除同步),优化事件查询速度
- 解决方案:使用logstash-input-jdbc插件实现同步
- 注意点:
- 在mysql数据同步到ES中,第一次同步时需要全量的数据,之后则需要定时去同步增量数据即可
- logstash无法实现实时同步,最快是一分钟才执行一次(用的canal做实时)
- logstash-input-jdbc插件:实现了新增和更新同步功能,删除同步功能还没有实现
- 遇到的问题与解决方案:
- logstash从MySQL中读取数据时发现MySQL中的时间字段会相差8小时?
- 原因:logstash默认的时间是UTC时间,MySQL中的时区北京时区,所以会相差8个小时
- 解决方案:在连接数据库配置时候设置为UTC时间(在数据库连接配置后面加上:?useTimezone=true&&serverTimezone=UTC)
- 实例:jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
- MySQL中trans_extend和trans_all字段为json格式的字符串,使用Json过滤器的时候,无法将数据写入Elasticsearch?
- 原因:类型不匹配;我在es中定义trans_extend和trans_all字段的数据类型是text类型,logstash过滤后的数据类型是JSON格式,导致无法写入
- 解决方案:
- 在ES中定义trans_extend和trans_all字段的数据类型为JSON格式的
- 在ES中不定义trans_extend和trans_all字段的数据类型,logstash写入ES时,系统自动生成对应的数据类型
- last_value中记录的时间值会比event_date的实际值大8个小时导致数据丢失?
- 例如:
- event_date最后一行值为:2020-01-05 09:56:47
- last_value中记录的值为:--- 2020-01-05 17:56:47.000000000 +08:00
- 原因:last_value文件记录的时区是本地时间,logstash中的event_date是的UTC的时间,所以会大8个小时
- 解决方案:在SQL查询语句中将event_date增加8小时即可(date_add(event_date, interval 8 hour))
- 实例:select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no where date_add(event_date, interval 8 hour) > :sql_last_value
全量同步配置文档
input {
jdbc {
# mysql 数据库链接,test 为数据库名:
jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
# 驱动
jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
#是否分页
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#直接执行sql语句
statement =>"select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no"
#设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "27 9 14 8 *"
# 索引类型
type => "jdbc"
}
}
filter {
json {
source => "trans_extend"
target =>"trans_extend"
}
json{
source => "trans_all"
target => "trans_all"
}
}
output {
elasticsearch {
#es的ip和端口
hosts => ["http://192.168.57.100:9200"]
#ES索引名称(自己定义的)
index => "riske_events"
#文档类型
document_type => "info_rule"
#设置数据的id为数据库中的字段
document_id => "%{risk_event_no}-%{rule_id}"
}
}
增量同步配置文档
input {
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
#处理中文乱码问题
codec => plain { charset => "UTF-8"}
#使用其它字段追踪,而不是用时间
use_column_value => true
#追踪的字段
tracking_column => event_date
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/moudles/logstash-5.5.2/logstash.conf/last_value"
#开启分页查询
jdbc_paging_enabled => true
jdbc_page_size => 50000
#直接执行sql语句
statement =>"select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no where date_add(event_date, interval 8 hour) > :sql_last_value"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/data/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "jdbc"
}
}
filter {
json {
source => "trans_extend"
target =>"trans_extend"
}
json{
source => "trans_all"
target => "trans_all"
}
}
output {
elasticsearch {
#es的ip和端口
hosts => ["http://192.168.57.100:9200"]
#ES索引名称(自己定义的)
index => "riske_events"
#文档类型
document_type => "info_rule"
#设置数据的id为数据库中的字段
document_id => "%{risk_event_no}-%{rule_id}"
}
}
删除同步实现方案
- 实现方案:
- MySQL中的记录中可通过添加 is_deleted 字段用以表明该条记录是否被软删除
- MySQL数据进行删除的时候不可以直接删除数据的, 必须是软删除(is_delete字段标记为删除)
- MySQL中一旦发生删除操作,is_deleted 也会同步更新到 ElasticSearch 中
- 编写后台程序将 MySQL 和 ElasticSearch 中的这些文档删除
- 修改后的的表结构:
CREATE TABLE `bs_risk_events_info` (
`risk_event_no` varchar(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '事件编号',
`event_date` datetime NOT NULL COMMENT '事件日期',
`trans_extend` varchar(2000) DEFAULT NULL COMMENT '流水扩充信息',
`trans_all` text COMMENT '流水全部信息',
`trans_amt` double(255,0) DEFAULT NULL COMMENT '交易金额',
`is_delete` tinyint(10) DEFAULT '1' COMMENT '是否被删除;0表示删除,1表示没有删除,默认为1',
PRIMARY KEY (`risk_event_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
- 编写后台程序对MySQL和ES中的数据进行删除同步 :
- 删除MySQL中的数据:
delete from bs_risk_events_rule where event_no in (select risk_event_no from bs_risk_events_info where is_delete = 0)
delete from bs_risk_events_info where is_delete = 0
- 删除Elasticsearch中的数据
GET riske_events/_delete_by_query
{
"query":{
"term":{
"is_delete":0
}
}
}