zoukankan      html  css  js  c++  java
  • flume拦截器

    flume 拦截器(interceptor)
    1、flume拦截器介绍
    拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。
    2、flume内置的拦截器


    2.1 时间戳拦截器
    flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置:

    参数

    默认值

    描述

    type

    timestamp

    类型名称timestamp,也可以使用类名的全路径org.apache.flume.interceptor.TimestampInterceptor$Builder

    preserveExisting

    false

    如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值


    参数 默认值 描述
    type timestamp 类型名称timestamp,也可以使用类名的全路径org.apache.flume.interceptor.TimestampInterceptor$Builder
    preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值

    source连接到时间戳拦截器的配置:


    a1.sources.r1.interceptors=i1 
    a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false


    2.2 主机拦截器
    主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。事件报头中的key使用hostHeader配置,默认是host。主机拦截器的配置:

    参数

    默认值

    描述

    type

    host

    类型名称host,也可以使用类名的全路径org.apache.flume.interceptor.HostInterceptor$Builder

    hostHeader

    host

    事件头的key

    useIP

    true

    如果设置为false,host键插入主机名

    preserveExisting

    false

    如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值


    参数 默认值 描述
    type host 类型名称host,也可以使用类名的全路径org.apache.flume.interceptor.HostInterceptor$Builder
    hostHeader host 事件头的key
    useIP true 如果设置为false,host键插入主机名
    preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值

    source连接到主机拦截器的配置:


    a1.sources.r1.interceptors=i2
    a1.sources.r1.interceptors.i2.type=host
    a1.sources.r1.interceptors.i2.useIP=false
    a1.sources.r1.interceptors.i2.preserveExisting=false



    2.3 静态拦截器
    静态拦截器的作用是将k/v插入到事件的报头中。配置如下

    参数

    默认值

    描述

    type

    static

    类型名称static,也可以使用类全路径名称org.apache.flume.interceptor.StaticInterceptor$Builder

    key

    key

    事件头的key

    value

    value

    key对应的value值

    preserveExisting

    true

    如果设置为true,若事件中报头已经存在该key,不会替换value的值


    参数 默认值 描述
    type static 类型名称static,也可以使用类全路径名称org.apache.flume.interceptor.StaticInterceptor$Builder
    key key 事件头的key
    value value key对应的value值
    preserveExisting true 如果设置为true,若事件中报头已经存在该key,不会替换value的值

    source连接到静态拦截器的配置:


    a1.sources.r1.interceptors = static
    a1.sources.r1.interceptors.static.type=static
    a1.sources.r1.interceptors.static.key=logs
    a1.sources.r1.interceptors.static.value=logFlume
    a1.sources.r1.interceptors.static.preserveExisting=false


    2.4 正则过滤拦截器
    在日志采集的时候,可能有一些数据是我们不需要的,这样添加过滤拦截器,可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。配置如下

    参数

    默认值

    描述

    type

    REGEX_FILTER

    类型名称REGEX_FILTER,也可以使用类全路径名称org.apache.flume.interceptor.RegexFilteringInterceptor$Builder

    regex

    .*

    匹配除“ ”之外的任何个字符

    excludeEvents

    false

    默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的


    参数 默认值 描述
    type REGEX_FILTER 类型名称REGEX_FILTER,也可以使用类全路径名称org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
    regex .* 匹配除“ ”之外的任何个字符
    excludeEvents false 默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的

    source连接到正则过滤拦截器的配置:


    a1.sources.r1.interceptors=i4
    a1.sources.r1.interceptors.i4.type=REGEX_FILTER a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false



    这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。

    测试案例:
    test_regex.conf
    # 定义这个agent中各组件的名字


    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1


    # 描述和配置source组件:r1


    a1.sources.r1.type = netcat
    a1.sources.r1.bind = hadoop-001
    a1.sources.r1.port = 44444
    a1.sources.r1.interceptors=i4
    a1.sources.r1.interceptors.i4.type=REGEX_FILTER



    #保留内容中出现hadoop或者是spark的字符串的记录


    a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark)
    a1.sources.r1.interceptors.i4.excludeEvents=false


    # 描述和配置sink组件:k1


    a1.sinks.k1.type = logger


    # 描述和配置channel组件,此处使用是内存缓存的方式


    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100


    # 描述和配置source channel sink之间的连接关系


    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1



    输入以下指令:


    bin/flume-ng agent --conf conf --conf-file conf/test_regex.conf --name a1 -Dflume.root.logger=INFO,console



    发送数据测试:


    打印到控制台信息:

     

    只接受到存在hadoop或者spark的记录,验证成功!

  • 相关阅读:
    七天冲刺04
    软件工程概论项目——典型用户场景分析
    七天冲刺3
    七天冲刺2
    七天冲刺1
    第十三周总结
    软件工程概论个人作业02
    第二周学习进度
    软件工程个人作业01--四则运算
    连接数据库的javaee编译简易的WEB登陆界面
  • 原文地址:https://www.cnblogs.com/ninglinglong/p/11586985.html
Copyright © 2011-2022 走看看