Sometimes we want to apply custom modifications to the data coming from a Source.
We do this mainly by implementing Flume's Interceptor interface.
Step 1:
Create an empty Maven project in IDEA and add the Flume dependency:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
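Note: the Flume binary distribution already ships flume-ng-core in its lib directory, so this dependency is only needed at compile time. If you build a fat jar, you can optionally add the following line inside the <dependency> element above to keep it out of the artifact:
            <scope>provided</scope>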
Step 2:
Create an implementation class named LogFormat.
The code is as follows:
import org.apache.commons.codec.binary.Base64;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Description: custom interceptor that pulls the timestamp of each log line into
 * the event headers and Base64-decodes the payload carried in the request URL.
 *
 * @Author: 留歌36
 * @Date: 2019-11-20 15:02
 */
public class LogFormat implements Interceptor {
    private String date = null;
    private String year = "";
    private String month = "";
    private String day = "";
    private String hour = "";
    final Base64 base64 = new Base64();

    public void initialize() {
    }

    /**
     * Work on the event headers and body:
     * extract the date/time field of the raw log line into the headers,
     * and decode the Base64-encoded part of the request into the new body.
     */
    public Event intercept(Event event) {
        Map<String, String> head = event.getHeaders();
        String body = new String(event.getBody());
        // Extract the "[dd/MMM/yyyy:HH:mm:ss Z]" timestamp with a regex
        Pattern p = Pattern.compile("\\[([^\\]]+)\\]");
        Matcher matcher = p.matcher(body);
        while (matcher.find()) {
            date = matcher.group(1);
        }
        if (date != null) {
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
            SimpleDateFormat sd = new SimpleDateFormat("dd");
            SimpleDateFormat sm = new SimpleDateFormat("MMM");
            SimpleDateFormat sy = new SimpleDateFormat("yyyy");
            SimpleDateFormat sh = new SimpleDateFormat("HH");
            try {
                Date _date = sdf.parse(date);
                year = sy.format(_date);
                month = sm.format(_date);
                day = sd.format(_date);
                hour = sh.format(_date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // The 7th space-separated field is the request URI; its query string is the Base64 payload
            String[] _body = body.split(" ");
            String[] _splits = _body[6].split("\\?");
            String mes = "";
            if (_splits.length == 2) {
                mes = new String(base64.decode(_splits[1]));
            }
            event.setBody(mes.getBytes());
            head.put("yearStr", year);
            head.put("monthStr", month);
            head.put("dayStr", day);
            head.put("hourStr", hour);
            return event;
        }
        // No timestamp found: drop the event
        return null;
    }

    /** Run the method above on every event; events it drops are filtered out of the batch */
    public List<Event> intercept(List<Event> list) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : list) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                out.add(intercepted);
            }
        }
        return out;
    }

    public void close() {
    }

    /**
     * Package this class as a standalone jar and upload it to the lib folder
     * under the Flume installation directory.
     */
    public static class Builder implements Interceptor.Builder {
        /** Return an instance of the class above */
        public Interceptor build() {
            return new LogFormat();
        }

        public void configure(Context context) {
        }
    }
}
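Before deploying, the interceptor can be sanity-checked with a small driver run from the same project. This is only a sketch: the class name LogFormatDemo and the sample log line (including its Base64 payload, which decodes to "hello, flume") are made up for illustration.
import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class LogFormatDemo {
    public static void main(String[] args) {
        // Hypothetical access-log line; the 7th space-separated field carries a
        // Base64 payload after the "?" ("aGVsbG8sIGZsdW1l" decodes to "hello, flume").
        String line = "192.168.1.100 - - [20/Nov/2019:15:02:00 +0800] "
                + "\"GET /log?aGVsbG8sIGZsdW1l HTTP/1.1\" 200 43";
        Event event = EventBuilder.withBody(line, StandardCharsets.UTF_8);

        Event out = new LogFormat().intercept(event);
        if (out != null) {
            // Prints the yearStr/monthStr/dayStr/hourStr headers and the decoded body
            System.out.println("headers: " + out.getHeaders());
            System.out.println("body   : " + new String(out.getBody(), StandardCharsets.UTF_8));
        }
    }
}
If the line parses, the printed headers contain yearStr, monthStr, dayStr and hourStr, and the body is the decoded payload.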
Step 3:
Package the project and upload the resulting jar to the $FLUME_HOME/lib directory.
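For example, assuming the Maven artifact is named flume-interceptor (adjust the jar name to whatever your pom actually produces):
mvn clean package
cp target/flume-interceptor-1.0-SNAPSHOT.jar $FLUME_HOME/lib/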
Step 4:
Write your own conf file (the startup command below assumes it is saved as $FLUME_HOME/conf/test2.conf):
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Interceptor: the type is the fully qualified name of the Builder class
# (LogFormat has no package here, so LogFormat$Builder is enough)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = LogFormat$Builder
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/nginxlog/access.log
a1.sources.r1.shell = /bin/sh -c
# a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.200:8020/user/hive/warehouse/%{yearStr}/%{monthStr}/%{dayStr}
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = %{hourStr}
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/test2.conf -Dflume.root.logger=INFO,console
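To verify the whole pipeline, append a line to the log the source is tailing and then look under the HDFS path configured above. The sample line and its Base64 payload are made up ("aGVsbG8sIGZsdW1l" decodes to "hello, flume"):
echo '192.168.1.100 - - [20/Nov/2019:15:02:00 +0800] "GET /log?aGVsbG8sIGZsdW1l HTTP/1.1" 200 43' >> /opt/nginxlog/access.log
hdfs dfs -ls -R /user/hive/warehouse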