Flume wasn't able to parse timestamp header

    Source: http://caiguangguang.blog.51cto.com/1652935/1384187

    An example of a Flume BucketPath bug

    The test configuration file:

    agent-server1.sources= testtail
    agent-server1.sinks = hdfs-sink
    agent-server1.channels= hdfs-channel
    agent-server1.sources.testtail.type = netcat
    agent-server1.sources.testtail.bind = localhost
    agent-server1.sources.testtail.port = 9999
    agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
    agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
    agent-server1.channels.hdfs-channel.type = memory
    agent-server1.channels.hdfs-channel.capacity = 200000000
    agent-server1.channels.hdfs-channel.transactionCapacity = 10000
    agent-server1.sinks.hdfs-sink.type = hdfs
    agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
    agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
    agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
    agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
    agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
    agent-server1.sinks.hdfs-sink.hdfs.round = false
    agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
    agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
    agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
    agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
    agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
    agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
    agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
    agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
    agent-server1.sinks.hdfs-sink.channel = hdfs-channel
    agent-server1.sources.testtail.channels = hdfs-channel
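
The hdfs.path above uses the %Y%m%d escape, which the HDFS sink resolves from the event's "timestamp" header. A minimal sketch of how such an escape maps to a date-based directory (plain Java, with a hypothetical header value; the GMT+8 timezone is an assumption chosen to match the log timestamps later in this post):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathDemo {
    public static void main(String[] args) {
        // Hypothetical value of the event's "timestamp" header (epoch millis).
        long ts = 1395655387000L;
        // %Y%m%d in hdfs.path corresponds to a yyyyMMdd date pattern.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT+8")); // assumed agent timezone
        System.out.println("hdfs://bipcluster/tmp/flume/" + fmt.format(new Date(ts)));
        // prints hdfs://bipcluster/tmp/flume/20140324
    }
}
```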

    After starting the agent and sending input with telnet, the following error appeared:

    14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
    java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
     Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
            at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.NumberFormatException: null
            at java.lang.Long.parseLong(Long.java:375)
            at java.lang.Long.valueOf(Long.java:525)
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
            ... 5 more
    14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
    resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
            at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
            ... 3 more
    Caused by: java.lang.NumberFormatException: null
            at java.lang.Long.parseLong(Long.java:375)
            at java.lang.Long.valueOf(Long.java:525)
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
            ... 5 more

    Judging from the stack trace, the error occurs in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class.
    In org.apache.flume.sink.hdfs.HDFSEventSink, the process method builds the HDFS path, mainly by calling BucketPath's escapeString method to substitute the escape sequences, which in turn calls replaceShorthand.
    The relevant part of replaceShorthand:

      public static String replaceShorthand(char c, Map<String, String> headers,
          TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
        String timestampHeader = headers.get("timestamp");
        long ts;
        try {
          ts = Long.valueOf(timestampHeader);
        } catch (NumberFormatException e) {
          throw new RuntimeException("Flume wasn't able to parse timestamp header"
            + " in the event to resolve time based bucketing. Please check that"
            + " you're correctly populating timestamp header (for example using"
            + " TimestampInterceptor source interceptor).", e);
        }
        if(needRounding){
          ts = roundDown(roundDown, unit, ts);
        }
        // ...

    As the code shows, when the event carries no timestamp header, headers.get("timestamp") returns null, and the assignment to ts throws.
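
The root cause shown in the trace ("Caused by: java.lang.NumberFormatException: null") can be reproduced in isolation. A minimal sketch, assuming only that the event's header map has no "timestamp" key:

```java
import java.util.HashMap;
import java.util.Map;

public class MissingTimestampDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>(); // event headers without "timestamp"
        String timestampHeader = headers.get("timestamp"); // null
        try {
            long ts = Long.valueOf(timestampHeader); // parseLong(null) throws
            System.out.println("ts = " + ts);
        } catch (NumberFormatException e) {
            // Same NumberFormatException that BucketPath.replaceShorthand
            // wraps in the RuntimeException seen in the stack trace above.
            System.out.println("NumberFormatException thrown for null header");
        }
    }
}
```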
    This is in fact a known Flume bug:
    https://issues.apache.org/jira/browse/FLUME-1419
    There are three ways to work around it:
    1. Change the configuration so the HDFS file path contains no time-based escapes

    agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

    But then the logs can no longer be bucketed by day.
    2. Patch the code
    (patch: https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
    If no timestamp value can be read from the headers, fall back to the current timestamp.
    The relevant code:

     String timestampHeader = headers.get("timestamp");
     long ts;
     try {
       if (timestampHeader == null) {
         ts = System.currentTimeMillis();
       } else {
         ts = Long.valueOf(timestampHeader);
       }
     } catch (NumberFormatException e) {
       throw new RuntimeException("Flume wasn't able to parse timestamp header"
         + " in the event to resolve time based bucketing. Please check that"
         + " you're correctly populating timestamp header (for example using"
         + " TimestampInterceptor source interceptor).", e);
     }

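The patched logic can be exercised as a standalone method (resolveTimestamp is our name for this sketch, not a Flume identifier):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampFallbackDemo {
    // Mirrors the FLUME-1419 patch: fall back to the current time
    // when the "timestamp" header is absent.
    static long resolveTimestamp(Map<String, String> headers) {
        String timestampHeader = headers.get("timestamp");
        if (timestampHeader == null) {
            return System.currentTimeMillis();
        }
        return Long.valueOf(timestampHeader);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // No timestamp header: falls back to "now" instead of throwing.
        System.out.println(resolveTimestamp(headers) > 0); // prints true

        headers.put("timestamp", "1395655387000");
        System.out.println(resolveTimestamp(headers)); // prints 1395655387000
    }
}
```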
    3. Define a timestamp-based interceptor for the source
    Just add two lines to the configuration:

    agent-server1.sources.testtail.interceptors = i1
    agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
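
With these two lines, TimestampInterceptor stamps every event before it reaches the channel, so the sink's header lookup never returns null. A simplified sketch of that effect (not the actual Flume class; preserveExisting mirrors the interceptor's option of the same name):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampInterceptorSketch {
    // Simplified version of what TimestampInterceptor does to each
    // event's headers before the event enters the channel.
    static Map<String, String> intercept(Map<String, String> headers,
                                         boolean preserveExisting) {
        if (!(preserveExisting && headers.containsKey("timestamp"))) {
            headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        intercept(headers, false);
        // The HDFS sink can now resolve %Y%m%d without throwing.
        System.out.println(headers.containsKey("timestamp")); // prints true
    }
}
```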

    One tip:
    When debugging Flume problems, you can send debug logs to the console by adding this to Flume's startup parameters:

    -Dflume.root.logger=DEBUG,console,LOGFILE
Original article: https://www.cnblogs.com/sunxucool/p/3820499.html