拦截器作用:拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。
flume修改时间戳的插件见 https://github.com/haebin/flume-timestamp-interceptor
有一个缺陷是,DateUtils.parseDate(timestamp, dateFormat)里面的dateFormat不支持unix时间戳,只能自己手动添加了
原来是:
- String timestamp = get(index, data);
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- headers.put(TIMESTAMP, Long.toString(now));
修改后
- String timestamp = get(index, data);
- if (dateFormat[0].equals("tsecond")){
- now = Long.parseLong(timestamp)*1000;
- }
- else if(dateFormat[0].equals("tmillisecond")){
- now = Long.parseLong(timestamp);
- }
- else if(dateFormat[0].equals("tnanosecond")){
- now = Long.parseLong(timestamp)/1000000;
- }
- else {
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- }
- headers.put(TIMESTAMP, Long.toString(now));
flume配置:
- kafka_sn_hive.sources.s1.interceptors = timestamp
- kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
- kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
- kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond
表示按逗号做分隔符的第四个(从0开始)字段是一个秒单位的时间戳。
在flume里面,时间戳是毫秒级别的,所以要判断这个字段是秒还是毫秒纳秒
见http://lisux.me/lishuai/?p=867