zoukankan      html  css  js  c++  java
  • 几篇关于MySQL数据同步到Elasticsearch的文章---第三篇:logstash_output_kafka:Mysql同步Kafka深入详解

    文章转载自:
    https://mp.weixin.qq.com/s?__biz=MzI2NDY1MTA3OQ==&mid=2247484411&idx=1&sn=1f5a371095d61bd0d6461ed111dd252b&chksm=eaa82bd3dddfa2c5b08831bfd4221178b277f03ec74ef6c5a8f415409c21e569577fbc943f08&scene=21#wechat_redirect

    0、题记

    实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。
    [在这里插入图片描述]

    而mysql写入kafka的选型方案有:

    方案一:logstash_output_kafka 插件。
    方案二:kafka_connector。
    方案三:debezium 插件。
    方案四:flume。
    方案五:其他类似方案。

    其中:debezium和flume是基于mysql binlog实现的。

    如果需要同步历史全量数据+实时更新数据,建议使用logstash。
    1、logstash同步原理

    常用的logstash的插件是:logstash_input_jdbc实现关系型数据库到Elasticsearch等的同步。

    实际上,核心logstash的同步原理的掌握,有助于大家理解类似的各种库之间的同步。

    logstash核心原理:输入生成事件,过滤器修改它们,输出将它们发送到其他地方。

    logstash核心三部分组成:input、filter、output。
    [在这里插入图片描述]

    input { }
    filter { }
    output { }

    1.1 input输入

    包含但远不限于:

    jdbc:关系型数据库:mysql、oracle等。
    
    file:从文件系统上的文件读取。
    
    syslog:在已知端口514上侦听syslog消息。
    
    redis:redis消息。beats:处理 Beats发送的事件。
    
    kafka:kafka实时数据流。
    

    1.2 filter过滤器

    过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。

    可以把它比作数据处理的ETL环节。

    一些有用的过滤包括:

    grok:解析并构造任意文本。Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!
    
    mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。
    
    drop:完全删除事件,例如调试事件。
    
    clone:制作事件的副本,可能添加或删除字段。
    
    geoip:添加有关IP地址的地理位置的信息。
    

    1.3 output输出

    输出是Logstash管道的最后阶段。一些常用的输出包括:

    elasticsearch:将事件数据发送到Elasticsearch。
    
    file:将事件数据写入磁盘上的文件。
    
    kafka:将事件写入Kafka。
    

    详细的filter demo参考:http://t.cn/EaAt4zP
    2、同步Mysql到kafka配置参考

    input {
    jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
    jdbc_user => "root"
    jdbc_password => "xxxxxxx"
    jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    #schedule => "* * * * *"
    statement => "SELECT * from news_info WHERE id > :sql_last_value order by id"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"

    }
    

    }

    filter {
    ruby{
    code => "event.set('gather_time_unix',event.get('gather_time').to_i1000)"
    }
    ruby{
    code => "event.set('publish_time_unix',event.get('publish_time').to_i
    1000)"
    }
    mutate {
    remove_field => [ "@version" ]
    remove_field => [ "@timestamp" ]
    remove_field => [ "gather_time" ]
    remove_field => [ "publish_time" ]
    }
    }

    output {
    kafka {
    bootstrap_servers => "192.168.1.13:9092"
    codec => json_lines
    topic_id => "mytopic"

    }
    file {
            codec => json_lines
            path => "/tmp/output_a.log"
    }
    

    }

    以上内容不复杂,不做细讲。

    注意:
    Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。

    code =>
    "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",
    

    是将Mysql中的时间格式转化为时间戳格式。
    3、坑总结
    3.1 坑1字段大小写问题

    from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names
    => "false"  这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。
    

    最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false" 。记录下来希望可以帮到更多人。
    3.2 同步到ES中的数据会不会重复?

    想将关系数据库的数据同步至ES中,如果在集群的多台服务器上同时启动logstash。
    

    解读:实际项目中就是没用随机id 使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据
    3.3 相同配置logstash,升级6.3之后不能同步数据。

    解读:高版本基于时间增量有优化。
    

    tracking_column_type => "timestamp"应该是需要指定标识为时间类型,默认为数字类型numeric
    3.4 ETL字段统一在哪处理?

    解读:可以logstash同步mysql的时候sql查询阶段处理,如:select a_value as avalue***。

    或者filter阶段处理,mutate rename处理。

    mutate {
    rename => ["shortHostname", "hostname" ]
    }

    或者kafka阶段借助kafka stream处理。
    4、小结

    相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。
    
    需要结合实际业务场景做深入的研究和性能分析。
    
    有问题,欢迎留言讨论。
    

  • 相关阅读:
    vi编辑器命令大全
    Ubuntu环境搭建svn服务器
    Visual Studio中“后期生成事件命令行” 中使用XCopy命令
    解决 Visual Studio For Mac 还原包失败问题
    [Win10应用开发] 如何使用Windows通知
    [Win10应用开发] 使用 Windows 推送服务
    如何使用 PsExec 执行远程命令
    如何在调试Window App时,触发 Suspending ,Resuming 等事件
    如何 “解决” WPF中空域问题(Airspace issuse)
    浅谈可扩展性框架:MEF
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12877729.html
Copyright © 2011-2022 走看看