zoukankan      html  css  js  c++  java
  • Flume配置Replicating Channel Selector

    1 官网内容

      上面的配置是r1获取到的内容会同时复制到c1 c2 c3 三个channel里面

    2 详细配置信息

      

    # Name the components on this agent
    	a1.sources = r1
    	a1.sinks = k1 k2
    	a1.channels = c1 c2
    	 
    	# Describe/configure the source
    	a1.sources.r1.type = exec
    	a1.sources.r1.command = tail -F /tmp/logs/cmcc.log
    	 
    	# Describe the sink
    	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k1.topic = cmcc1
    	a1.sinks.k1.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k1.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k1.batchSize = 20
    	 
    	 
    	a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    	a1.sinks.k2.topic = cmcc2
    	a1.sinks.k2.brokerList = hdp1:9092,hdp2:9092,hdp3:9092
    	a1.sinks.k2.requiredAcks = 1
    	a1.sinks.k2.batchSize = 20
    	 
    	 
    	# Use a channel which buffers events in memory
    	a1.channels.c1.type = memory
    	a1.channels.c1.capacity = 1000
    	a1.channels.c1.transactionCapacity = 100
    	 
    	a1.channels.c2.type = file
    	a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
    	a1.channels.c2.dataDirs = /tmp/flume/data
    	 
    	# Bind the source and sink to the channel
    	a1.sources.r1.channels = c1 c2
    	 
    	# set channel for sinks
    	a1.sinks.k1.channel = c1
    	a1.sinks.k2.channel = c2
    	 # #
    	a1.sources.r1.selector.type = replicating
    	 #
    	 
    	 
    

    3 查看消费情况

      

    topic cmcc1的消费情况
    	 	
    	 	kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc1 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
     
     
     
     
     
     topic cmcc2的消费情况
     
    	 /tmp/logs]#kafka-console-consumer.sh  --zookeeper hdp1:2181,hdp2:2181,hdp3:2181/kafka1.1.0  --topic cmcc2 --from-beginning
    	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

    4 查看tail的文件内容

    	hello
    	world
    	java
    	scala
    	hadoop
    	zhangjin
    	xxxx
    	yyyy
    	zhangjin
    	xxxx
    	yyyy
    	
    

      

      

    4 总结
    应该是启动了两次的原因,实际上是把文件重复两次的发送到了每个sink里面,实现了实验要求

  • 相关阅读:
    1.1、Go快速入坟系列之循环与分支
    1.0、Go快速入坟系列之变量、常量与算术运算符
    0.0、Go快速入坟系列配置与安装Go环境
    使用Docker部署.Net Core项目
    留言板
    List实体中不同字段值的转换
    Yum安装,Linux自带Python卸载 安装
    CentOS7系统配置国内yum源和epel源
    Centos7安装jdk1.8
    VUE下载文件,下载后台返回的response
  • 原文地址:https://www.cnblogs.com/QuestionsZhang/p/10417797.html
Copyright © 2011-2022 走看看