zoukankan      html  css  js  c++  java
  • 日志系统之扩展Flume-LineDeserializer

    本人博客文章如未特别注明皆为原创。如有转载请注明出处:http://blog.csdn.net/yanghua_kobe/article/details/46595401

    继续闲聊日志系统,在之前的博文里已提到我们在日志收集上的选择是flume-ng。应用程序将日志打到各自的日志文件或指定的目录(日志文件按天滚动),然后利用flume的agent去日志文件里收集。

    Deserializer简单介绍

    flume将一条日志抽象成一个event。

    这里我们从日志文件里收集日志採用的是定制版的SpoolDirectorySource(我们对当日日志文件追加写入收集提供了支持)。

    从日志源中将每条日志转换成event须要Deserializer(反序列化器)。

    flume的每个source相应的deserializer必须实现接口EventDeserializer,该接口定义了readEvent/readEvents方法从各种日志源读取Event。

    flume主要支持两种反序列化器:

    (1)AvroEventDeserializer:解析Avro容器文件的反序列化器。对Avro文件的每条记录生成一个flume Event,并将基于avro编码的二进制记录存入event body中。

    (2)LineDeserializer:它是基于日志文件的反序列化器。以“ ”行结束符将每行区分为一条日志记录。

    LineDeserializer的缺陷

    大部分情况下SpoolDictionarySource配合LineDeserializer工作起来都没问题。

    但当日志记录本身被切割成多行时。比方异常日志的堆栈或日志中包括“ ”换行符时,问题就来了:原先的按行界定日志记录的方式不能满足这样的要求。形如这样的格式:

    [2015-06-22 13:14:28,780] [ERROR] [sysName] [subSys or component] [Thread-9] [com.messagebus.client.handler.common.CommonLoopHandler] -*- stacktrace -*- : com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    	at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:203)
    	at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:220)
    	at com.messagebus.client.handler.common.CommonLoopHandler.handle(CommonLoopHandler.java:34)
    	at com.messagebus.client.handler.consume.ConsumerDispatchHandler.handle(ConsumerDispatchHandler.java:17)
    	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
    	at com.messagebus.client.handler.consume.RealConsumer.handle(RealConsumer.java:44)
    	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
    	at com.messagebus.client.handler.consume.ConsumerTagGenerator.handle(ConsumerTagGenerator.java:22)
    	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
    	at com.messagebus.client.handler.consume.ConsumePermission.handle(ConsumePermission.java:37)
    	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
    	at com.messagebus.client.handler.consume.ConsumeParamValidator.handle(ConsumeParamValidator.java:17)
    	at com.messagebus.client.handler.MessageCarryHandlerChain.handle(MessageCarryHandlerChain.java:72)
    	at com.messagebus.client.carry.GenericConsumer.run(GenericConsumer.java:50)
    	at java.lang.Thread.run(Thread.java:744)
    Caused by: com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

    当然你也能够对日志内容进行特殊处理,让一条日志的全部内容以一行输出,但这样须要对日志框架进行定制。有时这并不受你控制。

    因此这里最好的选择是定制日志收集器。

    源代码问题定位

    我们先来了解一下Flume源代码中LineDeserializer的核心实现:

      private String readLine() throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        int readChars = 0;
        while ((c = in.readChar()) != -1) {
          readChars++;
    
          // FIXME: support 
    
          if (c == '
    ') {
            break;
          }
    
          sb.append((char)c);
    
          if (readChars >= maxLineLength) {
            logger.warn("Line length exceeds max ({}), truncating line!",
                maxLineLength);
            break;
          }
        }
    
        if (readChars > 0) {
          return sb.toString();
        } else {
          return null;
        }
      }

    首先,构建一个StringBuilder,然后以字符为单位挨个读取,假设读取到换行符“ ”。则表示读取本条日志结束。跳出循环。否则将该字符串追加到StringBuilder中。与此同一时候会给读取的字符个数计数:假设读取的字符个数大于预先配置的一行日志的最大字符串长度,也会跳出循环。


    这里的主要问题出在以换行符“ ”作为日志结尾的分隔符逻辑上。当我们记录异常日志时。我们须要又一次找到一种界定日志记录结尾的方式。

    解决思路

    考虑到我们採用[]作为日志的tag界定符。每条日志差点儿都是以“[”打头。因此,我们採取的做法是:推断读取到换行符“ ”后再预读下一位。假设下一位是“[”。则觉得这是一条普通不换行的日志。此时再回退一个字符(由于刚刚预读了一个字符,须要让指针后退回原来的位置),然后跳出循环;而假设下一位不是“[”,则觉得它是一个异常日志或者多行日志。则继续往后读取字符,当遇到换行符时,再次反复以上推断。当然假设你的日志格式是以某个固定的格式打头,首字母固定的话,才干够用这样的方式,否则你非常可能要配置日志的apender,使其以某个特定的符号作为日志的结尾来推断了。

    另外,有时也能够基于正则来匹配。

    定制实现

    为了提升扩展性,我们提供对预读的下一个字符进行配置。并将其命名为:newLineStartPrefix。我们新建一个反序列化类:MultiLineDeserializer。该类的大部分逻辑都跟LineDeserializer相同,主要须要又一次实现上面的readLine方法,实现例如以下:

        private String readLine() throws IOException {
            StringBuilder sb = new StringBuilder();
            int c;
            int readChars = 0;
            while ((c = in.readChar()) != -1) {
                readChars++;
    
                // FIXME: support 
    
                if (c == '
    ') {
    
                    //walk more one step
                    c = in.readChar();
                    if (c == -1)
                        break;
                    else if (c == this.newLineStartPrefix) {    //retreat one step
                        long currentPosition = in.tell();
                        in.seek(currentPosition - 1);
                        break;
                    }
                }
    
                sb.append((char)c);
    
                if (readChars >= maxLineLength) {
                    logger.warn("Line length exceeds max ({}), truncating line!",
                                maxLineLength);
                    break;
                }
            }
    
            if (readChars > 0) {
                return sb.toString();
            } else {
                return null;
            }
        }

    这里有个小插曲,由于之前已定制了source/sink的缘故。原以为deserializer也能够用相同的方式进行定制。并在agent的deserializer配置中指定定制过的deserializer的全然限定名。但经过验证后发现。这条路走不通。会报错(貌似从flume官网上也找不到对deserializer定制的介绍)。

    因此,仅仅能在源代码上进行扩展,然后编译源代码。又一次生成jar。

    从源代码里你会发现为什么在第三方包内扩展deserializer是行不通的。从github上clone下源代码。进入flume-ng-core module的例如以下类:org.apache.flume.serialization.EventDeserializerType,你就会一目了然:

    public enum EventDeserializerType {
      LINE(LineDeserializer.Builder.class),
      MULTILINE(MultiLineDeserializer.Builder.class),
      AVRO(AvroEventDeserializer.Builder.class),
      OTHER(null);
    
      private final Class<? extends EventDeserializer.Builder> builderClass;
    
      EventDeserializerType(Class<? extends EventDeserializer.Builder> builderClass) {
        this.builderClass = builderClass;
      }
    
      public Class<? extends EventDeserializer.Builder> getBuilderClass() {
        return builderClass;
      }
    
    }

    你必须显式在这里定义deserializer的枚举,然后指定其builder的Class实例,并在agent里的deserializer配置项中填写你这里的枚举名称才行。我们仅仅需在子package:serialization中新建MultiLineDeserializer类,然后又一次实现逻辑、编译、打包flume-ng-core Module生成新的jar就可以。flume将其源代码中的每个Module生成的jar都放在其二进制包的lib目录下。你仅仅需将又一次打包好的flume-ng-core jar替换原来的,重新启动agent就可以看到效果。

    这里还有个须要注意的地方:LineDeserializer有一个參数(maxLineLength)用于定义一个日志行的最长字符数。

    假设某条日志超过这个长度,将不再读取。而一条日志占领多行情况下,该值须要适当增大,由于像异常日志的堆栈长度明显比普通日志长不少。这里你能够设置为8192。

  • 相关阅读:
    爬虫助手spider_tool-JUN
    frida 保存打印日志到本地
    frida get_frontmost_application报错
    adb shell安装证书/修改证书到系统级/
    利用celery进行分布式爬虫
    vscode Go插件安装失败解决方法,亲测2020.10.15
    Frida hook map集合遍历和修改
    frida_rpc dou音、饿le么 header加密
    Frida入门
    adb连接模拟器
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/7280993.html
Copyright © 2011-2022 走看看