  • Flume Study Notes: Integrating Flume NG with Kafka

    Integrating a Flume NG cluster with a Kafka cluster:

    Modify the Flume configuration file (flume-kafka-server.conf) so that the sink publishes to Kafka.

    hadoop1:

    # agent components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # memory channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # avro source: receives events forwarded by the client agent
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop1
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop1
    a1.sources.r1.channels = c1
    # Kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop1:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
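
    If the brokers do not auto-create topics, ScalaTopic has to exist before the collectors start. A minimal sketch using the ZooKeeper-based CLI of this Kafka generation (the ZooKeeper address and the partition/replication counts are assumptions):

    kafka-topics.sh --create --zookeeper hadoop1:2181 \
      --topic ScalaTopic --partitions 2 --replication-factor 2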

    hadoop2:

    # agent components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # memory channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # avro source: receives events forwarded by the client agent
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop2
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop2
    a1.sources.r1.channels = c1
    # Kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop2:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
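
    The client configuration on hadoop3 (flume-client.conf) is not reproduced here. A minimal sketch of what it might look like, assuming an exec source tailing a hypothetical log file and two load-balanced avro sinks pointing at the collectors; the agent name agent1 matches the startup command in step 4 below:

    # agent components
    agent1.sources = r1
    agent1.channels = c1
    agent1.sinks = k1 k2
    agent1.sinkgroups = g1
    # exec source tailing the monitored file (path is hypothetical)
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /tmp/test.log
    agent1.sources.r1.channels = c1
    # memory channel
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    # avro sinks, one per collector
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = hadoop1
    agent1.sinks.k1.port = 52020
    agent1.sinks.k1.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = hadoop2
    agent1.sinks.k2.port = 52020
    agent1.sinks.k2.channel = c1
    # load-balance events across the two collectors
    agent1.sinkgroups.g1.sinks = k1 k2
    agent1.sinkgroups.g1.processor.type = load_balance
    agent1.sinkgroups.g1.processor.backoff = true
    agent1.sinkgroups.g1.processor.selector = round_robin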

    Testing the cluster:

    1. Start ZooKeeper on hadoop1, hadoop2, and hadoop3 (commands for this and the other unscripted steps are sketched after this list)
    2. Start the Kafka servers and a console consumer (hadoop1, hadoop2)
    3. Start the Flume collectors (hadoop1, hadoop2): flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-kafka-server.conf --name a1 -Dflume.root.logger=INFO,console
    4. Start the Flume client (hadoop3): flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-client.conf --name agent1 -Dflume.root.logger=INFO,console
    5. Append a log record on hadoop3
    6. If the Kafka consumer receives the record, the test passes.
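
    The exact commands for steps 1, 2, and 5 depend on the local installation; a hedged sketch, assuming stock ZooKeeper and Kafka layouts under /usr/local and the ZooKeeper-based console consumer of this Kafka generation (all paths, including the monitored log file, are assumptions):

    # step 1, on hadoop1, hadoop2, and hadoop3
    zkServer.sh start
    # step 2, on hadoop1 and hadoop2
    kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic ScalaTopic
    # step 5, on hadoop3 (hypothetical path matching the client-config sketch above)
    echo "hello flume + kafka" >> /tmp/test.log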

    hadoop3: (screenshot in the original post: the log record being appended on the client)

    hadoop1: (screenshot in the original post: the Kafka consumer printing the received record)

    With the test passing, Flume and Kafka are now integrated, and a Flume + Kafka + Spark Streaming real-time log analysis system grows directly out of this setup.
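
    As a pointer toward that Spark Streaming stage, here is a minimal Scala sketch that consumes ScalaTopic with the receiver-based API from the spark-streaming-kafka-0-8 artifact of this era; the ZooKeeper quorum, the consumer group name, and the per-batch count are illustrative assumptions, not part of the original setup:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object FlumeKafkaDemo {
      def main(args: Array[String]): Unit = {
        // local[2]: the receiver occupies one core, processing needs another
        val conf = new SparkConf().setAppName("FlumeKafkaDemo").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        // (ZooKeeper quorum, consumer group, topic -> receiver threads); all assumed
        val lines = KafkaUtils
          .createStream(ssc, "hadoop1:2181", "flume-kafka-demo", Map("ScalaTopic" -> 1))
          .map(_._2) // keep only the message body
        lines.count().print() // e.g. number of log records per 5-second batch
        ssc.start()
        ssc.awaitTermination()
      }
    }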

  • Original article: https://www.cnblogs.com/AK47Sonic/p/7440197.html