zoukankan      html  css  js  c++  java
  • Flume 学习笔记之 Flume NG+Kafka整合

    Flume NG集群+Kafka集群整合:

    修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka

    hadoop1:

    #set Agent name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #set channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # other node,nna to nns
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop1
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop1
    a1.sources.r1.channels = c1
    #set sink to hdfs
    a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop1:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel=c1

    hadoop2:

    #set Agent name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #set channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # other node,nna to nns
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop2
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop2
    a1.sources.r1.channels = c1
    #set sink to hdfs
    a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop2:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel=c1

    集群测试:

    1. 启动zookeeper(hadoop1,hadoop2,hadoop3)
    2. 启动kafka server和consumer(hadoop1,hadoop2)
    3. 启动Flume server(hadoop1,hadoop2):flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-kafka-server.conf --name a1 -Dflume.root.logger=INFO,console
    4. 启动Flume client(hadoop3):flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-client.conf --name agent1 -Dflume.root.logger=INFO,console
    5. 在hadoop3上追加一条日志记录
    6. kafka consumer收到记录,从则测试完毕。

    hadoop3:

    hadoop1:

    测试完毕,这样Flume+kafka就整合起来了,即Flume+Kafka+Spark Streaming的实时日志分析系统就孕育而生了。

  • 相关阅读:
    winform 动态添加控件及事件
    C#调用WCF问题汇总
    如何实现验证登陆者
    如何利用wx.request进行post请求
    如何利用wx.login方法获取openid和sessionKey
    微信小程序开发调试阶段不校验请求域名
    第三方授权登陆
    node环境下:node_modules里面的文件
    windows下如何快速删除大文件
    webpack使用devtool :source map插件
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/7440197.html
Copyright © 2011-2022 走看看