zoukankan      html  css  js  c++  java
  • Flume wasn't able to parse timestamp header

    来自:http://caiguangguang.blog.51cto.com/1652935/1384187

    flume bucketpath的bug一例

    测试的配置文件:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    agent-server1.sources= testtail
    agent-server1.sinks = hdfs-sink
    agent-server1.channels= hdfs-channel
    agent-server1.sources.testtail.type = netcat
    agent-server1.sources.testtail.bind = localhost
    agent-server1.sources.testtail.port = 9999
    agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
    agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
    agent-server1.channels.hdfs-channel.type = memory
    agent-server1.channels.hdfs-channel.capacity = 200000000
    agent-server1.channels.hdfs-channel.transactionCapacity = 10000
    agent-server1.sinks.hdfs-sink.type = hdfs
    agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
    agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
    agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
    agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
    agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
    agent-server1.sinks.hdfs-sink.hdfs.round = false
    agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
    agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
    agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
    agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
    agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
    agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
    agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
    agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
    agent-server1.sinks.hdfs-sink.channel = hdfs-channel
    agent-server1.sources.testtail.channels = hdfs-channel

    在启动服务后,使用telnet进行测试,发现如下报错:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
    java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
     Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
            at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.NumberFormatException: null
            at java.lang.Long.parseLong(Long.java:375)
            at java.lang.Long.valueOf(Long.java:525)
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
            ... 5 more
    14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
    resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
            at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
            at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
            at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
            ... 3 more
    Caused by: java.lang.NumberFormatException: null
            at java.lang.Long.parseLong(Long.java:375)
            at java.lang.Long.valueOf(Long.java:525)
            at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
            ... 5 more

    从调用栈的信息来看,错误出在org.apache.flume.formatter.output.BucketPath类的replaceShorthand方法。
    在org.apache.flume.sink.hdfs.HDFSEventSink类中,使用process方法来生成hdfs的url,其中主要是调用了BucketPath类的escapeString方法来进行字符的转换,并最终调用了replaceShorthand方法。
    其中replaceShorthand方法的相关代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
      public static String replaceShorthand(char c, Map<String, String> headers,
          TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
        String timestampHeader = headers.get("timestamp");
        long ts;
        try {
          ts = Long.valueOf(timestampHeader);
        } catch (NumberFormatException e) {
          throw new RuntimeException("Flume wasn't able to parse timestamp header"
            + " in the event to resolve time based bucketing. Please check that"
            + " you're correctly populating timestamp header (for example using"
            + " TimestampInterceptor source interceptor).", e);
        }
        if(needRounding){
          ts = roundDown(roundDown, unit, ts);
        }
    ........

    从代码中可以看到,timestampHeader 的值如果取不到,在向ts赋值时就会报错。。
    这其实是flume的一个bug,bug id:
    https://issues.apache.org/jira/browse/FLUME-1419
    解决方法有3个:
    1.更改配置,更新hdfs文件的路径格式

    1
    agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

    但是这样就不能按天来存放日志了
    2.通过更改相关的代码
    (patch:https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
    如果在headers中获取不到timestamp的值,就给它一个当前timestamp的值。
    相关代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
         String timestampHeader = headers.get("timestamp");
         long ts;
         try {
          if (timestampHeader == null) {
            ts = System.currentTimeMillis();
          } else {
            ts = Long.valueOf(timestampHeader);
          }
         } catch (NumberFormatException e) {
           throw new RuntimeException("Flume wasn't able to parse timestamp header"
             + " in the event to resolve time based bucketing. Please check that"
             + " you're correctly populating timestamp header (for example using"
                      + " TimestampInterceptor source interceptor).", e);
    }

    3.为source定义基于timestamp的interceptors 
    在配置中增加两行即可:

    1
    2
    agent-server1.sources.testtail.interceptors = i1
    agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

    一个技巧:
    在debug flume的问题时,可以在flume的启动参数中设置把debug日志打到console中。

    1
    -Dflume.root.logger=DEBUG,console,LOGFILE
  • 相关阅读:
    Array,prototype.toString.call()
    js 中的delete运算符
    c#连接sql数据库以及操作数据库
    ArrayList集合
    查找出数据表中的重复值
    C#中的List<string>泛型类示例
    C#中的List<string>泛型类示例
    C#中Convert.ToInt32、int.TryParse、(int)和int.Parse四者的区别
    C#获取文件夹下的所有文件的方法
    从本地文件夹中读取文本文档,并将所有的文档内容合并到一个文本中
  • 原文地址:https://www.cnblogs.com/sunxucool/p/3820499.html
Copyright © 2011-2022 走看看