zoukankan      html  css  js  c++  java
  • flume自定义反序列化器deserializer

    需求背景:

      在利用flume进行日志收集的时候,错误信息会将堆栈多行打印,需要将多行信息合并成一行,包装成一个event进行传输。

    解决思路: 

      解决上述需求可以通过自定义拦截器和自定义反序列化器来实现。网上关于自定义拦截器的资料比较多,但考虑到拦截器的定位和使用场景,拦截器不应用于多个event拆分组合,并若flume有并发处理的话,不能保证读取event是顺序的。查阅资料发现,通过自定义flume的反序列化器更加合理和安全。

    实现步骤:

      1:新建一个类,实现 EventDeserializer 接口

      2: 重写 readEvent()方法或readEvents方法

      3: 修改flume的配置文件,将sources.deserializer属性设置为自定义类

    源码:

      1:自定义反序列化器 ---> MyLineDeserializer

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     * <p>
     * http://www.apache.org/licenses/LICENSE-2.0
     * <p>
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.xxx.flume.serializer;
    
    import com.google.common.collect.Lists;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.annotations.InterfaceAudience;
    import org.apache.flume.annotations.InterfaceStability;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.serialization.EventDeserializer;
    import org.apache.flume.serialization.ResettableInputStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.List;
    
    /**
     * A deserializer that parses text lines from a file.
     */
    @InterfaceAudience.Private
    @InterfaceStability.Evolving
    public class MyLineDeserializer implements EventDeserializer {
    
        private static final Logger logger = LoggerFactory.getLogger
                (MyLineDeserializer.class);
    
        private final ResettableInputStream in;
        private final Charset outputCharset;
        private final int maxLineLength;
        private volatile boolean isOpen;
    
        public static final String OUT_CHARSET_KEY = "outputCharset";
        public static final String CHARSET_DFLT = "UTF-8";
    
        public static final String MAXLINE_KEY = "maxLineLength";
        public static final int MAXLINE_DFLT = 2048;
        private StringBuffer eventStringBuffer = new StringBuffer();
    
        MyLineDeserializer(Context context, ResettableInputStream in) {
            this.in = in;
            this.outputCharset = Charset.forName(
                    context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
            this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
            this.isOpen = true;
        }
    
    
        /**
         * Reads a line from a file and returns an event
         *
         * @return Event containing parsed line
         * @throws IOException
         */
        @Override
        public Event readEvent() throws IOException {
            ensureOpen();
            String line = readLine();
            Event event = null;
    
            while (line != null) {
                //  start with 20 is one timestamp , event end
                if (line.trim().startsWith("20")) {
                    event = EventBuilder.withBody(eventStringBuffer.toString(), outputCharset);
                    eventStringBuffer.delete(0, eventStringBuffer.length());
                }
                //  add current line push to buffer
                if (line.trim().length() > 0) {
                    if (eventStringBuffer.length() > 0) {
                        eventStringBuffer.append(System.lineSeparator()).append(line);
                    } else {
                        eventStringBuffer.append(line);
                    }
                }
                if (line.trim().startsWith("20")) {
                    break;
                }
                line = readLine();
            }
    
            if (line == null && eventStringBuffer.toString().length() > 0 ){
                event =  EventBuilder.withBody(eventStringBuffer.toString(), outputCharset);
                eventStringBuffer.delete(0, eventStringBuffer.length());
                return event;
            }
            return event;
        }
    
        /**
         * Batch line read
         *
         * @param numEvents Maximum number of events to return.
         * @return List of events containing read lines
         * @throws IOException
         */
        @Override
        public List<Event> readEvents(int numEvents) throws IOException {
            ensureOpen();
            List<Event> events = Lists.newLinkedList();
            for (int i = 0; i < numEvents; i++) {
                Event event = readEvent();
                if (event != null) {
                    events.add(event);
                } else {
                    break;
                }
            }
            return events;
        }
    
        @Override
        public void mark() throws IOException {
            ensureOpen();
            in.mark();
        }
    
        @Override
        public void reset() throws IOException {
            ensureOpen();
            in.reset();
        }
    
        @Override
        public void close() throws IOException {
            if (isOpen) {
                reset();
                in.close();
                isOpen = false;
            }
        }
    
        private void ensureOpen() {
            if (!isOpen) {
                throw new IllegalStateException("Serializer has been closed");
            }
        }
    
        // TODO: consider not returning a final character that is a high surrogate
        // when truncating
        private String readLine() throws IOException {
            StringBuilder sb = new StringBuilder();
            int c;
            int readChars = 0;
            while ((c = in.readChar()) != -1) {
                readChars++;
    
                // FIXME: support 
    
                if (c == '
    ') {
                    break;
                }
    
                sb.append((char) c);
    
                if (readChars >= maxLineLength) {
                    logger.warn("Line length exceeds max ({}), truncating line!",
                            maxLineLength);
                    break;
                }
            }
    
            if (readChars > 0) {
                return sb.toString();
            } else {
                return null;
            }
        }
    
        public static class Builder implements EventDeserializer.Builder {
    
            @Override
            public MyLineDeserializer build(Context context, ResettableInputStream in) {
                return new MyLineDeserializer(context, in);
            }
    
        }
    
    }

      2: flume 配置文件

    a1.sources.r1.deserializer =  com.xxx.flume.serializer.MyLineDeserializer$Builder
  • 相关阅读:
    [微软官方]SQLSERVER的兼容级别
    使用 OPENJSON 分析和转换 JSON 数据 (SQL Server)
    WPF 解决TreeViewItem上为IsMouseOver 时 父级Item也会 受影响
    依赖注入
    关于编译告警 C4819 的完整解决方案
    你想知道的 std::vector::push_back 和 std::vector::emplace_back
    如何使用 Dump 文件?
    关于 PDB 文件你需要知道什么?
    图解哈希表及其原理
    C++ 中的虚函数表及虚函数执行原理
  • 原文地址:https://www.cnblogs.com/yuwenhui/p/9367625.html
Copyright © 2011-2022 走看看