zoukankan      html  css  js  c++  java
  • flume拦截器

    拦截器作用:拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。

    flume修改时间戳的插件见 https://github.com/haebin/flume-timestamp-interceptor

    有一个缺陷是,DateUtils.parseDate(timestamp, dateFormat)里面的dateFormat不支持unix时间戳,只能自己手动添加了

    原来是:

    1. String timestamp = get(index, data);
    2. now = DateUtils.parseDate(timestamp, dateFormat).getTime();
    3. headers.put(TIMESTAMP, Long.toString(now));

    修改后

    1. String timestamp = get(index, data);
    2. if (dateFormat[0].equals("tsecond")){
    3. now = Long.parseLong(timestamp)*1000;
    4. }
    5. else if(dateFormat[0].equals("tmillisecond")){
    6. now = Long.parseLong(timestamp);
    7. }
    8. else if(dateFormat[0].equals("tnanosecond")){
    9. now = Long.parseLong(timestamp)/1000000;
    10. }
    11. else {
    12. now = DateUtils.parseDate(timestamp, dateFormat).getTime();
    13. }
    14. headers.put(TIMESTAMP, Long.toString(now));

    flume配置:

    1. kafka_sn_hive.sources.s1.interceptors = timestamp
    2. kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
    3. kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
    4. kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
    5. kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
    6. kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond

    表示按逗号做分隔符的第四个(从0开始)字段是一个秒单位的时间戳。

    在flume里面,时间戳是毫秒级别的,所以要判断这个字段是秒还是毫秒纳秒

    见http://lisux.me/lishuai/?p=867

  • 相关阅读:
    [置顶] location.href你真的会用了?
    Hive HA使用说明
    十进制转换为任意进制及操作符重载
    《重构》学习总结
    linux kill进程和子进程小trick
    express for node 路由route几种实现方式的思考
    BEGINNING SHAREPOINT® 2013 DEVELOPMENT 第2章节--SharePoint 2013 App 模型概览 总结
    Can rename table but can not truncate table
    HBase
    链表解说和基本操作练习附代码
  • 原文地址:https://www.cnblogs.com/doosmile/p/6284024.html
Copyright © 2011-2022 走看看