  • Configuring the Flume Replicating Channel Selector

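    1 Official documentation

      In the example from the official documentation, everything r1 receives is copied to all three channels c1, c2 and c3 at the same time.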
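
      For reference, the replicating selector example in the Flume User Guide has roughly the shape below. It is reproduced here as a sketch, so check the property names against the guide for your Flume version. Marking c3 as optional belongs to that example, not to the experiment in this post.

```properties
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
# c3 is optional: a failed write to c3 is ignored, while a failed
# write to c1 or c2 fails the whole transaction
a1.sources.r1.selector.optional = c3
```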

    2 Full configuration

      

    # Name the components on this agent
    	a1.sources = r1
    	a1.sinks = k1 k2
    	a1.channels = c1 c2
    	 
    	# Describe/configure the source
    	a1.sources.r1.type = exec
    	a1.sources.r1.command = tail -F /tmp/logs/cmcc.log
    	 
    	# Describe the sink
    	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k1.topic = cmcc1
    	a1.sinks.k1.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k1.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k2.batchSize = 20
    	 
    	 
    	# c1 buffers events in memory; c2 is a file channel that persists them on disk
    	a1.channels.c1.type = memory
    	a1.channels.c1.capacity = 1000
    	a1.channels.c1.transactionCapacity = 100
    	 
    	a1.channels.c2.type = file
    	a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
    	a1.channels.c2.dataDirs = /tmp/flume/data
    	 
    	# Bind the source and sink to the channel
    	a1.sources.r1.channels = c1 c2
    	 
    	# set channel for sinks
    	a1.sinks.k1.channel = c1
    	a1.sinks.k2.channel = c2
    	# Copy every event from r1 to all of its channels (replicating is also the default selector)
    	a1.sources.r1.selector.type = replicating
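
    With the configuration saved to a file, the agent can be started with the standard `flume-ng agent` command. The sketch below wraps it in a small shell function. The function name, the file name `replicating.conf` and the `conf` directory are assumptions, since the post does not show how the agent was launched.

```shell
# Start agent a1 with the given Flume configuration file.
# Assumes flume-ng is on PATH and ./conf is the Flume configuration directory.
start_agent() {
  conf_file=$1
  flume-ng agent \
    --conf conf \
    --conf-file "$conf_file" \
    --name a1 \
    -Dflume.root.logger=INFO,console
}

# Example (hypothetical file name):
#   start_agent replicating.conf
```

    Running the agent in the foreground with console logging makes it easier to notice when the source is restarted.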
    	 
    	 
    

    3 Checking what was consumed

      

    Consumption of topic cmcc1
    	 	
    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc1 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
     
     
     
     
     
     Consumption of topic cmcc2
     
    	 kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc2 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
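
    The deprecation warning in the output above suggests passing `--bootstrap-server` instead of `--zookeeper`. A sketch of the same check with the new consumer follows, assuming the brokers listen on the addresses given in `brokerList` in the sink configuration. The `consume_topic` helper name is made up for this example.

```shell
# Broker list taken from the sink configuration in section 2.
BROKERS=hdp1:9092,hdp2:9092,hdp3:9092

# Read a topic from the beginning with the new (non-ZooKeeper) consumer.
consume_topic() {
  kafka-console-consumer.sh \
    --bootstrap-server "$BROKERS" \
    --topic "$1" \
    --from-beginning
}

# Example:
#   consume_topic cmcc1
#   consume_topic cmcc2
```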
    	
    

      

    4 Contents of the tailed file

    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

      

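    5 Summary
    The repeated lines are probably because the agent was started more than once, so the file contents were sent to each sink again on every start. Either way, every line reached both sinks, which is what the experiment set out to show.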
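
    To confirm that both topics received the same events even though the repeat counts differ (three full runs in cmcc1 and five in cmcc2 in the output above), the consumer output can be saved to files and compared. The following is a minimal sketch. The function names and the dump file names are hypothetical.

```shell
# Print each distinct line of a saved consumer dump followed by its count,
# sorted by line, e.g. "hello 3".
count_lines() {
  sort "$1" | uniq -c | sed 's/^ *\([0-9][0-9]*\) \(.*\)$/\2 \1/'
}

# Succeed if two dumps contain the same set of distinct lines,
# ignoring order and how often each line repeats.
same_events() {
  a=$(sort -u "$1")
  b=$(sort -u "$2")
  [ "$a" = "$b" ]
}

# Example, assuming each consumer's output was redirected to a file:
#   count_lines cmcc1.txt
#   same_events cmcc1.txt cmcc2.txt && echo "same events in both topics"
```

    Comparing distinct lines rather than raw counts matters here, because the number of repeats depends on how often the agent was restarted and not on the selector.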

  • Original article: https://www.cnblogs.com/QuestionsZhang/p/10417797.html