zoukankan      html  css  js  c++  java
  • 排查启动脚本,弄清依赖关系 弄清楚一个命令的执行后的影响 讲解 启动 依赖 源码分析

    [root@hadoop1 apache-flume-1.8.0-bin]# less bin/flume-ng

    #!/bin/bash
    #
    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    #
    
    ################################
    # constants
    ################################
    
    FLUME_AGENT_CLASS="org.apache.flume.node.Application"
    FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
    FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
    FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
    
    CLEAN_FLAG=1
    ################################
    # functions
    ################################
    
    info() {
      if [ ${CLEAN_FLAG} -ne 0 ]; then
        local msg=$1
        echo "Info: $msg" >&2
      fi
    }
    
    warn() {
      if [ ${CLEAN_FLAG} -ne 0 ]; then
        local msg=$1
        echo "Warning: $msg" >&2
      fi
    }
    
    error() {
      local msg=$1
      local exit_code=$2
    
      echo "Error: $msg" >&2
    
      if [ -n "$exit_code" ] ; then
        exit $exit_code
      fi
    }
    
    # If avail, add Hadoop paths to the FLUME_CLASSPATH and to the
    # FLUME_JAVA_LIBRARY_PATH env vars.
    # Requires Flume jars to already be on FLUME_CLASSPATH.
    add_hadoop_paths() {
      local HADOOP_IN_PATH=$(PATH="${HADOOP_HOME:-${HADOOP_PREFIX}}/bin:$PATH" 
          which hadoop 2>/dev/null)
    
      if [ -f "${HADOOP_IN_PATH}" ]; then
        info "Including Hadoop libraries found via ($HADOOP_IN_PATH) for HDFS access"
    
        # determine hadoop java.library.path and use that for flume
        local HADOOP_CLASSPATH=""
        local HADOOP_JAVA_LIBRARY_PATH=$(HADOOP_CLASSPATH="$FLUME_CLASSPATH" 
            ${HADOOP_IN_PATH} org.apache.flume.tools.GetJavaProperty 
            java.library.path)
    
        # look for the line that has the desired property value
        # (considering extraneous output from some GC options that write to stdout)
        # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
        IFS=$'
    '
        for line in $HADOOP_JAVA_LIBRARY_PATH; do
          if [[ $line =~ ^java.library.path=(.*)$ ]]; then
            HADOOP_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
            break
          fi
        done
        unset IFS
    
        if [ -n "${HADOOP_JAVA_LIBRARY_PATH}" ]; then
          FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HADOOP_JAVA_LIBRARY_PATH"
        fi
    
        # determine hadoop classpath
        HADOOP_CLASSPATH=$($HADOOP_IN_PATH classpath)
    
        FLUME_CLASSPATH="$FLUME_CLASSPATH:$HADOOP_CLASSPATH"
      fi
    }
    add_HBASE_paths() {
      local HBASE_IN_PATH=$(PATH="${HBASE_HOME}/bin:$PATH" 
          which hbase 2>/dev/null)
    
      if [ -f "${HBASE_IN_PATH}" ]; then
        info "Including HBASE libraries found via ($HBASE_IN_PATH) for HBASE access"
    
        # determine HBASE java.library.path and use that for flume
        local HBASE_CLASSPATH=""
        local HBASE_JAVA_LIBRARY_PATH=$(HBASE_CLASSPATH="$FLUME_CLASSPATH" 
            ${HBASE_IN_PATH} org.apache.flume.tools.GetJavaProperty 
            java.library.path)
    
        # look for the line that has the desired property value
        # (considering extraneous output from some GC options that write to stdout)
        # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
        IFS=$'
    '
        for line in $HBASE_JAVA_LIBRARY_PATH; do
          if [[ $line =~ ^java.library.path=(.*)$ ]]; then
            HBASE_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
            break
          fi
        done
        unset IFS
    
        if [ -n "${HBASE_JAVA_LIBRARY_PATH}" ]; then
          FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HBASE_JAVA_LIBRARY_PATH"
        fi
    
        # determine HBASE classpath
        HBASE_CLASSPATH=$($HBASE_IN_PATH classpath)
    
        FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_CLASSPATH"
        FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_HOME/conf"
    
      fi
    }
    
    add_hive_paths(){
      if [ -d "${HIVE_HOME}/lib" ]; then
        info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
        FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
      fi
      if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
        info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
        FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
      fi
    }
    
    set_LD_LIBRARY_PATH(){
    #Append the FLUME_JAVA_LIBRARY_PATH to whatever the user may have specified in
    #flume-env.sh
      if [ -n "${FLUME_JAVA_LIBRARY_PATH}" ]; then
        export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FLUME_JAVA_LIBRARY_PATH}"
      fi
    }
    
    display_help() {
      cat <<EOF
    Usage: $0 <command> [options]...
    
    commands:
      help                      display this help text
      agent                     run a Flume agent
      avro-client               run an avro Flume client
      version                   show Flume version info
    
    global options:
      --conf,-c <conf>          use configs in <conf> directory
      --classpath,-C <cp>       append to the classpath
      --dryrun,-d               do not actually start Flume, just print the command
      --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                                plugins.d section in the user guide for more details.
                                Default: $FLUME_HOME/plugins.d
      -Dproperty=value          sets a Java system property value
      -Xproperty=value          sets a Java -X option
    
    agent options:
      --name,-n <name>          the name of this agent (required)
      --conf-file,-f <file>     specify a config file (required if -z missing)
      --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
      --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
      --no-reload-conf          do not reload config file if changed
      --help,-h                 display help text
    
    avro-client options:
      --rpcProps,-P <file>   RPC client properties file with server connection params
      --host,-H <host>       hostname to which events will be sent
      --port,-p <port>       port of the avro source
      --dirname <dir>        directory to stream to avro source
      --filename,-F <file>   text file to stream to avro source (default: std input)
      --headerFile,-R <file> File containing event headers as key/value pairs on each new line
      --help,-h              display help text
    
      Either --rpcProps or both --host and --port must be specified.
    
    Note that if <conf> directory is specified, then it is always included first
    in the classpath.
    
    EOF
    }
    
    run_flume() {
      local FLUME_APPLICATION_CLASS
    
      if [ "$#" -gt 0 ]; then
        FLUME_APPLICATION_CLASS=$1
        shift
      else
        error "Must specify flume application class" 1
      fi
    
      if [ ${CLEAN_FLAG} -ne 0 ]; then
        set -x
      fi
      $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" 
          -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
    }
    
    ################################
    # main
    ################################
    
    # set default params
    FLUME_CLASSPATH=""
    FLUME_JAVA_LIBRARY_PATH=""
    JAVA_OPTS="-Xmx20m"
    LD_LIBRARY_PATH=""
    
    opt_conf=""
    opt_classpath=""
    opt_plugins_dirs=""
    arr_java_props=()
    arr_java_props_ct=0
    opt_dryrun=""
    
    mode=$1
    shift
    
    case "$mode" in
      help)
        display_help
        exit 0
        ;;
      agent)
        opt_agent=1
        ;;
      node)
        opt_agent=1
        warn "The "node" command is deprecated. Please use "agent" instead."
        ;;
      avro-client)
        opt_avro_client=1
        ;;
      tool)
        opt_tool=1
        ;;
      version)
       opt_version=1
       CLEAN_FLAG=0
       ;;
      *)
        error "Unknown or unspecified command '$mode'"
        echo
        display_help
        exit 1
        ;;
    esac
    
    args=""
    while [ -n "$*" ] ; do
      arg=$1
      shift
    
      case "$arg" in
        --conf|-c)
          [ -n "$1" ] || error "Option --conf requires an argument" 1
          opt_conf=$1
          shift
          ;;
        --classpath|-C)
          [ -n "$1" ] || error "Option --classpath requires an argument" 1
          opt_classpath=$1
          shift
          ;;
        --dryrun|-d)
          opt_dryrun="1"
          ;;
        --plugins-path)
          opt_plugins_dirs=$1
          shift
          ;;
        -agentlib*)
          arr_java_props[arr_java_props_ct]=$arg
          ((++arr_java_props_ct))
          ;;
        -agentpath*)
          arr_java_props[arr_java_props_ct]=$arg
          ((++arr_java_props_ct))
          ;;
        -javaagent*)
          arr_java_props[arr_java_props_ct]=$arg
          ((++arr_java_props_ct))
          ;;
        -D*)
          arr_java_props[arr_java_props_ct]=$arg
          ((++arr_java_props_ct))
          ;;
        -X*)
          arr_java_props[arr_java_props_ct]=$arg
          ((++arr_java_props_ct))
          ;;
        *)
          args="$args $arg"
          ;;
      esac
    done
    
    # make opt_conf absolute
    if [[ -n "$opt_conf" && -d "$opt_conf" ]]; then
      opt_conf=$(cd $opt_conf; pwd)
    fi
    
    # allow users to override the default env vars via conf/flume-env.sh
    if [ -z "$opt_conf" ]; then
      warn "No configuration directory set! Use --conf <dir> to override."
    elif [ -f "$opt_conf/flume-env.sh" ]; then
      info "Sourcing environment configuration script $opt_conf/flume-env.sh"
      source "$opt_conf/flume-env.sh"
    fi
    
    # prepend command-line classpath to env script classpath
    if [ -n "${opt_classpath}" ]; then
      if [ -n "${FLUME_CLASSPATH}" ]; then
        FLUME_CLASSPATH="${opt_classpath}:${FLUME_CLASSPATH}"
      else
        FLUME_CLASSPATH="${opt_classpath}"
      fi
    fi
    
    if [ -z "${FLUME_HOME}" ]; then
      FLUME_HOME=$(cd $(dirname $0)/..; pwd)
    fi
    
    # prepend $FLUME_HOME/lib jars to the specified classpath (if any)
    if [ -n "${FLUME_CLASSPATH}" ] ; then
      FLUME_CLASSPATH="${FLUME_HOME}/lib/*:$FLUME_CLASSPATH"
    else
      FLUME_CLASSPATH="${FLUME_HOME}/lib/*"
    fi
    
    # load plugins.d directories
    PLUGINS_DIRS=""
    if [ -n "${opt_plugins_dirs}" ]; then
      PLUGINS_DIRS=$(sed -e 's/:/ /g' <<<${opt_plugins_dirs})
    else
      PLUGINS_DIRS="${FLUME_HOME}/plugins.d"
    fi
    
    unset plugin_lib plugin_libext plugin_native
    for PLUGINS_DIR in $PLUGINS_DIRS; do
      if [[ -d ${PLUGINS_DIR} ]]; then
        for plugin in ${PLUGINS_DIR}/*; do
          if [[ -d "$plugin/lib" ]]; then
            plugin_lib="${plugin_lib}${plugin_lib+:}${plugin}/lib/*"
          fi
          if [[ -d "$plugin/libext" ]]; then
            plugin_libext="${plugin_libext}${plugin_libext+:}${plugin}/libext/*"
          fi
          if [[ -d "$plugin/native" ]]; then
            plugin_native="${plugin_native}${plugin_native+:}${plugin}/native"
          fi
        done
      fi
    done
    
    if [[ -n "${plugin_lib}" ]]
    then
      FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_lib}"
    fi
    
    if [[ -n "${plugin_libext}" ]]
    then
      FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_libext}"
    fi
    
    if [[ -n "${plugin_native}" ]]
    then
      if [[ -n "${FLUME_JAVA_LIBRARY_PATH}" ]]
      then
        FLUME_JAVA_LIBRARY_PATH="${FLUME_JAVA_LIBRARY_PATH}:${plugin_native}"
      else
        FLUME_JAVA_LIBRARY_PATH="${plugin_native}"
      fi
    fi
    
    # find java
    if [ -z "${JAVA_HOME}" ] ; then
      warn "JAVA_HOME is not set!"
      # Try to use Bigtop to autodetect JAVA_HOME if it's available
      if [ -e /usr/libexec/bigtop-detect-javahome ] ; then
        . /usr/libexec/bigtop-detect-javahome
      elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ] ; then
        . /usr/lib/bigtop-utils/bigtop-detect-javahome
      fi
    
      # Using java from path if bigtop is not installed or couldn't find it
      if [ -z "${JAVA_HOME}" ] ; then
        JAVA_DEFAULT=$(type -p java)
        [ -n "$JAVA_DEFAULT" ] || error "Unable to find java executable. Is it in your PATH?" 1
        JAVA_HOME=$(cd $(dirname $JAVA_DEFAULT)/..; pwd)
      fi
    fi
    
    # look for hadoop libs
    add_hadoop_paths
    add_HBASE_paths
    add_hive_paths
    
    # prepend conf dir to classpath
    if [ -n "$opt_conf" ]; then
      FLUME_CLASSPATH="$opt_conf:$FLUME_CLASSPATH"
    fi
    
    set_LD_LIBRARY_PATH
    # allow dryrun
    EXEC="exec"
    if [ -n "${opt_dryrun}" ]; then
      warn "Dryrun mode enabled (will not actually initiate startup)"
      EXEC="echo"
    fi
    
    # finally, invoke the appropriate command
    if [ -n "$opt_agent" ] ; then
      run_flume $FLUME_AGENT_CLASS $args
    elif [ -n "$opt_avro_client" ] ; then
      run_flume $FLUME_AVRO_CLIENT_CLASS $args
    elif [ -n "${opt_version}" ] ; then
      run_flume $FLUME_VERSION_CLASS $args
    elif [ -n "${opt_tool}" ] ; then
      run_flume $FLUME_TOOLS_CLASS $args
    else
      error "This message should never appear" 1
    fi
    
    exit 0
    

      

    2018-07-20 08:32:44,613 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.
    java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
    	at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:235)
    	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411)
    	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 12 more
    2018-07-20 08:33:14,615 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    

      

    HADOOP_PREFIX=/home/hadoop-2.9.1;export HADOOP_PREFIX;HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop;export HADOOP_CONF_DIR;HADOOP_HOME=/home/hadoop-2.9.1;export HADOOP_HOME;

    [root@hadoop1 apache-flume-1.8.0-bin]# bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1  
    Info: Sourcing environment configuration script /home/apache-flume-1.8.0-bin/conf/flume-env.sh
    Info: Including Hadoop libraries found via (/home/hadoop-2.9.1/bin/hadoop) for HDFS access
    Info: Including Hive libraries found via () for Hive access
    + exec /usr/local/jdk/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/apache-flume-1.8.0-bin/conf:/home/apache-flume-1.8.0-bin/lib/*:/home/hadoop-2.9.1/etc/hadoop:/home/hadoop-2.9.1/share/hadoop/common/lib/*:/home/hadoop-2.9.1/share/hadoop/common/*:/home/hadoop-2.9.1/share/hadoop/hdfs:/home/hadoop-2.9.1/share/hadoop/hdfs/lib/*:/home/hadoop-2.9.1/share/hadoop/hdfs/*:/home/hadoop-2.9.1/share/hadoop/yarn:/home/hadoop-2.9.1/share/hadoop/yarn/lib/*:/home/hadoop-2.9.1/share/hadoop/yarn/*:/home/hadoop-2.9.1/share/hadoop/mapreduce/lib/*:/home/hadoop-2.9.1/share/hadoop/mapreduce/*:/home/hadoop-2.9.1/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/hadoop-2.9.1/lib/native org.apache.flume.node.Application -f conf/flume.conf -n agent1
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/home/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/home/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    2018-07-20 10:49:26,713 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
    2018-07-20 10:49:26,723 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:79)] Configuration provider started
    2018-07-20 10:49:26,727 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:49:26,728 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flume.conf
    2018-07-20 10:49:26,733 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,733 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for HDFS: hdfs.useLocalTimeStamp
    2018-07-20 10:49:26,735 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,738 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,738 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,740 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: HDFS Agent: agent1
    2018-07-20 10:49:26,740 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,741 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,741 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,745 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
    2018-07-20 10:49:26,746 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: agent1
    2018-07-20 10:49:26,749 (conf-file-poller-0) [INFO - org.apache.flume.conf.LogPrivacyUtil.<clinit>(LogPrivacyUtil.java:51)] Logging of configuration details is disabled. To see configuration details in the log run the agent with -Dorg.apache.flume.log.printconfig=true JVM argument. Please note that this is not recommended in production systems as it may leak private information to the logfile.
    2018-07-20 10:49:26,756 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:467)] Created channel ch1
    2018-07-20 10:49:26,764 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: HDFS using HDFS
    2018-07-20 10:49:26,767 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:ch1
    
    2018-07-20 10:49:26,768 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks HDFS
    
    2018-07-20 10:49:26,768 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources avro-source1
    
    2018-07-20 10:49:26,768 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
    2018-07-20 10:49:26,768 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
    2018-07-20 10:49:26,779 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory
    2018-07-20 10:49:26,783 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel ch1
    2018-07-20 10:49:26,784 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type avro
    2018-07-20 10:49:26,801 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: HDFS, type: hdfs
    2018-07-20 10:49:26,813 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel ch1 connected to [avro-source1, HDFS]
    2018-07-20 10:49:26,821 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: hadoop1, port: 41414 } }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@62589e29 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
    2018-07-20 10:49:26,834 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel ch1
    2018-07-20 10:49:26,908 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
    2018-07-20 10:49:26,908 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: ch1 started
    2018-07-20 10:49:26,908 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink HDFS
    2018-07-20 10:49:26,911 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
    2018-07-20 10:49:26,911 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: HDFS started
    2018-07-20 10:49:26,920 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source avro-source1
    2018-07-20 10:49:26,921 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting
    2018-07-20 10:49:26,921 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source avro-source1: { bindAddress: hadoop1, port: 41414 }...
    2018-07-20 10:49:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean.
    2018-07-20 10:49:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: avro-source1 started
    2018-07-20 10:49:27,266 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source avro-source1 started.
    2018-07-20 10:49:56,924 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:50:26,924 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes

    错误详情

    2018-07-20 10:57:33,800 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] OPEN
    2018-07-20 10:57:33,801 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] BOUND: /192.168.3.101:41414
    2018-07-20 10:57:33,801 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] CONNECTED: /192.168.3.101:45426
    2018-07-20 10:57:34,012 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,032 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,033 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,035 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,036 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,037 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,039 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,040 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,042 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,043 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,044 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,046 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,047 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,049 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,050 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,052 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,054 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,055 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,057 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,059 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
    2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] DISCONNECTED
    2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] UNBOUND
    2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] CLOSED
    2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.3.101:45426 disconnected.
    2018-07-20 10:57:35,527 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
    2018-07-20 10:57:35,552 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:251)] Creating hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp
    2018-07-20 10:57:35,608 (hdfs-HDFS-call-runner-4) [DEBUG - org.apache.flume.sink.hdfs.AbstractHDFSWriter.reflectGetNumCurrentReplicas(AbstractHDFSWriter.java:200)] Using getNumCurrentReplicas--HDFS-826
    2018-07-20 10:57:35,608 (hdfs-HDFS-call-runner-4) [DEBUG - org.apache.flume.sink.hdfs.AbstractHDFSWriter.reflectGetDefaultReplication(AbstractHDFSWriter.java:228)] Using FileSystem.getDefaultReplication(Path) from HADOOP-8014
    2018-07-20 10:57:36,609 (hdfs-HDFS-roll-timer-0) [DEBUG - org.apache.flume.sink.hdfs.BucketWriter$2.call(BucketWriter.java:291)] Rolling file (hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp): Roll scheduled after 1 sec elapsed.
    2018-07-20 10:57:36,624 (Thread-11) [INFO - org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1762)] Exception in createBlockOutputStream
    java.io.IOException: Got error, status=ERROR, status message , ack with firstBadLink as 192.168.3.102:50010
    	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:118)
    	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1751)
    	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1655)
    	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
    2018-07-20 10:57:36,625 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1658)] Abandoning BP-1823215470-192.168.3.101-1531885297754:blk_1073741828_1004
    2018-07-20 10:57:36,630 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1663)] Excluding datanode DatanodeInfoWithStorage[192.168.3.102:50010,DS-56002022-a16c-46fa-8b64-a78c65fac104,DISK]
    2018-07-20 10:57:36,651 (Thread-11) [INFO - org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1762)] Exception in createBlockOutputStream
    java.io.IOException: Got error, status=ERROR, status message , ack with firstBadLink as 192.168.3.103:50010
    	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:118)
    	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1751)
    	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1655)
    	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
    2018-07-20 10:57:36,652 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1658)] Abandoning BP-1823215470-192.168.3.101-1531885297754:blk_1073741829_1005
    2018-07-20 10:57:36,663 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1663)] Excluding datanode DatanodeInfoWithStorage[192.168.3.103:50010,DS-2b0f3ddc-d575-4400-9fdb-629cd71cd8e9,DISK]
    2018-07-20 10:57:36,688 (hdfs-HDFS-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:393)] Closing hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp
    2018-07-20 10:57:36,699 (hdfs-HDFS-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:655)] Renaming hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp to hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528
    2018-07-20 10:57:36,705 (hdfs-HDFS-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
    2018-07-20 10:57:56,931 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:58:26,931 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:58:56,932 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:59:26,932 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 10:59:56,933 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 11:00:26,933 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    2018-07-20 11:00:56,934 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
    

      

    下载源码,分析

    <!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-hdfs-sink -->
    <dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-hdfs-sink</artifactId>
    <version>1.8.0</version>
    </dependency>



    org.apache.flume.sink.hdfs.BucketWriter
    C:Usersmymy.m2 epositoryorgapacheflumeflume-ng-sinksflume-hdfs-sink1.8.0flume-hdfs-sink-1.8.0.jar!orgapacheflumesinkhdfsBucketWriter.class


    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.apache.flume.sink.hdfs;
    
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Throwables;
    import java.io.IOException;
    import java.lang.reflect.Method;
    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.flume.Clock;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.SystemClock;
    import org.apache.flume.auth.PrivilegedExecutor;
    import org.apache.flume.instrumentation.SinkCounter;
    import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    class BucketWriter {
        private static final Logger LOG = LoggerFactory.getLogger(BucketWriter.class);
        private static final Integer staticLock = new Integer(1);
        private Method isClosedMethod;
        private HDFSWriter writer;
        private final long rollInterval;
        private final long rollSize;
        private final long rollCount;
        private final long batchSize;
        private final CompressionCodec codeC;
        private final CompressionType compType;
        private final ScheduledExecutorService timedRollerPool;
        private final PrivilegedExecutor proxyUser;
        private final AtomicLong fileExtensionCounter;
        private long eventCounter;
        private long processSize;
        private FileSystem fileSystem;
        private volatile String filePath;
        private volatile String fileName;
        private volatile String inUsePrefix;
        private volatile String inUseSuffix;
        private volatile String fileSuffix;
        private volatile String bucketPath;
        private volatile String targetPath;
        private volatile long batchCounter;
        private volatile boolean isOpen;
        private volatile boolean isUnderReplicated;
        private volatile int consecutiveUnderReplRotateCount;
        private volatile ScheduledFuture<Void> timedRollFuture;
        private SinkCounter sinkCounter;
        private final int idleTimeout;
        private volatile ScheduledFuture<Void> idleFuture;
        private final WriterCallback onCloseCallback;
        private final String onCloseCallbackPath;
        private final long callTimeout;
        private final ExecutorService callTimeoutPool;
        private final int maxConsecUnderReplRotations;
        private boolean mockFsInjected;
        private final long retryInterval;
        private final int maxRenameTries;
        protected boolean closed;
        AtomicInteger renameTries;
    
        BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName, String inUsePrefix, String inUseSuffix, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, String onCloseCallbackPath, long callTimeout, ExecutorService callTimeoutPool, long retryInterval, int maxCloseTries) {
            this(rollInterval, rollSize, rollCount, batchSize, context, filePath, fileName, inUsePrefix, inUseSuffix, fileSuffix, codeC, compType, writer, timedRollerPool, proxyUser, sinkCounter, idleTimeout, onCloseCallback, onCloseCallbackPath, callTimeout, callTimeoutPool, retryInterval, maxCloseTries, new SystemClock());
        }
    
        BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName, String inUsePrefix, String inUseSuffix, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, String onCloseCallbackPath, long callTimeout, ExecutorService callTimeoutPool, long retryInterval, int maxCloseTries, Clock clock) {
            this.isClosedMethod = null;
            this.consecutiveUnderReplRotateCount = 0;
            this.maxConsecUnderReplRotations = 30;
            this.mockFsInjected = false;
            this.closed = false;
            this.renameTries = new AtomicInteger(0);
            this.rollInterval = rollInterval;
            this.rollSize = rollSize;
            this.rollCount = rollCount;
            this.batchSize = batchSize;
            this.filePath = filePath;
            this.fileName = fileName;
            this.inUsePrefix = inUsePrefix;
            this.inUseSuffix = inUseSuffix;
            this.fileSuffix = fileSuffix;
            this.codeC = codeC;
            this.compType = compType;
            this.writer = writer;
            this.timedRollerPool = timedRollerPool;
            this.proxyUser = proxyUser;
            this.sinkCounter = sinkCounter;
            this.idleTimeout = idleTimeout;
            this.onCloseCallback = onCloseCallback;
            this.onCloseCallbackPath = onCloseCallbackPath;
            this.callTimeout = callTimeout;
            this.callTimeoutPool = callTimeoutPool;
            this.fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
            this.retryInterval = retryInterval;
            this.maxRenameTries = maxCloseTries;
            this.isOpen = false;
            this.isUnderReplicated = false;
            this.writer.configure(context);
        }
    
        @VisibleForTesting
        void setFileSystem(FileSystem fs) {
            this.fileSystem = fs;
            this.mockFsInjected = true;
        }
    
        @VisibleForTesting
        void setMockStream(HDFSWriter dataWriter) {
            this.writer = dataWriter;
        }
    
        private void resetCounters() {
            this.eventCounter = 0L;
            this.processSize = 0L;
            this.batchCounter = 0L;
        }
    
        private Method getRefIsClosed() {
            try {
                return this.fileSystem.getClass().getMethod("isFileClosed", Path.class);
            } catch (Exception var2) {
                LOG.info("isFileClosed() is not available in the version of the distributed filesystem being used. Flume will not attempt to re-close files if the close fails on the first attempt");
                return null;
            }
        }
    
        private Boolean isFileClosed(FileSystem fs, Path tmpFilePath) throws Exception {
            return (Boolean)((Boolean)this.isClosedMethod.invoke(fs, tmpFilePath));
        }
    
        private void open() throws IOException, InterruptedException {
            if (this.filePath != null && this.writer != null) {
                final Configuration config = new Configuration();
                config.setBoolean("fs.automatic.close", false);
                Integer var2 = staticLock;
                synchronized(staticLock) {
                    checkAndThrowInterruptedException();
    
                    try {
                        long counter = this.fileExtensionCounter.incrementAndGet();
                        String fullFileName = this.fileName + "." + counter;
                        if (this.fileSuffix != null && this.fileSuffix.length() > 0) {
                            fullFileName = fullFileName + this.fileSuffix;
                        } else if (this.codeC != null) {
                            fullFileName = fullFileName + this.codeC.getDefaultExtension();
                        }
    
                        this.bucketPath = this.filePath + "/" + this.inUsePrefix + fullFileName + this.inUseSuffix;
                        this.targetPath = this.filePath + "/" + fullFileName;
                        LOG.info("Creating " + this.bucketPath);
                        this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                            public Void call() throws Exception {
                                if (BucketWriter.this.codeC == null) {
                                    if (!BucketWriter.this.mockFsInjected) {
                                        BucketWriter.this.fileSystem = (new Path(BucketWriter.this.bucketPath)).getFileSystem(config);
                                    }
    
                                    BucketWriter.this.writer.open(BucketWriter.this.bucketPath);
                                } else {
                                    if (!BucketWriter.this.mockFsInjected) {
                                        BucketWriter.this.fileSystem = (new Path(BucketWriter.this.bucketPath)).getFileSystem(config);
                                    }
    
                                    BucketWriter.this.writer.open(BucketWriter.this.bucketPath, BucketWriter.this.codeC, BucketWriter.this.compType);
                                }
    
                                return null;
                            }
                        });
                    } catch (Exception var7) {
                        this.sinkCounter.incrementConnectionFailedCount();
                        if (var7 instanceof IOException) {
                            throw (IOException)var7;
                        }
    
                        throw Throwables.propagate(var7);
                    }
                }
    
                this.isClosedMethod = this.getRefIsClosed();
                this.sinkCounter.incrementConnectionCreatedCount();
                this.resetCounters();
                if (this.rollInterval > 0L) {
                    Callable<Void> action = new Callable<Void>() {
                        public Void call() throws Exception {
                            BucketWriter.LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", BucketWriter.this.bucketPath, BucketWriter.this.rollInterval);
    
                            try {
                                BucketWriter.this.close(true);
                            } catch (Throwable var2) {
                                BucketWriter.LOG.error("Unexpected error", var2);
                            }
    
                            return null;
                        }
                    };
                    this.timedRollFuture = this.timedRollerPool.schedule(action, this.rollInterval, TimeUnit.SECONDS);
                }
    
                this.isOpen = true;
            } else {
                throw new IOException("Invalid file settings");
            }
        }
    
        public synchronized void close() throws IOException, InterruptedException {
            this.close(false);
        }
    
        private BucketWriter.CallRunner<Void> createCloseCallRunner() {
            return new BucketWriter.CallRunner<Void>() {
                private final HDFSWriter localWriter;
    
                {
                    this.localWriter = BucketWriter.this.writer;
                }
    
                public Void call() throws Exception {
                    this.localWriter.close();
                    return null;
                }
            };
        }
    
        private Callable<Void> createScheduledRenameCallable() {
            return new Callable<Void>() {
                private final String path;
                private final String finalPath;
                private FileSystem fs;
                private int renameTries;
    
                {
                    this.path = BucketWriter.this.bucketPath;
                    this.finalPath = BucketWriter.this.targetPath;
                    this.fs = BucketWriter.this.fileSystem;
                    this.renameTries = 1;
                }
    
                public Void call() throws Exception {
                    if (this.renameTries >= BucketWriter.this.maxRenameTries) {
                        BucketWriter.LOG.warn("Unsuccessfully attempted to rename " + this.path + " " + BucketWriter.this.maxRenameTries + " times. File may still be open.");
                        return null;
                    } else {
                        ++this.renameTries;
    
                        try {
                            BucketWriter.this.renameBucket(this.path, this.finalPath, this.fs);
                            return null;
                        } catch (Exception var2) {
                            BucketWriter.LOG.warn("Renaming file: " + this.path + " failed. Will retry again in " + BucketWriter.this.retryInterval + " seconds.", var2);
                            BucketWriter.this.timedRollerPool.schedule(this, BucketWriter.this.retryInterval, TimeUnit.SECONDS);
                            return null;
                        }
                    }
                }
            };
        }
    
        private synchronized void recoverLease() {
            if (this.bucketPath != null && this.fileSystem instanceof DistributedFileSystem) {
                try {
                    LOG.debug("Starting lease recovery for {}", this.bucketPath);
                    ((DistributedFileSystem)this.fileSystem).recoverLease(new Path(this.bucketPath));
                } catch (IOException var2) {
                    LOG.warn("Lease recovery failed for {}", this.bucketPath, var2);
                }
            }
    
        }
    
        public synchronized void close(boolean callCloseCallback) throws IOException, InterruptedException {
            checkAndThrowInterruptedException();
    
            try {
                this.flush();
            } catch (IOException var7) {
                LOG.warn("pre-close flush failed", var7);
            }
    
            LOG.info("Closing {}", this.bucketPath);
            BucketWriter.CallRunner<Void> closeCallRunner = this.createCloseCallRunner();
            if (this.isOpen) {
                try {
                    this.callWithTimeout(closeCallRunner);
                    this.sinkCounter.incrementConnectionClosedCount();
                } catch (IOException var6) {
                    LOG.warn("failed to close() HDFSWriter for file (" + this.bucketPath + "). Exception follows.", var6);
                    this.sinkCounter.incrementConnectionFailedCount();
                    this.recoverLease();
                }
    
                this.isOpen = false;
            } else {
                LOG.info("HDFSWriter is already closed: {}", this.bucketPath);
            }
    
            if (this.timedRollFuture != null && !this.timedRollFuture.isDone()) {
                this.timedRollFuture.cancel(false);
                this.timedRollFuture = null;
            }
    
            if (this.idleFuture != null && !this.idleFuture.isDone()) {
                this.idleFuture.cancel(false);
                this.idleFuture = null;
            }
    
            if (this.bucketPath != null && this.fileSystem != null) {
                try {
                    this.renameBucket(this.bucketPath, this.targetPath, this.fileSystem);
                } catch (Exception var5) {
                    LOG.warn("failed to rename() file (" + this.bucketPath + "). Exception follows.", var5);
                    this.sinkCounter.incrementConnectionFailedCount();
                    Callable<Void> scheduledRename = this.createScheduledRenameCallable();
                    this.timedRollerPool.schedule(scheduledRename, this.retryInterval, TimeUnit.SECONDS);
                }
            }
    
            if (callCloseCallback) {
                this.runCloseAction();
                this.closed = true;
            }
    
        }
    
        public synchronized void flush() throws IOException, InterruptedException {
            checkAndThrowInterruptedException();
            if (!this.isBatchComplete()) {
                this.doFlush();
                if (this.idleTimeout > 0 && (this.idleFuture == null || this.idleFuture.cancel(false))) {
                    Callable<Void> idleAction = new Callable<Void>() {
                        public Void call() throws Exception {
                            BucketWriter.LOG.info("Closing idle bucketWriter {} at {}", BucketWriter.this.bucketPath, System.currentTimeMillis());
                            if (BucketWriter.this.isOpen) {
                                BucketWriter.this.close(true);
                            }
    
                            return null;
                        }
                    };
                    this.idleFuture = this.timedRollerPool.schedule(idleAction, (long)this.idleTimeout, TimeUnit.SECONDS);
                }
            }
    
        }
    
        private void runCloseAction() {
            try {
                if (this.onCloseCallback != null) {
                    this.onCloseCallback.run(this.onCloseCallbackPath);
                }
            } catch (Throwable var2) {
                LOG.error("Unexpected error", var2);
            }
    
        }
    
        private void doFlush() throws IOException, InterruptedException {
            this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                public Void call() throws Exception {
                    BucketWriter.this.writer.sync();
                    return null;
                }
            });
            this.batchCounter = 0L;
        }
    
        public synchronized void append(final Event event) throws IOException, InterruptedException {
            checkAndThrowInterruptedException();
            if (this.idleFuture != null) {
                this.idleFuture.cancel(false);
                if (!this.idleFuture.isDone()) {
                    try {
                        this.idleFuture.get(this.callTimeout, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException var6) {
                        LOG.warn("Timeout while trying to cancel closing of idle file. Idle file close may have failed", var6);
                    } catch (Exception var7) {
                        LOG.warn("Error while trying to cancel closing of idle file. ", var7);
                    }
                }
    
                this.idleFuture = null;
            }
    
            if (!this.isOpen) {
                if (this.closed) {
                    throw new BucketClosedException("This bucket writer was closed and this handle is thus no longer valid");
                }
    
                this.open();
            }
    
            if (this.shouldRotate()) {
                boolean doRotate = true;
                if (this.isUnderReplicated) {
                    if (this.consecutiveUnderReplRotateCount >= 30) {
                        doRotate = false;
                        if (this.consecutiveUnderReplRotateCount == 30) {
                            LOG.error("Hit max consecutive under-replication rotations ({}); will not continue rolling files under this path due to under-replication", 30);
                        }
                    } else {
                        LOG.warn("Block Under-replication detected. Rotating file.");
                    }
    
                    ++this.consecutiveUnderReplRotateCount;
                } else {
                    this.consecutiveUnderReplRotateCount = 0;
                }
    
                if (doRotate) {
                    this.close();
                    this.open();
                }
            }
    
            try {
                this.sinkCounter.incrementEventDrainAttemptCount();
                this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                    public Void call() throws Exception {
                        BucketWriter.this.writer.append(event);
                        return null;
                    }
                });
            } catch (IOException var5) {
                LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + this.bucketPath + ") and rethrowing exception.", var5.getMessage());
    
                try {
                    this.close(true);
                } catch (IOException var4) {
                    LOG.warn("Caught IOException while closing file (" + this.bucketPath + "). Exception follows.", var4);
                }
    
                throw var5;
            }
    
            this.processSize += (long)event.getBody().length;
            ++this.eventCounter;
            ++this.batchCounter;
            if (this.batchCounter == this.batchSize) {
                this.flush();
            }
    
        }
    
        private boolean shouldRotate() {
            boolean doRotate = false;
            if (this.writer.isUnderReplicated()) {
                this.isUnderReplicated = true;
                doRotate = true;
            } else {
                this.isUnderReplicated = false;
            }
    
            if (this.rollCount > 0L && this.rollCount <= this.eventCounter) {
                LOG.debug("rolling: rollCount: {}, events: {}", this.rollCount, this.eventCounter);
                doRotate = true;
            }
    
            if (this.rollSize > 0L && this.rollSize <= this.processSize) {
                LOG.debug("rolling: rollSize: {}, bytes: {}", this.rollSize, this.processSize);
                doRotate = true;
            }
    
            return doRotate;
        }
    
        private void renameBucket(String bucketPath, String targetPath, final FileSystem fs) throws IOException, InterruptedException {
            if (!bucketPath.equals(targetPath)) {
                final Path srcPath = new Path(bucketPath);
                final Path dstPath = new Path(targetPath);
                this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                    public Void call() throws Exception {
                        if (fs.exists(srcPath)) {
                            BucketWriter.LOG.info("Renaming " + srcPath + " to " + dstPath);
                            BucketWriter.this.renameTries.incrementAndGet();
                            fs.rename(srcPath, dstPath);
                        }
    
                        return null;
                    }
                });
            }
        }
    
        public String toString() {
            return "[ " + this.getClass().getSimpleName() + " targetPath = " + this.targetPath + ", bucketPath = " + this.bucketPath + " ]";
        }
    
        private boolean isBatchComplete() {
            return this.batchCounter == 0L;
        }
    
        private static void checkAndThrowInterruptedException() throws InterruptedException {
            Thread.currentThread();
            if (Thread.interrupted()) {
                throw new InterruptedException("Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.");
            }
        }
    
        private <T> T callWithTimeout(final BucketWriter.CallRunner<T> callRunner) throws IOException, InterruptedException {
            Future future = this.callTimeoutPool.submit(new Callable<T>() {
                public T call() throws Exception {
                    return BucketWriter.this.proxyUser.execute(new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            return callRunner.call();
                        }
                    });
                }
            });
    
            try {
                return this.callTimeout > 0L ? future.get(this.callTimeout, TimeUnit.MILLISECONDS) : future.get();
            } catch (TimeoutException var5) {
                future.cancel(true);
                this.sinkCounter.incrementConnectionFailedCount();
                throw new IOException("Callable timed out after " + this.callTimeout + " ms on file: " + this.bucketPath, var5);
            } catch (ExecutionException var6) {
                this.sinkCounter.incrementConnectionFailedCount();
                Throwable cause = var6.getCause();
                if (cause instanceof IOException) {
                    throw (IOException)cause;
                } else if (cause instanceof InterruptedException) {
                    throw (InterruptedException)cause;
                } else if (cause instanceof RuntimeException) {
                    throw (RuntimeException)cause;
                } else if (cause instanceof Error) {
                    throw (Error)cause;
                } else {
                    throw new RuntimeException(var6);
                }
            } catch (CancellationException var7) {
                throw new InterruptedException("Blocked callable interrupted by rotation event");
            } catch (InterruptedException var8) {
                LOG.warn("Unexpected Exception " + var8.getMessage(), var8);
                throw var8;
            }
        }
    
        private interface CallRunner<T> {
            T call() throws Exception;
        }
    }
    

      

    jps

    [root@hadoop1 hadoop-2.9.1]# jps
    2485 DataNode
    2950 ResourceManager
    2264 NameNode
    4057 Jps
    3052 NodeManager
    2815 SecondaryNameNode
    [root@hadoop1 hadoop-2.9.1]#

    [root@hadoop2 ~]# jps
    29251 DataNode
    31144 Jps
    [root@hadoop2 ~]#

    [root@hadoop3 ~]# jps
    18513 DataNode
    1709 Jps
    [root@hadoop3 ~]#

    jps进程已经开启,但是  


    <!-- Put site-specific property overrides in this file. -->
    <configuration>
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop1:9001/</value>
    </property>
    <property>
    <name>io.file.buffer.size</name>
    <value>131072</value>
    </property>
    </configuration>

    [root@hadoop2 hadoop-2.9.1]# cat etc/hadoop/core-site.xml

    没有配置hdfs dataNodes

    Apache Hadoop 2.9.1 – Hadoop Cluster Setup http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html

    This section deals with important parameters to be specified in the given configuration files:

    • etc/hadoop/core-site.xml
    ParameterValueNotes
    fs.defaultFS NameNode URI hdfs://host:port/
    io.file.buffer.size 131072 Size of read/write buffer used in SequenceFiles.
    • etc/hadoop/hdfs-site.xml

    • Configurations for NameNode:

    ParameterValueNotes
    dfs.namenode.name.dir Path on the local filesystem where the NameNode stores the namespace and transactions logs persistently. If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy.
    dfs.hosts /dfs.hosts.exclude List of permitted/excluded DataNodes. If necessary, use these files to control the list of allowable datanodes.
    dfs.blocksize 268435456 HDFS blocksize of 256MB for large file-systems.
    dfs.namenode.handler.count 100 More NameNode server threads to handle RPCs from large number of DataNodes.
    • Configurations for DataNode:
    ParameterValueNotes
    dfs.datanode.data.dir Comma separated list of paths on the local filesystem of a DataNode where it should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices.
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>2</value>
      </property>
      <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///home/hadoop-2.9.1/mydata/datanode</value>
      </property>
      <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///home/hadoop-2.9.1/mydata/namenode</value>
      </property>
      <property>
        <name>dfs.blocksize</name>
        <value>268435456</value>
      </property>
      <property>
        <name>dfs.namenode.handler.count</name>
        <value>100</value>
      </property>  
    </configuration>
    [root@hadoop1 hadoop-2.9.1]# cat  etc/hadoop/hdfs-site.xml 
    

      






  • 相关阅读:
    Android使用SO库时要注意的一些问题
    android studio 生成引用arr
    android studio 改包名
    P2P通信原理与实现(C++)
    unity3d各种OpenFileDialog操作
    使用ffmpeg编码时,如何设置恒定码率,并控制好关键帧I帧间隔
    ffmpeg h264转h265
    照片人脸建模
    自动减面
    Unity3d 5.3.5使用sqlite3
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9339946.html
Copyright © 2011-2022 走看看