zoukankan      html  css  js  c++  java
  • 十二 flume与kafka数据传输

    1、准备

      kafka 创建topic:

    kafka-topics.sh --create --zookeeper node180:2181,node181:2181,node182:2181 --replication-factor 3 --partitions 3 --topic flume2kafka
    

      

    2、新建flume 配置文件

    agent.sources = avroSource
    agent.channels = memoryChannel
    agent.sinks = kafkaSink
    
     
    
    # For each one of the sources, the type is defined
    agent.sources.avroSource.type = avro
    # The channel can be defined as follows.
    agent.sources.avroSource.channels = memoryChannel
    agent.sources.avroSource.bind=0.0.0.0
    agent.sources.avroSource.port=1234
    
    
    # Each channel's type is defined.
    agent.channels.memoryChannel.type = memory
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent.channels.memoryChannel.capacity = 100
    
     
    
    # Each sink's type must be defined
    agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    #Specify the channel the sink should use
    agent.sinks.kafkaSink.channel = memoryChannel
    agent.sinks.kafkaSink.kafka.topic = flume2kafka
    agent.sinks.kafkaSink.kafka.bootstrap.servers = node180:9092,node181:9092,node182:9092
    

      

    3、启动

    flume-ng agent -n agent -c conf -f /opt/module/apache-flume-1.9.0/conf/flume-conf-aggregation-kafka.properties  -Dflume.root.logger=INFO.console 
    
    flume-ng agent -n agent -c conf -f /opt/module/apache-flume-1.9.0/conf/flume-conf-collect.properties  -Dflume.root.logger=INFO.console
    

      

    4、测试

      分别在服务器 node181、node182 使用命令生产数据:

        cd /root/flume/data

        echo "180" >> test.log

      通过此命令进行数据消费:kafka-console-consumer.sh --bootstrap-server node180:9092,node181:9092,node182:9092 --topic flume2kafka --from-beginning

     

  • 相关阅读:
    jvm内存配置参数
    6 个设计原则分别是什么?每种设计原则体现的设计模式是哪个?
    classloader 结构,是否可以自己定义一个 java.lang.String 类,为什么? 双亲代理机制。
    求sum=1+111+1111+........+1....111 .
    交换排序
    字符串压缩 stringZip
    TCP为何采用三次握手来建立连接,若采用二次握手可以吗
    使用jsoup抓取新闻信息
    通过地址获得经纬度(百度Geocoding API)
    FileReader
  • 原文地址:https://www.cnblogs.com/qk523/p/12564168.html
Copyright © 2011-2022 走看看