  • Configuring the Flume Replicating Channel Selector

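    1 Official documentation

      In the example from the official Flume documentation, everything that source r1 receives is replicated to all three channels: c1, c2, and c3.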
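
For reference, the replicating selector example in the Flume User Guide looks roughly like the following. It is reproduced from memory, so treat it as a sketch and check it against the guide for your Flume version. The selector.optional line marks c3 as optional: a failed write to c3 is ignored, while a failed write to c1 or c2 fails the transaction.

```properties
# Replicating channel selector, after the Flume User Guide example
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
# c3 is optional: write failures to it are ignored
a1.sources.r1.selector.optional = c3
```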

    2 Detailed configuration

      

    # Name the components on this agent
    	a1.sources = r1
    	a1.sinks = k1 k2
    	a1.channels = c1 c2
    	 
    	# Describe/configure the source
    	a1.sources.r1.type = exec
    	a1.sources.r1.command = tail -F /tmp/logs/cmcc.log
    	 
    	# Describe the sink
    	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k1.topic = cmcc1
    	a1.sinks.k1.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k1.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k2.batchSize = 20
    	 
    	 
    	# c1 buffers events in memory; c2 persists events to disk
    	a1.channels.c1.type = memory
    	a1.channels.c1.capacity = 1000
    	a1.channels.c1.transactionCapacity = 100
    	 
    	a1.channels.c2.type = file
    	a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
    	a1.channels.c2.dataDirs = /tmp/flume/data
    	 
    	# Bind the source and sink to the channel
    	a1.sources.r1.channels = c1 c2
    	 
    	# set channel for sinks
    	a1.sinks.k1.channel = c1
    	a1.sinks.k2.channel = c2
    	# Replicate every event from r1 to all of its channels (replicating is also the default selector)
    	a1.sources.r1.selector.type = replicating
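
To try this configuration, the agent can be started with flume-ng. A minimal sketch, assuming the configuration above was saved as conf/replicating.conf (a hypothetical file name) and that the command is run from the Flume installation directory:

```shell
# Start agent a1 with the replicating-selector configuration.
# conf/replicating.conf is an assumed file name, not one given in the article.
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/replicating.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console
```

The value passed to --name must match the agent name used as the property prefix, which is a1 here.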
    	 
    	 
    

    3 Checking what was consumed

      

    Messages consumed from topic cmcc1
    	 	
    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc1 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
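
The deprecation warning in the output above recommends the new consumer. A sketch of the equivalent command, assuming the brokers named in the sinks' brokerList are the ones to connect to:

```shell
# Same check with the new consumer: connect to the Kafka brokers directly
# instead of going through ZooKeeper.
kafka-console-consumer.sh \
  --bootstrap-server hdp1:9092,hdp2:9092,hdp3:9092 \
  --topic cmcc1 \
  --from-beginning
```

Because --from-beginning replays the whole topic, messages written during earlier runs of the agent show up as well.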
    	
     
     
     
     
     
    Messages consumed from topic cmcc2

    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc2 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

    4 Contents of the tailed file

    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

      

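    5 Summary
    The duplicates are probably because the agent was started more than once, so the file's contents were sent repeatedly to each sink. Each topic nevertheless received every line of the file, so the experiment's requirement is met.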

  • Original article: https://www.cnblogs.com/QuestionsZhang/p/10417797.html