zoukankan      html  css  js  c++  java
  • Flume配置Replicating Channel Selector

    1 官网内容

      上面的配置是r1获取到的内容会同时复制到c1 c2 c3 三个channel里面

    2 详细配置信息

      

    # Name the components on this agent
    	a1.sources = r1
    	a1.sinks = k1 k2
    	a1.channels = c1 c2
    	 
    	# Describe/configure the source
    	a1.sources.r1.type = exec
    	a1.sources.r1.command = tail -F /tmp/logs/cmcc.log
    	 
    	# Describe the sink
    	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k1.topic = cmcc1
    	a1.sinks.k1.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k1.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k2.batchSize = 20
    	 
    	 
    	# Use a channel which buffers events in memory
    	a1.channels.c1.type = memory
    	a1.channels.c1.capacity = 1000
    	a1.channels.c1.transactionCapacity = 100
    	 
    	a1.channels.c2.type = file
    	a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
    	a1.channels.c2.dataDirs = /tmp/flume/data
    	 
    	# Bind the source and sink to the channel
    	a1.sources.r1.channels = c1 c2
    	 
    	# set channel for sinks
    	a1.sinks.k1.channel = c1
    	a1.sinks.k2.channel = c2
    	 # #
    	a1.sources.r1.selector.type = replicating
    	 #
    	 
    	 
    

    3 查看消费情况

      

    topic cmcc1的消费情况
    	 	
    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc1 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
     
     
     
     
     
     topic cmcc2的消费情况
     
    	 /tmp/logs]#kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc2 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

    4 查看tail的文件内容

    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

      

    4 总结
    应该是启动了两次的原因,实际上是把文件重复两次的发送到了每个sink里面,实现了实验要求

  • 相关阅读:
    C#趣味程序---车牌号推断
    使用 C# 开发智能手机软件:推箱子(十四)
    【Oracle错误集锦】:ORA-12154: TNS: 无法解析指定的连接标识符
    java中你确定用对单例了吗?
    linux tty设置详解
    tty linux 打开和设置范例
    C和C++之间库的互相调用
    Android 编译参数 LOCAL_MODULE_TAGS
    pthread_once 和 pthread_key
    Android系统root破解原理分析
  • 原文地址:https://www.cnblogs.com/QuestionsZhang/p/10417797.html
Copyright © 2011-2022 走看看