# Test: channel selector (replicating).
# Method: switch the channels to Kafka channels and use two Kafka consumers
# to verify how the selector distributes events.
a1.sources = r1
a1.sinks = k1
# Only channels that are actually configured below are declared here.
# (c3 used to be listed without any definition; see the optional-channel note.)
a1.channels = c1 c2

# --- Source -----------------------------------------------------------------
# For each one of the sources, the type is defined
#agent.sources.seqGenSrc.type = seq
#a1.sources.r1.type = netcat
#a1.sources.r1.bind=mini1
#a1.sources.r1.port=44444
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/flume/test/logs/flume2.dat

# Replicating selector: the source is bound to both c1 and c2.
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# To try an optional channel, first define c3 and add it to a1.channels and
# a1.sources.r1.channels, then enable the line below.
#a1.sources.r1.selector.optional = c3

# --- Channels ---------------------------------------------------------------
# The channel can be defined as follows.
#agent.sources.seqGenSrc.channels = memoryChannel
#a1.channels.c1.type=memory
#a1.channels.c1.capacity=1000
#a1.channels.c1.transactionCapacity =100

# channel selector replicating: c1 -> topic csr1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = mini1:9092,mini2:9092,mini3:9092
a1.channels.c1.kafka.topic = csr1
a1.channels.c1.kafka.consumer.group.id = csr01

# channel selector replicating: c2 -> topic csr2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = mini1:9092,mini2:9092,mini3:9092
a1.channels.c2.kafka.topic = csr2
a1.channels.c2.kafka.consumer.group.id = csr02

# --- Sink -------------------------------------------------------------------
# Each sink's type must be defined
#agent.sinks.loggerSink.type = logger
a1.sinks.k1.type = logger

# Specify the channel the sink should use
#agent.sinks.loggerSink.channel = memoryChannel
a1.sinks.k1.channel = c1

# Each channel's type is defined.
#agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#agent.channels.memoryChannel.capacity = 100
kafka 消费程序
/**
 * Kafka consumer used to check the replicating channel selector: it subscribes
 * to both channel topics ("csr2" and "csr1") and prints every record it polls.
 *
 * <p>Consumer settings are read from {@code /kfkConsumer.properties} on the
 * classpath. The loop runs until the process is killed or a call throws.
 *
 * @param args unused
 * @throws IOException if the properties resource is missing or cannot be read
 */
public static void main(String[] args) throws IOException {
    Properties props = new Properties();

    // Resolve the resource against TestConsumer itself. The previous
    // TestConsumer.class.getClass() evaluated to java.lang.Class, so the lookup
    // went through the bootstrap/system loader instead of this class's loader.
    try (java.io.InputStream in = TestConsumer.class.getResourceAsStream("/kfkConsumer.properties")) {
        if (in == null) {
            // Fail with a clear message instead of an NPE inside Properties.load.
            throw new java.io.FileNotFoundException("kfkConsumer.properties not found on the classpath");
        }
        props.load(in);
    }

    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList("csr2", "csr1"));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.print("Thread : " + Thread.currentThread().getName());
                System.out.printf("topic = %s, offset = %d, key = %s, value = %s, partition = %d %n",
                        record.topic(), record.offset(), record.key(), record.value(), record.partition());
            }
            // Commit the offsets of the batch that was just printed.
            consumer.commitSync();
        }
    } finally {
        // Reached only when poll/commit throws; release the consumer's resources.
        consumer.close();
    }
}
消费结果
Thread : maintopic = csr1, offset = 3, key = null, value = from haishang, partition = 0 Thread : maintopic = csr2, offset = 4, key = null, value = from haishang, partition = 1
结论,flume channel selector 使用 replicating 策略时 会把消息发送给所有的配置的可以用的channel
第二种验证方法:此时要启动三个节点(agent),注意其中 sources、sinks 的名字
第一个flume中
#channelSelector_replicationg_avro.conf
# Agent a1: one syslog TCP source replicated into two memory channels,
# each drained by its own avro sink.
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Channels: in-memory event buffers
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Source: syslogtcp with a replicating selector over c1 and c2
a1.sources.r1.type = syslogtcp
#a1.sources.r1.host = 192.168.233.128
a1.sources.r1.host = 192.168.10.201
a1.sources.r1.port = 50000
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Sink k1: avro, reads c1
a1.sinks.k1.type = avro
#a1.sinks.k1.hostname = 192.168.233.129
a1.sinks.k1.hostname = 192.168.10.202
a1.sinks.k1.port = 50000
a1.sinks.k1.channel = c1

# Sink k2: avro, reads c2
a1.sinks.k2.type = avro
#a1.sinks.k2.hostname = 192.168.233.130
a1.sinks.k2.hostname = 192.168.10.203
a1.sinks.k2.port = 50000
a1.sinks.k2.channel = c2
sink
#channelSelector_replicating_sink.conf
# Agent a2: avro source -> memory channel -> logger sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Channel: in-memory event buffer
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Source: avro listener
a2.sources.r1.type = avro
#a2.sources.r1.bind = 192.168.233.129
a2.sources.r1.bind = 192.168.10.202
a2.sources.r1.port = 50000
a2.sources.r1.channels = c1

# Sink: log every event taken from c1
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
sink
#channelSelector_replicating_sink.conf
# Agent a3: avro source -> memory channel -> logger sink
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Channel: in-memory event buffer
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Source: avro listener
a3.sources.r1.type = avro
#a3.sources.r1.bind = 192.168.233.130
a3.sources.r1.bind = 192.168.10.203
a3.sources.r1.port = 50000
a3.sources.r1.channels = c1

# Sink: log every event taken from c1
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
~
启动命令
启动sink
bin/flume-ng agent -c conf -f conf/channelSelector_replicating_sink.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/channelSelector_replicating_sink.conf -n a2 -Dflume.root.logger=INFO,console
启动source
flume-ng agent -c conf -f conf/channelSelector_replicationg_avro.conf -n a1 -Dflume.root.logger=INFO,console
发送消息 :echo "you are the best "| nc 192.168.10.201 50000
验证multiplexing
source
# Configuration file: agent a1, HTTP source with a multiplexing channel selector
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 50000
# The HTTP source takes its listen address from "bind"; the "host" key used
# previously is not an HTTP source property.
#a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.bind = mini1
a1.sources.r1.channels = c1 c2

# Route on the value of the "state" event header:
#   CZ -> c1, US -> c2, anything else (or no header) -> default channel c1
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
#a1.sinks.k1.hostname = 192.168.233.129
a1.sinks.k1.hostname = mini2
a1.sinks.k1.port = 50000

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
#a1.sinks.k2.hostname = 192.168.233.130
a1.sinks.k2.hostname = mini3
a1.sinks.k2.port = 50000

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
sink1
# Agent a2: avro source -> memory channel -> logger sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Channel: in-memory event buffer
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Source: avro listener
a2.sources.r1.type = avro
#a2.sources.r1.bind = 192.168.233.129
a2.sources.r1.bind = mini2
a2.sources.r1.port = 50000
a2.sources.r1.channels = c1

# Sink: log every event taken from c1
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
sink2
# Agent a3: avro source -> memory channel -> logger sink
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Channel: in-memory event buffer
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Source: avro listener
# (previous IP-based bind, kept for reference; the old note was mistyped as "32.")
#a3.sources.r1.bind = 192.168.233.129
a3.sources.r1.bind = mini3
a3.sources.r1.port = 50000
a3.sources.r1.channels = c1

# Sink: log every event taken from c1
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
启动命令(先启动两个sink,最后启动source)
bin/flume-ng agent -c conf -f conf/channelSelector_mul_sink.conf -n a3 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/channelSelector_mul_sink.conf -n a2 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/channelSelector_multi.conf -n a1 -Dflume.root.logger=INFO,console
由以上命令可以推断出各配置文件的名字
执行命令
curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"CZ"}]' http://mini1:50000
curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"US"}]' http://mini1:50000
curl -X POST -d '[{"headers" :{"state" : "NO"},"body" :"no"}]' http://mini1:50000
结果
CZ的消息会发送到sink1节点上
US的消息会发送到sink2节点上,
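//NO 的消息会发送到sink1节点上
//其中CZ和US是在上面source节点的 selector.mapping 中配置的,NO没有配置
//NO的消息之所以一直发送到sink1,是因为source中配置了 a1.sources.r1.selector.default = c1:当 header 的值没有匹配到任何 mapping 时,事件会进入默认 channel c1,而 c1 由 k1 发往 mini2(即sink1)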
上面的source中用到了两个新的类型:syslogtcp(监听TCP端口作为数据源)和 http(通过HTTP POST请求接收事件作为数据源)