zoukankan      html  css  js  c++  java
  • flume采集MongoDB数据到Kafka中

    环境说明

    • centos7(运行于vbox虚拟机)
    • flume1.9.0(自定义了flume连接mongodb的source插件)
    • jdk1.8
    • kafka(2.11)
    • zookeeper(3.57)
    • mongoDB4.0.0(无密码)
    • xshell 7

    自定义flume插件

    由于flume对数据库的支持欠缺,flume的source组件中,没有组件适用于连接关系型数据库或非关系型数据库。

    对于关系型数据库(RDB),github中开源插件flume-ng-sql-source被广泛用于对接RDB。但是对于非关系型数据库,不同的非关系型数据库之间都有些许差别,且没有一个统一的,或者配对的插件来支持非关系型数据库。

    因此,需要使用者自定义插件来适配。

    我自定义的flume-ng-mongodb-source的jar包如下:

    ()

    将该jar包放在yourpath/flume/lib下(yourpath指你flume文件夹前面路径,下同。同理,下文出现的yourhost指你本机的ip地址)

    连接mongodb的配置文件

    在mongodb中创建database和collection,用于测试。

    创建数据库:

    use flumetest
    

    创建集合(隐式创建):

    db.testCollection.insert({id:1,name:"333"})
    

    查看是否已经创建了数据库:

    > show dbs
    admin      0.000GB
    config     0.000GB
    flumetest  0.000GB
    local      0.000GB
    test       0.000GB
    

    查看集合中的数据:

    > db.testCollection.findOne()
    { "_id" : ObjectId("5fe29faad5553e6caaa8cbe9"), "id" : 1, "name" : "333" }
    

    此外,我们需要将mongodb相关的驱动jar包放到yourpath/flume/lib

    bson-3.12.7.jar
    mongo-java-driver-3.12.7.jar
    mongodb-driver-core-3.12.7.jar
    

    flume连接mongodb需要先编写相关的配置文件,在yourpath/flume/conf里新增配置文件mongo-flume.conf,具体的配置如下:

    #This is a model,you can use for test
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = com.wms.flumesource.MongoDBSource
    a1.sources.r1.Mongodb.url = yourhost:27017
    a1.sources.r1.Mongodb.database=flumetest
    a1.sources.r1.Mongodb.collection = testCollection
    a1.sources.r1.Mongodb.column= _id
    a1.sources.r1.start.from = 0
    a1.sources.r1.interval=2000
    a1.sources.r1.charset=UTF-8
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = mongoTopic
    a1.sinks.k1.brokerList = yourhost:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    参数说明:

    # mongodb的url
    a1.sources.r1.Mongodb.url = yourhost:27017
    # 要连接的database
    a1.sources.r1.Mongodb.database=flumetest
    # 要连接的collection
    a1.sources.r1.Mongodb.collection = testCollection
    # mongodb中每条数据都有默认的_id,用于续传
    a1.sources.r1.Mongodb.column= _id
    
    
    # sink使用了kafka,flume成功连接之后开启消费监控就能看到数据了
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # 接下来用于监控消费的topic名字
    a1.sinks.k1.topic = mongoTopic
    

    因为mongodb有集群操作,所以flume-ng-mongodb-source也支持mongodb集群,只需要在a1.sources.r1.Mongodb.url里配置多个url即可,如:

    a1.sources.r1.Mongodb.url = yourhost1:port1,yourhost2:port2,yourhost3:port3,......
    

    采集mongodb数据实践

    启动mongodb和kafka。

    启动flume

    bin/flume-ng agent -n a1 -c conf -f conf/mongo-flume.conf -Dflume.root.logger=INFO,console
    

    参数说明:

    • a1:是你在mongo-flume中给agent起的别名
    • conf/mongo-flume.conf:导入前文所述的配置文件,配置文件在yourpath/flume/conf下。

    启动一个kafka消费监控:

    bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
    

    获取testCollection中全部数据(下图不是重复数据,是之前多次测试在topic中留下的数据):

    往testCollection中添加一条数据:

    db.testCollection.insert({id:7,name:"test",city:"Beijing"})
    

    消费监控中的结果如下:

    只读增量数据

    如果不想把collection中所有的数据都读取出来,请修改flume-ng-mongodb-source源码。

    在MongoDBSource.java文件中,找到run方法,取消掉events.clear()的注释。

    再次打包,替换掉lib下flume-ng-mongodb-source的jar包。

    然后再次执行上面的启动操作:

    bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
    
    
    bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning
    

    插入一条数据:

    db.testCollection.insert({id:8,name:"增量"})
    

    查看消费监控:

    可以看到只有新增的数据了,不会再读取所有的数据

    再插入一条数据实验一下:

    db.testCollection.insert({id:9,source:"MongoDBSource",channle:"memory",sink:"kafka"})
    

  • 相关阅读:
    扩展的局域网
    参数估计
    以太网的 MAC 层
    poj 1523Tarjan算法的含义——求取割点可以分出的连通分量的个数
    tarjan算法--求解无向图的割点和桥
    spfa负环判断
    codeforce 489d bfs分层处理
    并查集优化——压缩路径——秩优化
    SPFA_queue_链式前向星最短路 & HDU2433
    POJ3046选蚂蚁创建集合_线性DP
  • 原文地址:https://www.cnblogs.com/kylinxxx/p/14179767.html
Copyright © 2011-2022 走看看