zoukankan      html  css  js  c++  java
  • MySQL数据实时增量同步到Kafka

    转载自:https://www.cnblogs.com/yucy/p/7845105.html

    MySQL数据实时增量同步到Kafka - Flume

     
    • 写在前面的话

      需求,将MySQL里的数据实时增量同步到Kafka。接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka。不过对比了一些工具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。

      再来看看flume,只需要写一个配置文件,就可以完成数据同步的操作。官网:http://flume.apache.org/FlumeUserGuide.html#flume-sources。它的数据源默认是没有读取binlog日志实现的,也没有读数据库表的官方实现,只能用开源的自定义source:https://github.com/keedio/flume-ng-sql-source

    • 同步的格式

      原作者的插件flume-ng-sql-source只支持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很大的困扰。所以我添加了一个分支版本,用来将数据以JSON的格式,同步到kafka,字段语义更加清晰。

      sql-json插件包下载地址:https://github.com/yucy/flume-ng-sql-source-json/releases/download/1.0/flume-ng-sql-source-json-1.0.jar

      将此jar包下载之后,和相应的数据库驱动包,一起放到flume的lib目录之下即可。

    • 处理机制

    flume-ng-sql-source在【status.file.name】文件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。

     

    • 启动说明

    说明:启动命令里的【YYYYMM=201711】,会传入到flume.properties里面,替换${YYYYMM}

    
    
    1. [test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &

     -c:表示配置文件的目录,在此我们配置了flume-env.sh,也在conf目录下;

     -f:指定配置文件,这个配置文件必须在全局选项的--conf参数定义的目录下,就是说这个配置文件要在前面配置的conf目录下面;

     -n:表示要启动的agent的名称,也就是我们flume.properties配置文件里面,配置项的前缀,这里我们配的前缀是【sync】;

     

    • flume的配置说明

    • flume-env.sh
    
    
    1. # 配置JVM堆内存和java运行参数,配置-DpropertiesImplementation参数是为了在flume.properties配置文件中使用环境变量
    2. export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"

     关于propertiesImplementation参数的官方说明:http://flume.apache.org/FlumeUserGuide.html#using-environment-variables-in-configuration-files

     

    • flume.properties

     

    复制代码
    
    
    1. # 数据来源
    2. sync.sources = s-1
    3. # 数据通道
    4. sync.channels = c-1
    5. # 数据去处,这里配置了failover,根据下面的优先级配置,会先启用k-1,k-1挂了后再启用k-2
    6. sync.sinks = k-1 k-2
    7.  
    8. #这个是配置failover的关键,需要有一个sink group
    9. sync.sinkgroups = g-1
    10. sync.sinkgroups.g-1.sinks = k-1 k-2
    11. #处理的类型是failover
    12. sync.sinkgroups.g-1.processor.type = failover
    13. #优先级,数字越大优先级越高,每个sink的优先级必须不相同
    14. sync.sinkgroups.g-1.processor.priority.k-1 = 5
    15. sync.sinkgroups.g-1.processor.priority.k-2 = 10
    16. #设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
    17. sync.sinkgroups.g-1.processor.maxpenalty = 10000
    18.  
    19. ########## 数据通道的定义
    20. # 数据量不大,直接放内存。其实还可以放在JDBC,kafka或者磁盘文件等
    21. sync.channels.c-1.type = memory
    22. # 通道队列的最大长度
    23. sync.channels.c-1.capacity = 100000
    24. # putList和takeList队列的最大长度,sink从capacity中抓取batchsize个event,放到这个队列。所以此参数最好比capacity小,比sink的batchsize大。
    25. # 官方定义:The maximum number of events the channel will take from a source or give to a sink per transaction.
    26. sync.channels.c-1.transactionCapacity = 1000
    27. sync.channels.c-1.byteCapacityBufferPercentage = 20
    28. ### 默认值的默认值等于JVM可用的最大内存的80%,可以不配置
    29. # sync.channels.c-1.byteCapacity = 800000
    30.  
    31. #########sql source#################
    32. # source s-1用到的通道,和sink的通道要保持一致,否则就GG了
    33. sync.sources.s-1.channels=c-1
    34. ######### For each one of the sources, the type is defined
    35. sync.sources.s-1.type = org.keedio.flume.source.SQLSource
    36. sync.sources.s-1.hibernate.connection.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false
    37. ######### Hibernate Database connection properties
    38. sync.sources.s-1.hibernate.connection.user = test
    39. sync.sources.s-1.hibernate.connection.password = 123456
    40. sync.sources.s-1.hibernate.connection.autocommit = true
    41. sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    42. sync.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    43. sync.sources.s-1.run.query.delay=10000
    44. sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status
    45. # 用上${YYYYMM}环境变量,是因为我用的测试表示一个月表,每个月的数据会放到相应的表里。使用方式见上面的启动说明
    46. sync.sources.s-1.status.file.name = test_${YYYYMM}.status
    47. ######## Custom query
    48. sync.sources.s-1.start.from = 0
    49. sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc
    50. sync.sources.s-1.batch.size = 100
    51. sync.sources.s-1.max.rows = 100
    52. sync.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    53. sync.sources.s-1.hibernate.c3p0.min_size=5
    54. sync.sources.s-1.hibernate.c3p0.max_size=20
    55.  
    56. ######### sinks 1
    57. # sink k-1用到的通道,和source的通道要保持一致,否则取不到数据
    58. sync.sinks.k-1.channel = c-1
    59. sync.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink
    60. sync.sinks.k-1.kafka.topic = sync-test
    61. sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092
    62. sync.sinks.k-1.kafka.producer.acks = 1
    63. # 每批次处理的event数量
    64. sync.sinks.k-1.kafka.flumeBatchSize  = 100
    65.  
    66. ######### sinks 2
    67. # sink k-2用到的通道,和source的通道要保持一致,否则取不到数据
    68. sync.sinks.k-2.channel = c-1
    69. sync.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink
    70. sync.sinks.k-2.kafka.topic = sync-test
    71. sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092
    72. sync.sinks.k-2.kafka.producer.acks = 1
    73. sync.sinks.k-2.kafka.flumeBatchSize  = 100
    复制代码

     

    关于putList和takeList与capacity的关系,引用:http://blog.csdn.net/u012948976/article/details/51760546

    flume各部分参数含义

    flume架构详情

    • batchData的大小见参数:batchSize
    • PutList和TakeList的大小见参数:transactionCapactiy
    • Channel总容量大小见参数:capacity

     

    •   问题记录

    异常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J

    分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件对应的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失败补偿暂停线程处理时,需要用到这个方法。

    解决方法:更新flume-ng-sql-1.4.3里依赖的flume-ng-core版本为1.8.0,并在源代码【SQLSource.java】里添加这两个方法即可。

    复制代码
    
    
    1. @Override
    2. public long getBackOffSleepIncrement() {
    3.     return 1000;
    4. }
    5. @Override
    6. public long getMaxBackOffSleepInterval() {
    7.     return 5000;
    8. }
  • 相关阅读:
    python3 numpy基本用法归纳总结
    MySQL 中的三中循环 while loop repeat 的基本用法
    什么是网关及网关作用
    网络扫描工具nmap
    nmap基本使用方法
    nmap脚本使用总结
    用Delphi将数据导入到Excel并控制Excel
    delphi Form属性设置 设置可实现窗体无最大化,并且不能拖大拖小(写一个WM_EXITSIZEMOVE的空函数)
    Delphi 数据类型列表
    一个队列类的实现(比delphi自带的速度快70倍)(线程安全版本)
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723717.html
Copyright © 2011-2022 走看看