There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor. DefaultSinkProcessor works with a single Sink, while LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group. LoadBalancingSinkProcessor provides load balancing across the sinks in the group, and FailoverSinkProcessor provides failover between them.
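1. Failover with FailoverSinkProcessor
Requirement: flume1 collects data from a port and sends it to flume2 or flume3. When one of flume2 or flume3 goes down, the events are sent to the other agent instead.
flume1 configuration: NetCat Source -> Memory Channel -> Avro Sink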
# Name the three components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Configure the NetCat Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the Memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Configure the Avro Sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Configure the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# The sink with the larger priority value is used first (here k2)
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum backoff for a failed sink, in milliseconds (the sink is not retried during this period)
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the three components together
a1.sources.r1.channels = c1
# Events from c1 go to k1 or k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2 configuration: Avro Source -> Memory Channel -> Logger Sink
# Name the three components
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Configure the Avro Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Configure the Memory Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Configure the Logger Sink
a2.sinks.k1.type = logger

# Bind the three components together
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3 configuration: Avro Source -> Memory Channel -> Logger Sink
# Name the three components
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Configure the Avro Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Configure the Memory Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Configure the Logger Sink
a3.sinks.k1.type = logger

# Bind the three components together
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Testing is omitted here.
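To make the effect of priority and maxpenalty in the flume1 sink group easier to picture, here is a minimal Python sketch of priority-based failover. It is a toy model and not Flume's actual implementation: the class FailoverModel, its method names, the 1000 ms base penalty, and the doubling rule are all assumptions made for illustration. Only the priorities (k1 = 5, k2 = 10) and the 10000 ms cap come from the configuration above.

```python
class FailoverModel:
    """Toy model of priority-based failover (hypothetical, not Flume code)."""

    def __init__(self, priorities, max_penalty_ms=10000, base_penalty_ms=1000):
        self.priorities = dict(priorities)      # sink name -> priority value
        self.max_penalty_ms = max_penalty_ms    # mirrors processor.maxpenalty
        self.base_penalty_ms = base_penalty_ms  # assumed starting penalty
        self.failures = {}                      # sink name -> consecutive failures
        self.retry_at = {}                      # sink name -> earliest retry time (ms)

    def pick(self, now_ms):
        """Return the highest-priority sink that is not currently penalised."""
        candidates = [s for s in self.priorities
                      if self.retry_at.get(s, 0) <= now_ms]
        if not candidates:
            return None
        return max(candidates, key=lambda s: self.priorities[s])

    def report_failure(self, sink, now_ms):
        """Penalise a sink; the penalty doubles per failure up to the cap."""
        n = self.failures.get(sink, 0) + 1
        self.failures[sink] = n
        penalty = min(self.max_penalty_ms, self.base_penalty_ms * 2 ** n)
        self.retry_at[sink] = now_ms + penalty

    def report_success(self, sink):
        """Clear the failure history of a sink that delivered successfully."""
        self.failures.pop(sink, None)
        self.retry_at.pop(sink, None)


model = FailoverModel({"k1": 5, "k2": 10})
first = model.pick(now_ms=0)           # k2 has the larger priority value
model.report_failure("k2", now_ms=0)   # simulate flume3 going down
fallback = model.pick(now_ms=1)        # k2 is penalised, so k1 takes over
print(first, fallback)                 # prints "k2 k1"
```

In this model, k2 becomes eligible again once its penalty expires, and repeated failures lengthen the penalty but never beyond max_penalty_ms.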
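2. Load balancing with LoadBalancingSinkProcessor
The configuration is almost identical to the FailoverSinkProcessor setup. Only the sink group section of flume1 needs to change, to the following: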
# Sink group with load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
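The selector and backoff settings can be pictured with a small Python sketch. Again this is a simplified model under stated assumptions rather than Flume's code: LoadBalanceModel, its methods, and the fixed 1000 ms penalty are hypothetical. Flume's own backoff grows exponentially, which this sketch does not reproduce.

```python
import random


class LoadBalanceModel:
    """Toy model of a load-balancing sink selector (hypothetical, not Flume code)."""

    def __init__(self, sinks, selector="random", backoff=True,
                 penalty_ms=1000, seed=None):
        self.sinks = list(sinks)
        self.selector = selector        # "random" or "round_robin"
        self.backoff = backoff          # mirrors processor.backoff
        self.penalty_ms = penalty_ms    # assumed fixed penalty
        self.blocked_until = {}         # sink name -> time (ms) it is usable again
        self.next_index = 0             # rotation counter for round_robin
        self.rng = random.Random(seed)

    def order(self, now_ms):
        """Return the sinks to try, in order, for one delivery attempt."""
        available = [s for s in self.sinks
                     if not self.backoff
                     or self.blocked_until.get(s, 0) <= now_ms]
        if not available:
            return []
        if self.selector == "random":
            self.rng.shuffle(available)  # shuffles the list in place
            return available
        # round_robin: rotate the starting point on every call
        start = self.next_index % len(available)
        self.next_index += 1
        return available[start:] + available[:start]

    def report_failure(self, sink, now_ms):
        """Temporarily exclude a failed sink when backoff is enabled."""
        if self.backoff:
            self.blocked_until[sink] = now_ms + self.penalty_ms


lb = LoadBalanceModel(["k1", "k2"], selector="random", seed=0)
lb.report_failure("k2", now_ms=0)   # simulate flume3 failing
print(lb.order(now_ms=1))           # prints "['k1']" while k2 is blocked
```

With backoff disabled, a failed sink stays in the candidate list and may be tried again on the very next attempt. With backoff enabled, it is skipped until its penalty has passed.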