zoukankan      html  css  js  c++  java
  • Flume自定义Interceptor

    有的时候我们想要对Source的数据做自定义的一些修改操作。

    我们主要是通过实现Interceptor 接口来达到我们的目的。

    第一步:
    我们先使用IDEA创建一个空的maven项目,然后添加一个flume依赖即可。

        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
        </dependencies>
    

    第二步:
    直接新建一个实现类LogFormat
    代码如下:

    import org.apache.commons.codec.binary.Base64;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    import java.util.Locale;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    /**
     * Description: TODO
     *
     * @Author: 留歌36
     * @Date: 2019-11-20 15:02
     */
    public class LogFormat implements Interceptor {
        private String date = null;
        private String year = "";
        private String month = "";
        private String day = "";
        private String hour = "";
    
    
        final Base64 base64 = new Base64();
    
        public void initialize() {
    
        }
    
        /** 获取信息头和信息体
         *
         *  主要是把原始日志中的 日期时间字段 和 其中的部分数据进行解密好
         *
         * */
        public Event intercept(Event event) {
            Map<String, String>  head = event.getHeaders();
            String body = new String(event.getBody());
    
    //        正则提取
            Pattern p = Pattern.compile("\[([^]]+)\]");
    
            Matcher matcher = p.matcher(body);
            while (matcher.find()){
                String _matcher = matcher.group(1);
                date = _matcher;
            }
            if (date != null){
                SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
    
                SimpleDateFormat sd = new SimpleDateFormat("dd");
    
                SimpleDateFormat sm = new SimpleDateFormat("MMM");
    
                SimpleDateFormat sy = new SimpleDateFormat("yyyy");
    
                SimpleDateFormat sh = new SimpleDateFormat("HH");
    
                try {
                    Date _date = sdf.parse(date);
    
                    year = sy.format(_date);
                    month = sm.format(_date);
                    date = sd.format(_date);
                    hour = sh.format(_date);
    
                } catch (ParseException e) {
                    e.printStackTrace();
                }
    
                String[] _body = body.split(" ");
                String[] _splits = _body[6].split("\?");
    
                String mes = "";
                if (_splits.length == 2){
                    mes = new String(base64.decode(_splits[1]));
                }
    
                event.setBody(mes.getBytes());
    
                head.put("yearStr",year);
                head.put("monthStr",month);
                head.put("dayStr",day);
                head.put("hourStr",hour);
                return event;
            }
    
    
            return null;
        }
    
        /** 每一条数据都要循环上面的方法 */
        public List<Event> intercept(List<Event> list) {
            for (Event event:list){
                intercept(event);
            }
            return list;
        }
    
        public void close() {
    
        }
    
    
    
    
        /**
         * 把这个类打包成一个单独的Jar,上传到Flume目录下的Lib文件夹中即可
         *
         *
         * */
        public static class Builder implements Interceptor.Builder{
    
            /** 返回我们上面这个类 */
            public Interceptor build() {
                return new LogFormat();
            }
    
            public void configure(Context context) {
    
            }
        }
    }
    
    

    第三步:
    打包直接上传到$FLUME_HOME/lib目录下

    第四步:
    编写自己的conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = LogFormat$Builder
    
    
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/nginxlog/access.log
    a1.sources.r1.shell = /bin/sh -c
    
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://192.168.1.200:8020/user/hive/warehouse/%{yearStr}/%{monthStr}/%{dayStr}
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.filePrefix = %{hourStr}
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    
    
    
    a1.channels.c1.type = memory
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动:

     flume-ng agent --name a1  --conf $FLUME_HOME/conf  --conf-file $FLUME_HOME/conf/test2.conf -Dflume.root.logger=INFO,console
    
  • 相关阅读:
    数据库原理 第七章 数据库设计和ER模型
    jeecgboot常见问题及处理方法-found character '@' that cannot start any token. (Do not use @ for indentation)
    jeecgboot积木报表(jimuReport)Oracle切换
    datart表结构
    这几天找工作的经历
    Jenkins 无法登陆解决方法
    Nginx 部署前后端分离项目(SpringBoot Vue)
    CentOS7 用yum方式安装Nginx
    Centos 7 安装 MYSQL 8.0
    Centos 7 安装 JDK1.8
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614697.html
Copyright © 2011-2022 走看看