zoukankan      html  css  js  c++  java
  • Flume ChannelSelector (包括自定义flume拦截器)

    ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。
     
    一 Replicating

    需求:flume1采集文件数据,通过flume2存储到hdfs,通过flume3存储到本地文件

    flume1配置:Taildir Source -> Memory Channel -> Avro Sink

    # 给三大组件取名
    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 k2
    
    # 配置 Taildir Source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/data/hive.log
    a1.sources.r1.positionFile = /opt/module/flume/position/position1.json
    # 将数据流复制给所有 channel(默认replicating)
    a1.sources.r1.selector.type = replicating
    
    # 配置 Memory Channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    # 配置 Avro Sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    
    # 配置三大组件的绑定关系
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

    flume2配置:Avro Source -> Memory Channel -> HDFS Sink

    # 给三大组件取名
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    
    # 配置 Avro Source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    
    # 配置 Memory Channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # 配置 HDFS Sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/group1/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k1.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a2.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k1.hdfs.batchSize = 1000
    #设置文件类型,可支持压缩
    a2.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k1.hdfs.rollInterval = 30
    #设置每个文件的滚动大小
    a2.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a2.sinks.k1.hdfs.rollCount = 0
    
    # 配置三大组件的绑定关系
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

    flume3配置:Avro Source -> Memory Channel -> File Roll Sink

    # 给三大组件取名
    a3.sources = r1
    a3.channels = c1 
    a3.sinks = k1 
    
    # 配置 Avro Source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    
    # 配置 Memory Channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # 配置 File Roll Sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/datas/group1
    
    # 配置三大组件的绑定关系
    a3.sources.r1.channels = c1 
    a3.sinks.k1.channel = c1

    启动顺序:先启动下游flume2和flume3,然后启动flume1,flume2,3充当了avro的服务端。

    bin/flume-ng agent -c conf/ -f job/group1/flume2.conf -n a2
    bin/flume-ng agent -c conf/ -f job/group1/flume3.conf -n a3
    bin/flume-ng agent -c conf/ -f job/group1/flume1.conf -n a1

    二 Multiplexing

    在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,而不同的Channel就可以对应发往不同的Sink。所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

    案例:flume2采集端口数据,如果数据包含"hello"则发往flume3,否则发往flume4

    配置flume2.conf:NetCat Source -> Memory Channel(c1),Memory Channel(c1)(c2) -> Avro Sink(k1),Avro Sink(k2)

    默认 a1.sources.r1.selector.default = c4
    ############## name ##############
    a2.sources = r1
    a2.channels = c1 c2
    a2.sinks = k1 k2
    
    ############## source ##############
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = localhost
    a2.sources.r1.port = 44444
    
    ############## channel selector ##############
    a2.sources.r1.selector.type = multiplexing
    #event的header里的键为type
    a2.sources.r1.selector.header = type
    #如果对应键的值为hello则发往c1
    a2.sources.r1.selector.mapping.hello = c1
    #如果对应键的值为no_hello则发往c2
    a2.sources.r1.selector.mapping.no_hello = c2
    
    ############## interceptors ##############
    a2.sources.r1.interceptors = i1
    a2.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
    
    ############## channel ##############
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    ############## sink ##############
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop103
    a2.sinks.k1.port = 4142
    
    a2.sinks.k2.type = avro
    a2.sinks.k2.hostname = hadoop104
    a2.sinks.k2.port = 4142
    
    ############## bind ##############
    a2.sources.r1.channels = c1 c2
    a2.sinks.k1.channel = c1
    a2.sinks.k2.channel = c2

    配置flume3.conf:Avro Source -> Memory Channel -> Logger Sink

    ############## name ##############
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1
    
    ############## source ##############
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop103
    a3.sources.r1.port = 4142
    
    ############## channel ##############
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    ############## sink ##############
    a3.sinks.k1.type = logger
    
    ############## bind ##############
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1

    配置flume4.conf:Avro Source -> Memory Channel -> Logger Sink

    ############## name ##############
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    ############## source ##############
    a4.sources.r1.type = avro
    a4.sources.r1.bind = hadoop104
    a4.sources.r1.port = 4142
    
    ############## channel ##############
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 1000
    a4.channels.c1.transactionCapacity = 100
    
    ############## sink ##############
    a4.sinks.k1.type = logger
    
    ############## bind ##############
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1

    拦截器代码,将其打包,放在flume的lib目录下

    public class TypeInterceptor implements Interceptor {
    
        //声明一个存放事件的集合
        private List<Event> eventList;
    
        @Override
        public void initialize() {
            eventList = new ArrayList<>();
        }
    
        //单个事件拦截
        @Override
        public Event intercept(Event event) {
    
            //获取事件中的头信息
            Map<String, String> headers = event.getHeaders();
    
            //获取事件中的body信息
            String body = new String(event.getBody());
    
            //根据body中是否有"hello"来决定添加怎样的头信息
            if (body.contains("hello")){
                headers.put("type","hello");
            }else {
                headers.put("type","no_hello");
            }
    
    
            return event;
        }
    
        //批量事件拦截
        @Override
        public List<Event> intercept(List<Event> list) {
    
            //清空集合
            eventList.clear();
    
            //遍历所有的event
            for (Event event : list) {
                //给每一个事件添加头信息
                eventList.add(intercept(event));
            }
    
            return eventList;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
  • 相关阅读:
    bzoj 5455
    hdu 6705
    hdu 6706
    斜率优化
    bzoj3672
    bzoj1367
    bzoj2118
    bzoj2337
    Codeforces 1077D Cutting Out(二分答案)
    Codeforces 1079C Playing Piano(记忆化搜索)
  • 原文地址:https://www.cnblogs.com/noyouth/p/13066295.html
Copyright © 2011-2022 走看看