  • [bigdata] Flume hdfs sink leaves HDFS files unclosed

    Symptom: a MapReduce job failed.

    Running hadoop fsck -openforwrite shows that a file was never closed:

    [root@com ~]# hadoop fsck -openforwrite /data/rc/click/mpp/15-08-05/
    DEPRECATED: Use of this script to execute hdfs command is deprecated.
    Instead use the hdfs command for it.

    Connecting to namenode via http://com.hunantv.hadoopnamenode:50070
    FSCK started by root (auth:SIMPLE) from /10.100.1.46 for path /data/rc/click/mpp/15-08-05/ at Thu Aug 06 14:05:03 CST 2015
    ....................................................................................................
    ....................................................................................................
    ........./data/rc/click/mpp/15-08-05/FlumeData.1438758322864 42888 bytes, 1 block(s), OPENFORWRITE:
    /data/rc/click/mpp/15-08-05/FlumeData.1438758322864: Under replicated BP-1672356070-10.100.1.36-1412072991411:blk_1120646538_47162789{blockUCState=UNDER_CONSTRUCTION, primaryNodeIndex=-1, replicas=[ReplicaUnderConstruction[[DISK]DS-f4fff5f3-f3fd-4054-a75c-1d7da53a73af:NORMAL|FINALIZED], ReplicaUnderConstruction[[DISK]DS-26f54bc5-5026-4e6a-94ec-8435224e4aa9:NORMAL|RWR], ReplicaUnderConstruction[[DISK]DS-4ab3fffc-6468-47df-8023-79f23a330371:NORMAL|FINALIZED]]}. Target Replicas is 3 but found 2 replica(s).
    ..........................................................................................
    ............................Status: HEALTHY
    Total size: 99186583 B
    Total dirs: 1
    Total files: 328
    Total symlinks: 0
    Total blocks (validated): 328 (avg. block size 302398 B)
    Minimally replicated blocks: 328 (100.0 %)
    Over-replicated blocks: 0 (0.0 %)
    Under-replicated blocks: 1 (0.30487806 %)
    Mis-replicated blocks: 0 (0.0 %)
    Default replication factor: 3
    Average block replication: 2.996951
    Corrupt blocks: 0
    Missing replicas: 1 (0.101626016 %)
    Number of data-nodes: 59
    Number of racks: 6
    FSCK ended at Thu Aug 06 14:05:03 CST 2015 in 36 milliseconds


    The filesystem under path '/data/rc/click/mpp/15-08-05/' is HEALTHY
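    To spot such files automatically (for example before launching the MapReduce job), the OPENFORWRITE entries can be pulled out of the fsck output. Below is a minimal Java sketch, assuming the line format shown above, where the path may be preceded by fsck's progress dots. The class FsckOpenFiles is hypothetical and not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: collects the paths that `hadoop fsck -openforwrite`
 * flags as OPENFORWRITE. Assumes lines of the form
 * "<dots><path> <n> bytes, <k> block(s), OPENFORWRITE:".
 */
class FsckOpenFiles {
    // Leading progress dots are optional; the path is the first token starting with '/'.
    private static final Pattern OPEN_FOR_WRITE =
            Pattern.compile("^\\.*(/\\S+) \\d+ bytes, \\d+ block\\(s\\), OPENFORWRITE");

    static List<String> openFiles(List<String> fsckLines) {
        List<String> paths = new ArrayList<>();
        for (String line : fsckLines) {
            Matcher m = OPEN_FOR_WRITE.matcher(line.trim());
            if (m.find()) {
                paths.add(m.group(1)); // the file still open for write
            }
        }
        return paths;
    }
}
```

    The "Under replicated" line for the same file is ignored because it lacks the "bytes, ... block(s)" part, so each open file is reported once.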

    Checking the Flume log:

    [root@10.100.1.117] out: 05 Aug 2015 11:15:19,322 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:234) - Creating hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
    [root@10.100.1.117] out: 05 Aug 2015 11:16:20,493 INFO [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter$5.call:429) - Closing idle bucketWriter hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp at 1438744580493
    [root@10.100.1.117] out: 05 Aug 2015 11:16:20,497 INFO [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363) - Closing hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
    [root@10.100.1.117] out: 05 Aug 2015 11:16:30,501 WARN [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:370) - failed to close() HDFSWriter for file (hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp). Exception follows.
    [root@10.100.1.117] out: java.io.IOException: Callable timed out after 10000 ms on file: hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
    [root@10.100.1.117] out: 05 Aug 2015 11:16:30,503 INFO [hdfs-sin_hdfs_201-call-runner-7] (org.apache.flume.sink.hdfs.BucketWriter$8.call:629) - Renaming hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp to hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293

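    Closing the HDFS file failed because the call timed out, and Flume then renamed the .tmp file anyway, so the final file was left open for write.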
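    The "Callable timed out after 10000 ms" message suggests that the sink runs each HDFS call on a separate thread and stops waiting once the timeout elapses. Below is a minimal sketch of that pattern using java.util.concurrent. TimedCall is a hypothetical name, and this approximates the idea rather than reproducing Flume's callWithTimeout. It illustrates that a timeout only means the caller stopped waiting; the close on the HDFS side may not have completed.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a timeout guard around a blocking HDFS call. */
class TimedCall {
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> op, long timeoutMs, String file)
            throws IOException, InterruptedException {
        Future<T> future = pool.submit(op);
        try {
            // Wait at most timeoutMs for the call to finish.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Stop waiting and interrupt the worker; the file may still be open on HDFS.
            future.cancel(true);
            throw new IOException("Callable timed out after " + timeoutMs + " ms on file: " + file, e);
        } catch (ExecutionException e) {
            // The call itself failed; surface its cause.
            throw new IOException("call failed on file: " + file, e.getCause());
        }
    }
}
```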

    The relevant source (org.apache.flume.sink.hdfs.BucketWriter.close):

     public synchronized void close(boolean callCloseCallback)
        throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        try {
          flush();
        } catch (IOException e) {
          LOG.warn("pre-close flush failed", e);
        }
        boolean failedToClose = false;
        LOG.info("Closing {}", bucketPath);
        CallRunner<Void> closeCallRunner = createCloseCallRunner();
        if (isOpen) {
          try {
            callWithTimeout(closeCallRunner);
            sinkCounter.incrementConnectionClosedCount();
          } catch (IOException e) {
            LOG.warn(
              "failed to close() HDFSWriter for file (" + bucketPath +
                "). Exception follows.", e);
            sinkCounter.incrementConnectionFailedCount();
            failedToClose = true;
          }
          isOpen = false;
        } else {
          LOG.info("HDFSWriter is already closed: {}", bucketPath);
        }
    
        // NOTE: timed rolls go through this codepath as well as other roll types
        if (timedRollFuture != null && !timedRollFuture.isDone()) {
          timedRollFuture.cancel(false); // do not cancel myself if running!
          timedRollFuture = null;
        }
    
        if (idleFuture != null && !idleFuture.isDone()) {
          idleFuture.cancel(false); // do not cancel myself if running!
          idleFuture = null;
        }
    
        if (bucketPath != null && fileSystem != null) {
          // could block or throw IOException
          try {
            renameBucket(bucketPath, targetPath, fileSystem);
          } catch(Exception e) {
            LOG.warn(
              "failed to rename() file (" + bucketPath +
              "). Exception follows.", e);
            sinkCounter.incrementConnectionFailedCount();
            final Callable<Void> scheduledRename =
                    createScheduledRenameCallable();
            timedRollerPool.schedule(scheduledRename, retryInterval,
                    TimeUnit.SECONDS);
          }
        }
        if (callCloseCallback) {
          runCloseAction();
          closed = true;
        }
      }

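    The default timeout (hdfs.callTimeout) is 10000 ms, and a failed close is never retried: isOpen is set to false even when close() fails, so later calls treat the file as already closed. The code does set a failedToClose variable, but never reads it; the developers may simply have forgotten to handle it.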
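    The control flow described above can be reduced to a toy model. This sketch is hypothetical (CloseFlawDemo and its fields are illustrative, not Flume classes): a failed close leaves the simulated HDFS file open, yet isOpen is cleared and the rename still happens, which matches the OPENFORWRITE file reported by fsck.

```java
import java.io.IOException;

/** Hypothetical, heavily simplified model of the original close() flow (not Flume code). */
class CloseFlawDemo {
    interface Closer { void close() throws IOException; }

    boolean isOpen = true;       // writer-side flag, as in BucketWriter
    boolean hdfsFileOpen = true; // simulated state that fsck -openforwrite would report
    boolean renamed = false;     // simulated .tmp -> final rename

    void close(Closer closer) {
        if (isOpen) {
            try {
                closer.close();
                hdfsFileOpen = false; // only a successful close releases the file
            } catch (IOException e) {
                // original code: log, count the failure, set failedToClose (never read)
            }
            isOpen = false;           // cleared even when the close failed
        }
        renamed = true;               // the rename happens regardless
    }
}
```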

    Solutions:

    1. Increase the HDFS call timeout (hdfs.callTimeout), for example to 5 minutes. The Flume hdfs sink configuration:

    agent12.sinks.sin_hdfs_201.type=hdfs
    agent12.sinks.sin_hdfs_201.channel=ch_hdfs_201
    agent12.sinks.sin_hdfs_201.hdfs.path=hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-%{month}-%{day}
    agent12.sinks.sin_hdfs_201.hdfs.round=true
    agent12.sinks.sin_hdfs_201.hdfs.roundValue=10
    agent12.sinks.sin_hdfs_201.hdfs.roundUnit=minute
    agent12.sinks.sin_hdfs_201.hdfs.fileType=DataStream
    agent12.sinks.sin_hdfs_201.hdfs.writeFormat=Text
    agent12.sinks.sin_hdfs_201.hdfs.rollInterval=0
    agent12.sinks.sin_hdfs_201.hdfs.rollSize=209715200
    agent12.sinks.sin_hdfs_201.hdfs.rollCount=0
    agent12.sinks.sin_hdfs_201.hdfs.idleTimeout=300
    agent12.sinks.sin_hdfs_201.hdfs.batchSize=100
    agent12.sinks.sin_hdfs_201.hdfs.minBlockReplicas=1
    agent12.sinks.sin_hdfs_201.hdfs.callTimeout=300000
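    Note the units: hdfs.callTimeout is in milliseconds (300000 ms = 5 minutes), while hdfs.idleTimeout is in seconds and hdfs.rollSize in bytes (209715200 = 200 × 1024 × 1024). Below is a small sketch that reads the effective callTimeout of a sink from an agent properties file with java.util.Properties, falling back to the 10000 ms default when the key is absent. CallTimeoutCheck is a hypothetical helper, not a Flume API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper: effective hdfs.callTimeout (ms) for one sink of one agent. */
class CallTimeoutCheck {
    static final long DEFAULT_CALL_TIMEOUT_MS = 10000L; // default seen in the log above

    static long callTimeoutMs(String config, String agent, String sink) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(config));
        String key = agent + ".sinks." + sink + ".hdfs.callTimeout";
        String value = props.getProperty(key);
        // Missing key: the sink would use the default timeout.
        return value == null ? DEFAULT_CALL_TIMEOUT_MS : Long.parseLong(value.trim());
    }
}
```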
    

      

    2. Modify the source to retry the close, as follows:

    public synchronized void close(boolean callCloseCallback)
            throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        try {
            flush();
        } catch (IOException e) {
            LOG.warn("pre-close flush failed", e);
        }
        LOG.info("Closing {}", bucketPath);
        CallRunner<Void> closeCallRunner = createCloseCallRunner();
        final int maxTries = 5;
        int tryTime = 0;
        if (!isOpen) {
            LOG.info("HDFSWriter is already closed: {}", bucketPath);
        }
        while (isOpen && tryTime < maxTries) {
            tryTime++;
            try {
                callWithTimeout(closeCallRunner);
                sinkCounter.incrementConnectionClosedCount();
                isOpen = false; // closed successfully, stop retrying
            } catch (IOException e) {
                LOG.warn("failed to close() HDFSWriter for file (try " + tryTime + " of " + maxTries
                        + "): " + bucketPath + ". Exception follows.", e);
                sinkCounter.incrementConnectionFailedCount();
                if (tryTime < maxTries) {
                    // wait before the next attempt; note this sleep holds the BucketWriter lock
                    Thread.sleep(callTimeout);
                }
            }
        }
        // the file is still open after all retries: give up and report it
        if (isOpen) {
            LOG.error("failed to close file: " + bucketPath + " after " + tryTime + " tries.");
        }

        // NOTE: timed rolls go through this codepath as well as other roll types
        if (timedRollFuture != null && !timedRollFuture.isDone()) {
            timedRollFuture.cancel(false); // do not cancel myself if running!
            timedRollFuture = null;
        }

        if (idleFuture != null && !idleFuture.isDone()) {
            idleFuture.cancel(false); // do not cancel myself if running!
            idleFuture = null;
        }

        if (bucketPath != null && fileSystem != null) {
            // could block or throw IOException
            try {
                renameBucket(bucketPath, targetPath, fileSystem);
            } catch (Exception e) {
                LOG.warn("failed to rename() file (" + bucketPath + "). Exception follows.", e);
                sinkCounter.incrementConnectionFailedCount();
                final Callable<Void> scheduledRename = createScheduledRenameCallable();
                timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
            }
        }
        if (callCloseCallback) {
            runCloseAction();
            closed = true;
        }
    }
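    The retry logic can be checked separately from Flume. Below is a standalone sketch of the same bounded-retry pattern; RetryClose and Attempt are hypothetical names. It sleeps only between attempts and returns which attempt succeeded, so the caller can tell a success on the last try from a total failure.

```java
import java.io.IOException;

/** Hypothetical standalone sketch of a bounded retry around a close operation. */
class RetryClose {
    interface Attempt { void run() throws IOException; }

    /** Returns the attempt number that succeeded, or -1 if all maxTries attempts failed. */
    static int closeWithRetry(Attempt attempt, int maxTries, long backoffMs) throws InterruptedException {
        for (int tryTime = 1; tryTime <= maxTries; tryTime++) {
            try {
                attempt.run();
                return tryTime;              // success: stop retrying
            } catch (IOException e) {
                if (tryTime < maxTries) {
                    Thread.sleep(backoffMs); // back off only if another attempt follows
                }
            }
        }
        return -1;                           // caller should log an error; the file may stay open
    }
}
```

    Because the modified close() is synchronized and sleeps for callTimeout between attempts, with callTimeout=300000 a close that keeps failing could block the sink for roughly 45 minutes (five 5-minute timeouts plus four 5-minute sleeps). A shorter, separate back-off interval may be preferable.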
    

      

    
    
    
  • Original post: https://www.cnblogs.com/spec-dog/p/4708552.html