  • Configuring Flume's Replicating Channel Selector

    1 Official documentation example

      In the official example, every event taken in by source r1 is replicated to all three channels c1, c2, and c3. The configuration below applies the same selector with two channels, c1 and c2.
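      The official Flume User Guide example referenced above looks roughly like this (a sketch; `selector.optional` marks c3 as best-effort, so a failed write to c3 does not fail the transaction):

```properties
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
```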

    2 Full configuration

      

    # Name the components on this agent
    	a1.sources = r1
    	a1.sinks = k1 k2
    	a1.channels = c1 c2
    	 
    	# Describe/configure the source
    	a1.sources.r1.type = exec
    	a1.sources.r1.command = tail -F /tmp/logs/cmcc.log
    	 
    	# Describe the sink
    	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k1.topic = cmcc1
    	a1.sinks.k1.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k1.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k2.batchSize = 20
    	 
    	 
    	# Use a channel which buffers events in memory
    	a1.channels.c1.type = memory
    	a1.channels.c1.capacity = 1000
    	a1.channels.c1.transactionCapacity = 100
    	 
    	a1.channels.c2.type = file
    	a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
    	a1.channels.c2.dataDirs = /tmp/flume/data
    	 
    	# Bind the source and sink to the channel
    	a1.sources.r1.channels = c1 c2
    	 
    	# set channel for sinks
    	a1.sinks.k1.channel = c1
    	a1.sinks.k2.channel = c2
    	# replicate every event from r1 to all of the channels listed above
    	a1.sources.r1.selector.type = replicating
    	 
    	 
    
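    A minimal way to run this configuration (assuming it is saved as `replicating.conf` and `flume-ng` is on the PATH — both assumptions about your environment, not stated in the original):

```shell
# Create the file the exec source tails (path taken from the config above)
mkdir -p /tmp/logs
touch /tmp/logs/cmcc.log

# Launch the agent; shown commented out since it needs a Flume install.
# The agent name (-n a1) must match the component prefix in the config.
# flume-ng agent -n a1 -c conf -f replicating.conf -Dflume.root.logger=INFO,console
```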

    3 Checking topic consumption

      

    Consumption of topic cmcc1:
    	 	
    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc1 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
     
     
     
     
     
     Consumption of topic cmcc2:
     
    	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc2 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
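    The deprecation warning printed in both runs suggests switching to the new consumer API. The equivalent invocation with `--bootstrap-server` (broker list taken from the sink config above) is built as a string here rather than executed, since running it requires the live hdp1..hdp3 Kafka cluster:

```shell
# New-consumer equivalent of the commands above; printed, not executed,
# because it needs the Kafka cluster from the original setup to be up.
CMD='kafka-console-consumer.sh --bootstrap-server hdp1:9092,hdp2:9092,hdp3:9092 --topic cmcc1 --from-beginning'
echo "$CMD"
```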
    

      

    4 Contents of the file being tailed

    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    
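    Lines like the ones above reach the pipeline simply by being appended to the tailed file: `tail -F` picks up each new line as one event, and the replicating selector copies it to both channels and hence both topics. For example:

```shell
# Append test lines to the file tailed by the exec source.
# Each appended line becomes one Flume event, replicated to c1 and c2.
mkdir -p /tmp/logs
for word in hello world java scala hadoop; do
  echo "$word" >> /tmp/logs/cmcc.log
done
```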

      

      

    5 Summary
    Each line shows up more than once in the consumed output, most likely because the agent was started more than once, so the file contents were re-sent to each sink. Even so, every event was replicated to both channels and delivered to both topics, which is what the experiment set out to demonstrate.
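    The "delivered more than once" observation can be checked mechanically: dump the consumer output to a file and count repeats with `sort | uniq -c` (illustrated here with sample data standing in for the real console output):

```shell
# Simulated consumer dump in which each line appears twice
printf 'hello\nworld\nhello\nworld\n' > /tmp/cmcc1_dump.txt
# uniq -c prefixes each distinct line with its occurrence count,
# so duplicated delivery shows up as counts greater than 1
sort /tmp/cmcc1_dump.txt | uniq -c
```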

  • Original post: https://www.cnblogs.com/QuestionsZhang/p/10417797.html