zoukankan      html  css  js  c++  java
  • flume学习(四):Flume Interceptors的使用

     转载:http://blog.csdn.net/xiao_jun_0820/article/details/38111305

    对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

    官方上提供的已有的拦截器有:

    Timestamp Interceptor

    Host Interceptor

    Static Interceptor

    Regex Filtering Interceptor

    Regex Extractor Interceptor

    像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。

    Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到

    Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。

    Static Interceptor:可以在event的header中添加自定义的key和value。

    Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。

    Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

     
    下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:
    [java] view plain copy
     
    1. public class WriteLog {  
    2.     protected static final Log logger = LogFactory.getLog(WriteLog.class);  
    3.   
    4.     /** 
    5.      * @param args 
    6.      * @throws InterruptedException 
    7.      */  
    8.     public static void main(String[] args) throws InterruptedException {  
    9.         // TODO Auto-generated method stub  
    10.         while (true) {  
    11.             logger.info(new Date().getTime());  
    12.             logger.info("{"requestTime":"  
    13.                     + System.currentTimeMillis()  
    14.                     + ","requestParams":{"timestamp":1405499314238,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}");  
    15.             Thread.sleep(2000);  
    16.   
    17.         }  
    18.     }  
    19. }  
    又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。
     
    接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
    1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
    2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。
     
    修改后的flume.conf如下:
    [plain] view plain copy
     
    1. tier1.sources=source1  
    2. tier1.channels=channel1  
    3. tier1.sinks=sink1  
    4.   
    5. tier1.sources.source1.type=avro  
    6. tier1.sources.source1.bind=0.0.0.0  
    7. tier1.sources.source1.port=44444  
    8. tier1.sources.source1.channels=channel1  
    9.   
    10. tier1.sources.source1.interceptors=i1 i2  
    11. tier1.sources.source1.interceptors.i1.type=regex_filter  
    12. tier1.sources.source1.interceptors.i1.regex=\{.*\}  
    13. tier1.sources.source1.interceptors.i2.type=timestamp  
    14.   
    15. tier1.channels.channel1.type=memory  
    16. tier1.channels.channel1.capacity=10000  
    17. tier1.channels.channel1.transactionCapacity=1000  
    18. tier1.channels.channel1.keep-alive=30  
    19.   
    20. tier1.sinks.sink1.type=hdfs  
    21. tier1.sinks.sink1.channel=channel1  
    22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  
    23. tier1.sinks.sink1.hdfs.fileType=DataStream  
    24. tier1.sinks.sink1.hdfs.writeFormat=Text  
    25. tier1.sinks.sink1.hdfs.rollInterval=0  
    26. tier1.sinks.sink1.hdfs.rollSize=10240  
    27. tier1.sinks.sink1.hdfs.rollCount=0  
    28. tier1.sinks.sink1.hdfs.idleTimeout=60  

    我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\{.*\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
    i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。
     
    然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。

     多个日志文件flume如何统计?

    http://www.aboutyun.com/forum.php?mod=viewthread&tid=14530

  • 相关阅读:
    PPP协议 PAP认证
    交换机广播风暴,STP生成树协议,端口聚合
    传统远程注入线程,加载DLL
    EXE和DLL调用关系,DLL制作,钩子
    window bat批处理 实用脚本
    python利用scapy嗅探流量
    关于AWD线下攻防的经验
    APP 仿微信登录
    价值1500左右的毕业设计都开源啦
    EOS2.0环境搭建-centos7
  • 原文地址:https://www.cnblogs.com/0xcafedaddy/p/6861621.html
Copyright © 2011-2022 走看看