从端口读数据读取到本地文件
#1.给三个组件命名
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#2.给source组件属性赋值
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
#3.给channel组件属性赋值
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#4.给sink组件属性赋值
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/datas/fileroll
#5.让sources、sinks连接上对应的channels
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
从端口读数据读取到HDFS
a2 ==> a2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
复制和多路复用
可以将相同数据复制到多个channel中(flume默认),也可以将不同数据分发到不同channel中,sink可以选择传送到不同的目的地
监控一个文件,然后通过两个channel搭配两个sink吧内容写出到控制台.
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /tmp/atguigu/hive.log
a1.sources.r1.selector.type = replicating
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
故障转移
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障
1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作
2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink
- 遇到故障时,我们要立即修复
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 6666
#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
负载均衡
通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)
- 使用负载均衡以后,channel会轮训分配任务,减少机器负荷
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
聚合
聚合,指的是将多台日志服务器上的数据,汇总到一台日志服务器上,进行输出
a3 ==> a3.conf(hadoop104)
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 6666
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.type = logger
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
a2 ==> a2.conf(hadoop103)
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 4444
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 6666
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
a1 ==> a1.conf(hadoop102)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /tmp/atguigu/hive.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 6666
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
ChannelSelector案例
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating
- Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
- 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用
replicating类型例子如下
a1.sources = r1
a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
multiplexing类型的ChannelSelector例子如下
a1.sources = r1
a1.channels = c1 c2
a1.sources.source1.selector.type = multiplexing
a1.sources.source1.selector.header = title # 以header中的title对应的值作为条件
a1.sources.source1.selector.mapping.a = c2 # 如果header中title的值为a,使用c2这个channel
a1.sources.source1.selector.mapping.b = c1 # 如果header中title的值为FAIL,使用c1这个channel
a1.sources.source1.selector.default = c1 # 默认使用c1这个channel
SinkProcessor案例
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。
自定义Interceptor
使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
需求
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。
实现代码
1)创建maven项目,引入依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
2)Java代码
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
/**
* 1. 如何自定义拦截器?
* flume的自定义拦截器需要实现Flume提供的Interceptor接口.
*
* 实现抽象方法:
* initialize: 完成一些初始化工作.
* close: 完成一些善后的工作
* intercept:拦截器的核心处理方法. 拦截的逻辑.
* intercept(Event event) : 单个event的拦截处理
* intercept(List<Event> events): 批次event的拦截处理
*
* 2. 拦截器的对象如何实例化?
* 在拦截器中定义一个static的内部类,实现Flume提供的Builder接口
*
* 实现抽象方法:
* build : 用于构建拦截器对象
* configure:用于读取配置信息(xxxx.conf)
*
*
*
*/
public class LogDataInterceptor implements Interceptor {
@Override
public void initialize() {
}
/**
* 需求: 判断每个Event的body中是否包含"atguigu"
* 如果包含,给Event的header中添加一个kv: title = at
* 如果不包含,给Event的header中添加一个kv: title = ot
*/
@Override
public Event intercept(Event event) {
//1. 获取event的 header 和 body
Map<String, String> headers = event.getHeaders();
String body = new String(event.getBody()); // 编码问题
//2. 判断处理
if(body.contains("atguigu")){
headers.put("title","at");
}else{
headers.put("title","ot");
}
//3. 将处理好的event返回
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events ;
}
@Override
public void close() {
}
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new LogDataInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
3)将代码打成jar包
4)配置文件
1.进阶案例 - channel选择器 - 多路
a3 ==> a3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.type = logger
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
a2 ==> a2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.type =logger
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
a1 ==> a1.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666
#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = title
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c1里
a1.sources.r1.selector.mapping.at = c1
a1.sources.r1.selector.mapping.ot = c2
a1.sources.r1.selector.default=c1
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogDataInterceptor$MyBuilder
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
5)分别在三台机器上启动flume并测试
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
注:使用 jps-ml 查看Flume进程