zoukankan      html  css  js  c++  java
  • logstash实现elasticsearch与mysql数据库同步

    1. 业务场景:
      1. 表一是事件表,表二是规则表
      2. 产生一条数据,会同时写入事件表和对应的规则表
      3. 每个事件关联的风险规则可能有多条
      4. 修改:修改事件A中的某些字段的数据,规则表中事件A对应的规则字段不发生改变
    2. 业务需求:需要将MySQL中的两张表同步到ES中(新增、修改、删除同步),优化事件查询速度
    3. 解决方案:使用logstash-input-jdbc插件实现同步
    4. 注意点:
      1. 在mysql数据同步到ES中,第一次同步时需要全量的数据,之后则需要定时去同步增量数据即可
      2. logstash无法实现实时同步,最快是一分钟才执行一次(用的canal做实时)
      3. logstash-input-jdbc插件:实现了新增和更新同步功能,删除同步功能还没有实现
    5. 遇到的问题与解决方案:
      1. logstash从MySQL中读取数据时发现MySQL中的时间字段会相差8小时?
        1. 原因:logstash默认的时间是UTC时间,MySQL中的时区北京时区,所以会相差8个小时
        2. 解决方案:在连接数据库配置时候设置为UTC时间(在数据库连接配置后面加上:?useTimezone=true&&serverTimezone=UTC)
        3. 实例:jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
      2. MySQL中trans_extend和trans_all字段为json格式的字符串,使用Json过滤器的时候,无法将数据写入Elasticsearch?
        1. 原因:类型不匹配;我在es中定义trans_extend和trans_all字段的数据类型是text类型,logstash过滤后的数据类型是JSON格式,导致无法写入
        2. 解决方案:
          1. 在ES中定义trans_extend和trans_all字段的数据类型为JSON格式的
          2. 在ES中不定义trans_extend和trans_all字段的数据类型,logstash写入ES时,系统自动生成对应的数据类型
      3. last_value中记录的时间值会比event_date的实际值大8个小时导致数据丢失?
        1. 例如:
          1. event_date最后一行值为:2020-01-05 09:56:47
          2. last_value中记录的值为:--- 2020-01-05 17:56:47.000000000 +08:00
        2. 原因:last_value文件记录的时区是本地时间,logstash中的event_date是的UTC的时间,所以会大8个小时
        3. 解决方案:在SQL查询语句中将event_date增加8小时即可(date_add(event_date, interval 8 hour))
        4. 实例:select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no where date_add(event_date, interval 8 hour) > :sql_last_value

    全量同步配置文档

    input {
        jdbc {
            # mysql 数据库链接,test 为数据库名:
            jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
            # 驱动
            jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            # 用户名和密码
            jdbc_user => "root"
            jdbc_password => "root"        
            #是否分页
            jdbc_paging_enabled => "true"
            jdbc_page_size => "50000"
            #直接执行sql语句
            statement =>"select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no"
            #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
            schedule => "27 9 14 8 *"
            # 索引类型
            type => "jdbc"
        }
    }
    filter {
        json {
            source => "trans_extend"
            target =>"trans_extend"            
        }
        json{
            source => "trans_all"
            target => "trans_all"
        }
    }
    output {
        elasticsearch {
            #es的ip和端口
            hosts => ["http://192.168.57.100:9200"]
            #ES索引名称(自己定义的)
            index => "riske_events"
            #文档类型
            document_type => "info_rule"
            #设置数据的id为数据库中的字段
            document_id => "%{risk_event_no}-%{rule_id}"
        }
    }

    增量同步配置文档

    input {
        jdbc {
            # mysql 数据库链接,test为数据库名
            jdbc_connection_string => "jdbc:mysql://192.168.80.1:3306/test?useTimezone=true&&serverTimezone=UTC"
            # 用户名和密码
            jdbc_user => "root"
            jdbc_password => "root"
            # 驱动
            jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            #处理中文乱码问题
            codec => plain { charset => "UTF-8"}
            #使用其它字段追踪,而不是用时间
            use_column_value => true
            #追踪的字段
            tracking_column => event_date
            record_last_run => true
            #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
            last_run_metadata_path => "/opt/moudles/logstash-5.5.2/logstash.conf/last_value"
            #开启分页查询
            jdbc_paging_enabled => true
            jdbc_page_size => 50000
            #直接执行sql语句
            statement =>"select info.*,rule.rule_id from bs_risk_events_info as info join bs_risk_events_rule as rule on info.risk_event_no=rule.event_no where date_add(event_date, interval 8 hour) > :sql_last_value"
            # 执行的sql 文件路径+名称
            #statement_filepath => "/opt/data/jdbc.sql"
            # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
            schedule => "* * * * *"
            # 索引类型
            type => "jdbc"
        }
    }
    filter {
        json {
            source => "trans_extend"
            target =>"trans_extend"
        }
        json{
            source => "trans_all"
            target => "trans_all"
        }
    }
    output {
        elasticsearch {
            #es的ip和端口
            hosts => ["http://192.168.57.100:9200"]
            #ES索引名称(自己定义的)
            index => "riske_events"
            #文档类型
            document_type => "info_rule"
            #设置数据的id为数据库中的字段
            document_id => "%{risk_event_no}-%{rule_id}"
        }
    }    

    删除同步实现方案

    1. 实现方案:
      1. MySQL中的记录中可通过添加 is_deleted 字段用以表明该条记录是否被软删除
      2. MySQL数据进行删除的时候不可以直接删除数据的, 必须是软删除(is_delete字段标记为删除)
      3. MySQL中一旦发生删除操作,is_deleted 也会同步更新到 ElasticSearch 中
      4. 编写后台程序将 MySQL 和 ElasticSearch 中的这些文档删除
    2. 修改后的的表结构:
      CREATE TABLE `bs_risk_events_info` (
      `risk_event_no` varchar(20) CHARACTER SET utf8mb4 NOT NULL COMMENT '事件编号',
      `event_date` datetime NOT NULL COMMENT '事件日期',
      `trans_extend` varchar(2000) DEFAULT NULL COMMENT '流水扩充信息',
      `trans_all` text COMMENT '流水全部信息',
      `trans_amt` double(255,0) DEFAULT NULL COMMENT '交易金额',
      `is_delete` tinyint(10) DEFAULT '1' COMMENT '是否被删除;0表示删除,1表示没有删除,默认为1',
      PRIMARY KEY (`risk_event_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    3. 编写后台程序对MySQL和ES中的数据进行删除同步 :
      1. 删除MySQL中的数据:
        delete from bs_risk_events_rule where event_no in (select risk_event_no from bs_risk_events_info where is_delete = 0)
        delete from bs_risk_events_info where is_delete = 0
      2. 删除Elasticsearch中的数据
        GET riske_events/_delete_by_query 
        {
          "query":{
            "term":{
              "is_delete":0
            }
          }
        }
  • 相关阅读:
    (vue.js)vue中怎么设置select默认选中
    owin搭载静态网页
    Vue在IE11下不兼容,最后问题竟然出现在ie的缓存上
    setTimeout或者setInterval传递参数的解决办法
    jquery 发送post请求访问webapi,无法接到参数
    64位进程和32位进程通信问题,接收端收不到 SendMessage发送的消息
    关于C#设置Form的visible属性隐藏问题
    微信 支付支付授权目录 修改
    微信支付(20140923更新)商户支付密钥key的生成与设置
    申请微信公众平台是服务号,营业执照是个体工商户,没有对公账户,能申请微信支付功能吗?
  • 原文地址:https://www.cnblogs.com/WeiKing/p/13497513.html
Copyright © 2011-2022 走看看