zoukankan      html  css  js  c++  java
  • flume拦截器

    拦截器主要分两种:ETL 拦截器、日志类型区分拦截器。
    ETL 拦截器主要用于过滤时间戳不合法和 Json 数据不完整的日志。
    日志类型区分拦截器主要用于将启动日志和事件日志区分开来,方便发往 Kafka 的不同 Topic。

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the custom Flume interceptors. Besides the regular jar,
         the package phase also produces a jar-with-dependencies artifact. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.ldy</groupId>
        <artifactId>flume-interceptor</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    
        <dependencies>
            <!-- Flume core API: supplies the Event, Context and Interceptor types
                 that the interceptor classes import. -->
            <!-- NOTE(review): the utility class also imports
                 org.apache.commons.lang.math.NumberUtils, which is not declared here;
                 presumably it arrives transitively through flume-ng-core. Confirm. -->
            <!-- NOTE(review): the jar is meant to be dropped into flume/lib, where
                 Flume's own classes already exist; consider scope "provided" so the
                 assembled jar does not bundle them. Confirm before changing. -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.7.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- Compile with Java 8 source and target levels. -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- Assemble one jar containing the project classes and their dependencies. -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
    
                    <executions>
                        <!-- Bind the "single" goal to the package phase so that a normal
                             package build also creates the assembled jar. -->
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    自定义ETL拦截器:

    package com.ldy.flume.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    
    public class LogETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            // 1 获取数据
            byte[] body = event.getBody();
            String log = new String(body, Charset.forName("UTF-8"));
    
            // 2 判断数据类型并向Header中赋值
         //log中有个en(event_name)字段表明该日志是不是启动日志类型
    if (log.contains("start")) { //3 校验数据 if (LogUtils.validateStart(log)){ return event; } }else { if (LogUtils.validateEvent(log)){ return event; } } // 4 返回校验结果 return null; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); if (intercept1 != null){ interceptors.add(intercept1); } } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogETLInterceptor(); } @Override public void configure(Context context) { } } }

    自定义Utils(ETL的清洗工具类):

    package com.ldy.flume.interceptor;
    import org.apache.commons.lang.math.NumberUtils;
    
    /**
     * Format checks used by the ETL interceptor to decide whether a raw log line
     * is well formed. The checks are purely structural: the JSON part is never
     * parsed, only tested for surrounding braces.
     */
    public class LogUtils {

        /** Expected number of digits in the server timestamp (epoch milliseconds). */
        private static final int SERVER_TIME_LENGTH = 13;

        /**
         * Checks the format of an event log line.
         *
         * <p>Expected layout: {@code serverTime|json}, for example
         * <pre>
         * 1613956146081|{"cm":{"mid":"1","uid":"1","t":"1613861466406"},
         *                "ap":"app",
         *                "et":[{"ett":"1613872348614","en":"newsdetail","kv":{"action":"2"}},
         *                      {"ett":"1613873812304","en":"loading","kv":{"action":"3"}}]}
         * </pre>
         *
         * @param log the raw log line; may be {@code null}
         * @return {@code true} when the line splits on {@code '|'} into exactly two
         *         parts, the first part is 13 digits and the second part, once
         *         trimmed, starts with <code>{</code> and ends with <code>}</code>;
         *         {@code false} otherwise, including for {@code null}
         */
        public static boolean validateEvent(String log) {
            if (log == null) {
                return false;
            }

            // 1. Split on '|'. The pipe is a regex metacharacter and must be escaped.
            String[] logContents = log.split("\\|");

            // 2. There must be exactly a timestamp part and a JSON part.
            if (logContents.length != 2) {
                return false;
            }

            // 3. Check the server timestamp.
            String serverTime = logContents[0];
            if (serverTime.length() != SERVER_TIME_LENGTH || !isDigits(serverTime)) {
                return false;
            }

            // 4. Check the JSON part.
            return looksLikeJsonObject(logContents[1]);
        }

        /**
         * Checks the format of a start log line, which is a bare JSON object such as
         * <pre>
         * {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2",
         *  "mid":"993","nw":"WIFI","os":"8.2.1","t":"1559551922019","uid":"993"}
         * </pre>
         *
         * @param log the raw log line; may be {@code null}
         * @return {@code true} when the trimmed line starts with <code>{</code> and
         *         ends with <code>}</code>; {@code false} otherwise, including for
         *         {@code null}
         */
        public static boolean validateStart(String log) {
            if (log == null) {
                return false;
            }

            return looksLikeJsonObject(log);
        }

        /** Returns whether the trimmed text is wrapped in a pair of curly braces. */
        private static boolean looksLikeJsonObject(String text) {
            String trimmed = text.trim();
            return trimmed.startsWith("{") && trimmed.endsWith("}");
        }

        /** Returns whether the text is non-empty and consists only of digit characters. */
        private static boolean isDigits(String text) {
            if (text.isEmpty()) {
                return false;
            }
            for (int i = 0; i < text.length(); i++) {
                if (!Character.isDigit(text.charAt(i))) {
                    return false;
                }
            }
            return true;
        }
    }

    自定义Type拦截器:

    package com.ldy.flume.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Interceptor that labels each event with its log type.
     *
     * <p>An event has two parts, headers and body. The body carries the log text;
     * a start log records its type in a dedicated field, {@code "en":"start"}.
     * This interceptor writes a {@code topic} header so that start logs and event
     * logs can later be sent to different topics.
     */
    public class LogTypeInterceptor implements Interceptor {

        /** Nothing needs to be set up. */
        @Override
        public void initialize() {
        }

        /**
         * Sets the {@code topic} header of one event.
         *
         * @param event the event to label
         * @return the same event, with {@code topic} set to {@code topic_start} when
         *         the body contains the text {@code start}, otherwise {@code topic_event}
         */
        @Override
        public Event intercept(Event event) {
            // Decode the body as UTF-8 text.
            String text = new String(event.getBody(), Charset.forName("UTF-8"));

            // Choose the topic from the log type and store it in the headers.
            String topic = text.contains("start") ? "topic_start" : "topic_event";
            Map<String, String> headerMap = event.getHeaders();
            headerMap.put("topic", topic);

            return event;
        }

        /**
         * Labels every event of a batch.
         *
         * @param events the events to label
         * @return a new list holding all input events, in order, each with its
         *         {@code topic} header set
         */
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> labelled = new ArrayList<>();
            for (Event current : events) {
                labelled.add(intercept(current));
            }
            return labelled;
        }

        /** Nothing needs to be released. */
        @Override
        public void close() {
        }

        /** Builder that Flume uses to create {@link LogTypeInterceptor} instances. */
        public static class Builder implements Interceptor.Builder {

            /** @return a new {@link LogTypeInterceptor} */
            @Override
            public Interceptor build() {
                return new LogTypeInterceptor();
            }

            /**
             * This interceptor has no configurable properties.
             *
             * @param context the interceptor configuration (unused)
             */
            @Override
            public void configure(Context context) {
            }
        }
    }

    最后打包放到flume/lib下

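    然后在 flume 的 job 配置文件中为 source 配置拦截器即可生效,type 需填写 Builder 内部类的全类名,例如 com.ldy.flume.interceptor.LogETLInterceptor$Builder 和 com.ldy.flume.interceptor.LogTypeInterceptor$Builder。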

  • 相关阅读:
    mysql ACID与四种隔离级别归纳总结
    python django查询12306火车票
    python json dumps与loads有可能犯的错误
    python出现UnicodeEncodeError有可能产生的另一个原因
    python安装新版本及pip
    Django添加防跨站请求伪造中间件
    python List的一些相关操作
    mysql中varbinary、binary、char、varchar异同
    (原创)如何使用boost.asio写一个简单的通信程序(一)
    (原创)用c++11实现简洁的ScopeGuard
  • 原文地址:https://www.cnblogs.com/ldy233/p/14435284.html
Copyright © 2011-2022 走看看