zoukankan      html  css  js  c++  java
  • MySQL数据实时增量同步到Kafka

    • 写在前面的话

      需求,将MySQL里的数据实时增量同步到Kafka。接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka。不过对比了一些工具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。

      再来看看flume,只需要写一个配置文件,就可以完成数据同步的操作。官网:http://flume.apache.org/FlumeUserGuide.html#flume-sources。它的数据源默认是没有读取binlog日志实现的,也没有读数据库表的官方实现,只能用开源的自定义source:https://github.com/keedio/flume-ng-sql-source

    • 同步的格式

      原作者的插件flume-ng-sql-source只支持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很大的困扰。所以我添加了一个分支版本,用来将数据以JSON的格式,同步到kafka,字段语义更加清晰。

      sql-json插件包下载地址:https://github.com/yucy/flume-ng-sql-source-json/releases/download/1.0/flume-ng-sql-source-json-1.0.jar

      将此jar包下载之后,和相应的数据库驱动包,一起放到flume的lib目录之下即可。

    • 处理机制

    flume-ng-sql-source在【status.file.name】文件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。

    • 启动说明

    说明:启动命令里的【YYYYMM=201711】,会传入到flume.properties里面,替换${YYYYMM}

    [test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &

     -c:表示配置文件的目录,在此我们配置了flume-env.sh,也在conf目录下;

     -f:指定配置文件,这个配置文件必须在全局选项的--conf参数定义的目录下,就是说这个配置文件要在前面配置的conf目录下面;

     -n:表示要启动的agent的名称,也就是我们flume.properties配置文件里面,配置项的前缀,这里我们配的前缀是【sync】;

    • flume的配置说明

    • flume-env.sh
    # 配置JVM堆内存和java运行参数,配置-DpropertiesImplementation参数是为了在flume.properties配置文件中使用环境变量
    export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"

     关于propertiesImplementation参数的官方说明:http://flume.apache.org/FlumeUserGuide.html#using-environment-variables-in-configuration-files

    • flume.properties
    # 数据来源
    sync.sources = s-1
    # 数据通道
    sync.channels = c-1
    # 数据去处,这里配置了failover,根据下面的优先级配置,会先启用k-1,k-1挂了后再启用k-2
    sync.sinks = k-1 k-2
    
    #这个是配置failover的关键,需要有一个sink group
    sync.sinkgroups = g-1
    sync.sinkgroups.g-1.sinks = k-1 k-2
    #处理的类型是failover
    sync.sinkgroups.g-1.processor.type = failover
    #优先级,数字越大优先级越高,每个sink的优先级必须不相同
    sync.sinkgroups.g-1.processor.priority.k-1 = 5
    sync.sinkgroups.g-1.processor.priority.k-2 = 10
    #设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
    sync.sinkgroups.g-1.processor.maxpenalty = 10000
    
    ########## 数据通道的定义
    # 数据量不大,直接放内存。其实还可以放在JDBC,kafka或者磁盘文件等
    sync.channels.c-1.type = memory
    # 通道队列的最大长度
    sync.channels.c-1.capacity = 100000
    # putList和takeList队列的最大长度,sink从capacity中抓取batchsize个event,放到这个队列。所以此参数最好比capacity小,比sink的batchsize大。
    # 官方定义:The maximum number of events the channel will take from a source or give to a sink per transaction.
    sync.channels.c-1.transactionCapacity = 1000
    sync.channels.c-1.byteCapacityBufferPercentage = 20
    ### 默认值的默认值等于JVM可用的最大内存的80%,可以不配置
    # sync.channels.c-1.byteCapacity = 800000
    
    #########sql source#################
    # source s-1用到的通道,和sink的通道要保持一致,否则就GG了
    sync.sources.s-1.channels=c-1
    ######### For each one of the sources, the type is defined
    sync.sources.s-1.type = org.keedio.flume.source.SQLSource
    sync.sources.s-1.hibernate.connection.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false
    ######### Hibernate Database connection properties
    sync.sources.s-1.hibernate.connection.user = test
    sync.sources.s-1.hibernate.connection.password = 123456
    sync.sources.s-1.hibernate.connection.autocommit = true
    sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    sync.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    sync.sources.s-1.run.query.delay=10000
    sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status
    # 用上${YYYYMM}环境变量,是因为我用的测试表示一个月表,每个月的数据会放到相应的表里。使用方式见上面的启动说明
    sync.sources.s-1.status.file.name = test_${YYYYMM}.status
    ######## Custom query
    sync.sources.s-1.start.from = 0
    sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc
    sync.sources.s-1.batch.size = 100
    sync.sources.s-1.max.rows = 100
    sync.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    sync.sources.s-1.hibernate.c3p0.min_size=5
    sync.sources.s-1.hibernate.c3p0.max_size=20
    
    ######### sinks 1
    # sink k-1用到的通道,和source的通道要保持一致,否则取不到数据
    sync.sinks.k-1.channel = c-1
    sync.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink
    sync.sinks.k-1.kafka.topic = sync-test
    sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092
    sync.sinks.k-1.kafka.producer.acks = 1
    # 每批次处理的event数量
    sync.sinks.k-1.kafka.flumeBatchSize  = 100
    
    ######### sinks 2
    # sink k-2用到的通道,和source的通道要保持一致,否则取不到数据
    sync.sinks.k-2.channel = c-1
    sync.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink
    sync.sinks.k-2.kafka.topic = sync-test
    sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092
    sync.sinks.k-2.kafka.producer.acks = 1
    sync.sinks.k-2.kafka.flumeBatchSize  = 100

    关于putList和takeList与capacity的关系,引用:http://blog.csdn.net/u012948976/article/details/51760546

    flume各部分参数含义

    flume架构详情

    • batchData的大小见参数:batchSize
    • PutList和TakeList的大小见参数:transactionCapactiy
    • Channel总容量大小见参数:capacity
    •   问题记录

    异常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J

    分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件对应的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失败补偿暂停线程处理时,需要用到这个方法。

    解决方法:更新flume-ng-sql-1.4.3里依赖的flume-ng-core版本为1.8.0,并在源代码【SQLSource.java】里添加这两个方法即可。

    @Override
    public long getBackOffSleepIncrement() {
        return 1000;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 5000;
    }
  • 相关阅读:
    hdu 3342 Legal or Not 拓排序
    hdu 1596 find the safest road Dijkstra
    hdu 1874 畅通工程续 Dijkstra
    poj 2676 sudoku dfs
    poj 2251 BFS
    poj Prime Path BFS
    poj 3278 BFS
    poj 2387 Dijkstra 模板
    poj 3083 DFS 和BFS
    poj 1062 昂贵的聘礼 dijkstra
  • 原文地址:https://www.cnblogs.com/yucy/p/7845105.html
Copyright © 2011-2022 走看看