zoukankan      html  css  js  c++  java
  • Flume采集文件数据到Kafka

    采集文件call.log的数据到kafka,并从kafka消费者控制台获取数据。

    flume+kafka是目前大数据很经典的日志采集工具。文件数据通过flume采集,通过kafka进行订阅发布并缓存,很适合充当消息中间件。

    准备工作

    启动zookeeper,kafka集群

    ./bin/zkServer.sh start
    ./bin/kafka-server-start.sh /config/server.properties
    

    在kafka创建ct主题,并设置分区数量,副本数量,这些信息都会保存在zookeeper上。

    ./bin/kafka-topics.sh --zookeeper master:2181 --create --topic ct --partitions 3 --replication-factor 2
    

    启动kafka控制台消费者,在这个进程可以看到采集的数据。

    ./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic ct --from-beginning
    

    启动flume,其中flume-kafka.conf配置文件在下方。

    ./bin/flume-ng agent --conf ./conf/ --name a1 --conf-file ./flume-kafka.conf
    

    flume-kafka.conf

    # define
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F -c +0 /usr/local/data/call.log
    a1.sources.r1.shell = /bin/bash -c
    
    # sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
    a1.sinks.k1.kafka.topic = ct
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    小结

    • 可以发现kafka的运行,都要通过zookeeper,可以很好理解zookeeper在kafka集群中充当的角色。
    • 运行后,文件call.log的数据都会发送给kafka,无论是哪个节点,通过kafka创建消费者,获取主题topic都会得到数据。
    • flume的sink有直接的kafka源,两者可以很简易的结合
  • 相关阅读:
    中国骨干网节点
    Linux命令整理
    centos6.5安装mysql
    mysql插入中文乱码问题
    Intellij Idea使用及配置
    IntelliJ IDEA像Eclipse一样打开多个项目
    IntelliJ IDEA14如何配置tomcat
    转:IntelliJ IDEA 2016.1.3注册破解激活
    IntelliJ Idea 快捷键
    CXF生成调用webservice的客户端
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12685174.html
Copyright © 2011-2022 走看看