  • Flume Study Notes: Integrating Flume NG with Kafka

    Integrating a Flume NG cluster with a Kafka cluster:

    Modify the Flume configuration file (flume-kafka-server.conf) so that the sink delivers events to Kafka.

    hadoop1:

    #set Agent name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #set channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # avro source: receive events forwarded by the client agent
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop1
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop1
    a1.sources.r1.channels = c1
    # set sink to Kafka
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop1:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1

    hadoop2:

    #set Agent name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #set channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # avro source: receive events forwarded by the client agent
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop2
    a1.sources.r1.port = 52020
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = Collector
    a1.sources.r1.interceptors.i1.value = hadoop2
    a1.sources.r1.channels = c1
    # set sink to Kafka
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = ScalaTopic
    a1.sinks.k1.brokerList = hadoop2:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
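
    Step 4 of the test below references flume-client.conf on hadoop3, which is not shown here. Below is a minimal sketch of what that client agent might look like, assuming an exec source tailing a local log file and a failover sink group across the two collectors; the log file path and failover priorities are assumptions, while the agent name (agent1) and port (52020) come from this post:

    # hypothetical flume-client.conf for hadoop3
    agent1.sources = r1
    agent1.channels = c1
    agent1.sinks = k1 k2
    agent1.sinkgroups = g1
    # memory channel, sized like the collectors above
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    # exec source tailing a local log file (path assumed)
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /usr/local/flume/log/test.log
    agent1.sources.r1.channels = c1
    # avro sinks pointing at the two collector agents
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = hadoop1
    agent1.sinks.k1.port = 52020
    agent1.sinks.k1.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = hadoop2
    agent1.sinks.k2.port = 52020
    agent1.sinks.k2.channel = c1
    # failover sink group: prefer hadoop1, fall back to hadoop2 (priorities assumed)
    agent1.sinkgroups.g1.sinks = k1 k2
    agent1.sinkgroups.g1.processor.type = failover
    agent1.sinkgroups.g1.processor.priority.k1 = 10
    agent1.sinkgroups.g1.processor.priority.k2 = 1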

    Cluster test:

    1. Start ZooKeeper (hadoop1, hadoop2, hadoop3).
    2. Start the Kafka servers and a console consumer (hadoop1, hadoop2); example commands are sketched after this list.
    3. Start the Flume servers (hadoop1, hadoop2): flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-kafka-server.conf --name a1 -Dflume.root.logger=INFO,console
    4. Start the Flume client (hadoop3): flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-client.conf --name agent1 -Dflume.root.logger=INFO,console
    5. Append a log record on hadoop3 (see the sketch after this list).
    6. The test passes once the Kafka consumer receives the record.
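
    The exact commands for steps 1, 2, and 5 depend on the installation layout. A minimal sketch, assuming standalone ZooKeeper and Kafka installs on each node and that the client agent tails /usr/local/flume/log/test.log (both paths are assumptions, not from the original post):

    # 1. on hadoop1/hadoop2/hadoop3: start ZooKeeper
    zkServer.sh start
    # 2. on hadoop1/hadoop2: start the Kafka broker
    kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    # create ScalaTopic once if it does not exist yet (replication factor assumed)
    kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 1 --topic ScalaTopic
    # start a console consumer to watch for the test record
    kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic ScalaTopic
    # 5. on hadoop3: append one log record to the tailed file
    echo "flume kafka integration test" >> /usr/local/flume/log/test.log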

    (Screenshots omitted: the record appended on hadoop3, and the Kafka consumer on hadoop1 receiving it.)

    With the test complete, Flume and Kafka are now integrated, which lays the foundation for a real-time log analysis system built from Flume + Kafka + Spark Streaming; a sketch of the Spark Streaming consumer follows.
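
    As an illustration of that next step, here is a minimal Spark Streaming sketch that consumes ScalaTopic using the receiver-based spark-streaming-kafka (0.8) API. The ZooKeeper quorum, consumer group name, and batch interval are assumptions, not taken from this post:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ScalaTopicConsumer {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ScalaTopicConsumer")
        val ssc = new StreamingContext(conf, Seconds(5)) // batch interval assumed
        // ZooKeeper quorum and group id are assumptions; the topic name matches the Flume sink config
        val zkQuorum = "hadoop1:2181,hadoop2:2181,hadoop3:2181"
        val lines = KafkaUtils
          .createStream(ssc, zkQuorum, "flume-kafka-demo", Map("ScalaTopic" -> 1))
          .map(_._2) // keep the message body (the log line), drop the Kafka key
        lines.count().print() // report how many log records arrived in each batch
        ssc.start()
        ssc.awaitTermination()
      }
    }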

  • Original article: https://www.cnblogs.com/AK47Sonic/p/7440197.html