The job of a ChannelSelector is to decide which Channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing. A ReplicatingSelector sends each Event to every configured Channel, while a Multiplexing selector routes different Events to different Channels according to a configured rule.
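As a minimal sketch of both styles (the agent/channel names and header values here are placeholders taken from the Flume user guide, not from the case studies below), the selector is declared on the source:

# Replicating: every event goes to all listed channels (this is the default selector)
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Multiplexing (alternative): route by the value of a header key
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1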
1. Replicating
Requirement: flume1 collects data from a file; the data is stored to HDFS via flume2 and to the local file system via flume3.
flume1 configuration: Taildir Source -> Memory Channels (c1, c2) -> Avro Sinks (k1, k2)
# Name the three components
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Configure the Taildir Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/hive.log
a1.sources.r1.positionFile = /opt/module/flume/position/position1.json

# Replicate the data flow to all channels (replicating is the default)
a1.sources.r1.selector.type = replicating

# Configure the Memory Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Configure the Avro Sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Bind the three components together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2 configuration: Avro Source -> Memory Channel -> HDFS Sink
# Name the three components
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Configure the Avro Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Configure the Memory Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Configure the HDFS Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/group1/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for folder rolling
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 30
# Roll size for each file (bytes)
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Bind the three components together
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3 configuration: Avro Source -> Memory Channel -> File Roll Sink
# Name the three components
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Configure the Avro Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Configure the Memory Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Configure the File Roll Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/group1

# Bind the three components together
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
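One caveat before starting the agents: the File Roll Sink may not create its output directory by itself, so creating it up front (path taken from the a3 config above) avoids a failed startup:

mkdir -p /opt/module/datas/group1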
Startup order: start the downstream agents flume2 and flume3 first, then flume1, since flume2 and flume3 act as the Avro server side.
bin/flume-ng agent -c conf/ -f job/group1/flume2.conf -n a2
bin/flume-ng agent -c conf/ -f job/group1/flume3.conf -n a3
bin/flume-ng agent -c conf/ -f job/group1/flume1.conf -n a1
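To verify the flow, a simple smoke test (hypothetical, assuming the paths from the configs above):

# Append a test line to the file tailed by flume1
echo "hello replicating" >> /opt/module/data/hive.log
# The same event should appear both under /group1/... on HDFS and under /opt/module/datas/group1 locally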
2. Multiplexing
In real-world development, a single server may produce many types of logs, and different log types may need to go to different analysis systems. This is where the Multiplexing structure in Flume's topology comes in. Its principle is to route each event to a Channel based on the value of some key in the event's Header, and each Channel can in turn feed a different Sink. To make this work, we need a custom Interceptor that assigns different values to that Header key for different types of events.
Case study: flume2 collects data from a network port; events whose body contains "hello" are sent to flume3, everything else to flume4.
flume2.conf configuration: NetCat Source -> Memory Channels (c1, c2) -> Avro Sinks (k1, k2)
Events whose header value matches none of the mappings can optionally be routed to a default channel, e.g. a1.sources.r1.selector.default = c4.
############## name ##############
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2

############## source ##############
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

############## channel selector ##############
a2.sources.r1.selector.type = multiplexing
# The event header key to inspect is "type"
a2.sources.r1.selector.header = type
# Events whose value for that key is hello go to c1
a2.sources.r1.selector.mapping.hello = c1
# Events whose value for that key is no_hello go to c2
a2.sources.r1.selector.mapping.no_hello = c2

############## interceptors ##############
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder

############## channel ##############
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

############## sink ##############
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4142
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop104
a2.sinks.k2.port = 4142

############## bind ##############
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
flume3.conf configuration: Avro Source -> Memory Channel -> Logger Sink
############## name ##############
a3.sources = r1
a3.channels = c1
a3.sinks = k1

############## source ##############
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4142

############## channel ##############
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

############## sink ##############
a3.sinks.k1.type = logger

############## bind ##############
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume4.conf configuration: Avro Source -> Memory Channel -> Logger Sink
############## name ##############
a4.sources = r1
a4.channels = c1
a4.sinks = k1

############## source ##############
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4142

############## channel ##############
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

############## sink ##############
a4.sinks.k1.type = logger

############## bind ##############
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Interceptor code. Compile it against the org.apache.flume:flume-ng-core artifact, package it as a jar, and place the jar in Flume's lib directory.
package com.atguigu.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    // A reusable list that holds the intercepted events
    private List<Event> eventList;

    @Override
    public void initialize() {
        eventList = new ArrayList<>();
    }

    // Intercept a single event
    @Override
    public Event intercept(Event event) {
        // Get the event's header information
        Map<String, String> headers = event.getHeaders();
        // Get the event's body
        String body = new String(event.getBody());
        // Decide which header value to set based on whether the body contains "hello"
        if (body.contains("hello")) {
            headers.put("type", "hello");
        } else {
            headers.put("type", "no_hello");
        }
        return event;
    }

    // Intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        // Clear the list
        eventList.clear();
        // Add the header to every event in the batch
        for (Event event : list) {
            eventList.add(intercept(event));
        }
        return eventList;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
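Startup mirrors the Replicating case: the Avro servers a3 (on hadoop103) and a4 (on hadoop104) come up first, then a2. The job file paths below are assumptions following the group1 naming convention above; each command runs on that agent's own host:

bin/flume-ng agent -c conf/ -f job/group2/flume3.conf -n a3
bin/flume-ng agent -c conf/ -f job/group2/flume4.conf -n a4
bin/flume-ng agent -c conf/ -f job/group2/flume2.conf -n a2

# Send test data to the NetCat source on the a2 host
nc localhost 44444

Lines containing "hello" should then show up in a3's logger output, and all other lines in a4's.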