  • Flume: writing data to both HDFS and Kafka at the same time

    cd /usr/local/flume/conf

    vim flume-exec-total.conf

    ## Explanation
    # Use two sinks so the same data is delivered to both Kafka and HDFS
    
    
    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1 k2
    agent.channels = c1 c2
    
    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /root/test.log
    agent.sources.r1.shell = /bin/bash -c 
    
    ## kafka
    #Describe the sink
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = kafkatest
    agent.sinks.k1.brokerList = master:9092
    agent.sinks.k1.requiredAcks = 1
    agent.sinks.k1.batchSize = 2
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1 c2
    agent.sinks.k1.channel = c1
    
    ## hdfs
    #Describe the sink
    agent.sinks.k2.type = hdfs
    agent.sinks.k2.hdfs.path = hdfs://master:9000/data/flume/tail
    agent.sinks.k2.hdfs.fileType=DataStream
    agent.sinks.k2.hdfs.writeFormat=Text
    #agent.sinks.k2.hdfs.rollInterval = 0
    #agent.sinks.k2.hdfs.rollSize = 134217728
    #agent.sinks.k2.hdfs.rollCount = 1000000
    agent.sinks.k2.hdfs.batchSize=10
    
    ## Use a channel which buffers events in memory 
    agent.channels.c2.type = memory
    #agent.channels.c2.capacity = 1000
    #agent.channels.c2.transactionCapacity = 100
    
    ## Bind the source and sink to the channel
    #agent.sources.r1.channels = c2
    agent.sinks.k2.channel = c2
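
    Note: because the source r1 is bound to both channels (c1 and c2), Flume's default replicating channel selector copies every event to both channels, which is what fans the same data out to the Kafka sink and the HDFS sink. The default can be made explicit with one optional line (not in the original config):

    agent.sources.r1.selector.type = replicating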

    Verification:

    1. First, start HDFS and Kafka
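
    For example, assuming standard install locations and that the ZooKeeper ensemble on master/slave1/slave2 is already running (paths are assumptions, adjust to your environment):

    # start HDFS (NameNode + DataNodes)
    start-dfs.sh
    # start the Kafka broker on master in daemon mode
    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties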

    2. Create the topic

    kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest
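
    To confirm the topic exists before pointing Flume at it, it can be described against the same ZooKeeper quorum:

    kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest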

    Start Flume and run the test

    3. Start Flume

    On the server side:
    /usr/local/flume/bin/flume-ng agent -f flume-exec-total.conf -n agent -Dflume.root.logger=INFO,console
    
    On the client side, append a test line to the file the exec source is tailing:
    echo "wangzai doubi" >> /root/test.log
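
    Once a few lines have been appended, the HDFS side can be checked against the path configured for sink k2 (the HDFS sink's default file prefix is FlumeData):

    hdfs dfs -ls /data/flume/tail
    hdfs dfs -cat /data/flume/tail/FlumeData.*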

    4. Start a Kafka console consumer

    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning
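
    On newer Kafka releases, where the console consumer no longer accepts --zookeeper, the equivalent command connects to the broker directly (broker address taken from the k1 sink config):

    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic kafkatest --from-beginning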

    Results: the original post included screenshots of the Flume server log, the files written under /data/flume/tail in HDFS, and the Kafka console consumer showing the test message; they are omitted here.

  • Original article: https://www.cnblogs.com/654wangzai321/p/9693177.html