  • Hadoop study notes: finding the entry point of a Hadoop job

    I joined a Hadoop project without any prior Hadoop experience, so I had to get up to speed quickly:

    Following the existing setup, I was handed two things: the hdfs2local.sh script and the LiaoNingFilter.jar package. I couldn't wait to open the jar with jd-gui and search for main (there is no Main-Class entry in MANIFEST.MF, so inside the jar searching is the only option; as it turns out, the hdfs2local.sh script already spells out the package that contains main).
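
    As a side note, a quick way to check whether a jar declares its own entry point is to read the Main-Class attribute from META-INF/MANIFEST.MF. A minimal sketch, assuming nothing beyond the jar name from this project and the standard java.util.jar API:

      import java.io.IOException;
      import java.util.jar.Attributes;
      import java.util.jar.JarFile;
      import java.util.jar.Manifest;

      public class PrintMainClass {
        public static void main(String[] args) throws IOException {
          // Open the jar we are inspecting; adjust the path as needed.
          try (JarFile jar = new JarFile("LiaoNingFilter.jar")) {
            Manifest manifest = jar.getManifest();
            String mainClass = (manifest == null)
                ? null
                : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
            // For this jar the attribute appears to be absent (no main was found
            // in MANIFEST.MF), so the entry point has to come from the launcher script.
            System.out.println("Main-Class: " + mainClass);
          }
        }
      }

    Here is the decompiled driver class, DataJob: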

      1 package cn.com.dtmobile.hadoop.biz.LiaoNingFilter.job;
      2 
      3 import cn.com.dtmobile.hadoop.biz.LiaoNingFilter.mr.DataMap;
      4 import cn.com.dtmobile.hadoop.biz.LiaoNingFilter.mr.DataReduce;
      5 import java.net.URI;
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.conf.Configured;
      8 import org.apache.hadoop.filecache.DistributedCache;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.io.NullWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.io.compress.CompressionCodec;
     13 import org.apache.hadoop.io.compress.GzipCodec;
     14 import org.apache.hadoop.mapreduce.Job;
     15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
     18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     19 import org.apache.hadoop.util.GenericOptionsParser;
     20 import org.apache.hadoop.util.Tool;
     21 import org.apache.hadoop.util.ToolRunner;
     22 
     23 public class DataJob
     24   extends Configured
     25   implements Tool
     26 {
     27   public int run(String[] args)
     28     throws Exception
     29   {
     30     Configuration conf = getConf();
     31     conf.setBoolean("mapred.output.compress", true);
     32     conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
     33     
     34     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     35     
     36     String sysHour = otherArgs[10];
     37     
     38     DistributedCache.addCacheFile(new URI(otherArgs[9]), conf);
     39     Job job = new Job(conf, "LiaoNingFilter");
     40     job.getConfiguration().setStrings("sys_hour", new String[] { sysHour });
     41     job.setJarByClass(DataJob.class);
     42     job.setMapperClass(DataMap.class);
     43     job.setReducerClass(DataReduce.class);
     44     job.setNumReduceTasks(100);
     45     
     46     job.setMapOutputKeyClass(Text.class);
     47     job.setMapOutputValueClass(Text.class);
     48     
     49     job.setOutputKeyClass(NullWritable.class);
     50     job.setOutputValueClass(Text.class);
     51     
     52     MultipleOutputs.addNamedOutput(job, "volterx", TextOutputFormat.class, NullWritable.class, Text.class);
     53     
     54     MultipleOutputs.addNamedOutput(job, "s1mmeorgn", TextOutputFormat.class, NullWritable.class, Text.class);
     55     
     56     MultipleOutputs.addNamedOutput(job, "voltesv", TextOutputFormat.class, NullWritable.class, Text.class);
     57     
     58     MultipleOutputs.addNamedOutput(job, "sgsorgn", TextOutputFormat.class, NullWritable.class, Text.class);
     59     
     60     MultipleOutputs.addNamedOutput(job, "s1uhttporgn", TextOutputFormat.class, NullWritable.class, Text.class);
     61     
     62     MultipleOutputs.addNamedOutput(job, "volteorgn", TextOutputFormat.class, NullWritable.class, Text.class);
     63     
     64     MultipleOutputs.addNamedOutput(job, "sharevolterx", TextOutputFormat.class, NullWritable.class, Text.class);
     65     
     66     MultipleOutputs.addNamedOutput(job, "shares1mme", TextOutputFormat.class, NullWritable.class, Text.class);
     67     
     68     MultipleOutputs.addNamedOutput(job, "sharehttp", TextOutputFormat.class, NullWritable.class, Text.class);
     69     
     70     MultipleOutputs.addNamedOutput(job, "sharevolteorgn", TextOutputFormat.class, NullWritable.class, Text.class);
     71     
     72     MultipleOutputs.addNamedOutput(job, "voltemossession", TextOutputFormat.class, NullWritable.class, Text.class);
     73     
     74     MultipleOutputs.addNamedOutput(job, "voltemosslice", TextOutputFormat.class, NullWritable.class, Text.class);
     75     
     76     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     77     
     78     FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
     79     
     80     FileInputFormat.addInputPath(job, new Path(otherArgs[2]));
     81     
     82     FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
     83     
     84     FileInputFormat.addInputPath(job, new Path(otherArgs[4]));
     85     
     86     FileInputFormat.addInputPath(job, new Path(otherArgs[5]));
     87     
     88     FileInputFormat.addInputPath(job, new Path(otherArgs[6]));
     89     
     90     FileInputFormat.addInputPath(job, new Path(otherArgs[7]));
     91     
     92     FileOutputFormat.setOutputPath(job, new Path(otherArgs[8]));
     93     
     94     return job.waitForCompletion(true) ? 0 : 1;
     95   }
     96   
     97   public static void main(String[] args)
     98     throws Exception
     99   {
    100     try
    101     {
    102       int returnCode = ToolRunner.run(new DataJob(), args);
    103       System.exit(returnCode);
    104     }
    105     catch (Exception e)
    106     {
    107       e.printStackTrace();
    108     }
     109   }
     110 }
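
    One note on the twelve MultipleOutputs.addNamedOutput calls: they only declare the named outputs, and it is the reducer that decides which record goes where. DataReduce is not decompiled in this post, so the following is only a hedged sketch of what such a reducer typically looks like; the routing rule and key layout are assumptions, only the output names come from DataJob:

      import java.io.IOException;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

      // Sketch only -- the real DataReduce may differ.
      public class DataReduceSketch extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) {
          mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
          // Hypothetical routing rule: assume the map side tagged each key with
          // its record type, e.g. "s1mmeorgn|..." or "volterx|...".
          String namedOutput = key.toString().split("\\|")[0];
          for (Text value : values) {
            // Write to one of the outputs declared with addNamedOutput in DataJob.
            // The output name also becomes the file prefix (s1mmeorgn-r-00000.gz, ...),
            // which is what the launcher script's "hdfs dfs -get ${OUTPUT}/${file_name}*"
            // lines rely on later.
            mos.write(namedOutput, NullWritable.get(), value);
          }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
          mos.close();
        }
      }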

    Lines 76~92 of class DataJob:

    These lines feed parameters into the job, but what exactly is args? otherArgs comes from run(String[] args); run(String[] args) gets its args from main(String[] args) via ToolRunner; and what main(String[] args) receives can only be found by going back to the hdfs2local.sh script that invokes the jar.
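
    Before reading the script, a toy driver (not part of this project, purely to illustrate the hand-off) shows that chain in code: main() passes the raw command-line args to ToolRunner.run(), which configures the job and calls run():

      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.util.GenericOptionsParser;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;

      // Toy driver: only prints the argument flow, runs no job.
      public class ArgsFlowDemo extends Configured implements Tool {
        public int run(String[] args) throws Exception {
          // args is whatever ToolRunner.run() forwarded from main().
          System.out.println("run() received " + args.length + " arguments");
          String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
          System.out.println("application-specific arguments: " + otherArgs.length);
          for (String a : otherArgs) {
            System.out.println("  " + a);
          }
          return 0;
        }

        public static void main(String[] args) throws Exception {
          // main() gets everything that follows "hadoop jar <jar> <mainClass>".
          System.exit(ToolRunner.run(new ArgsFlowDemo(), args));
        }
      }

    With that chain in mind, here is the launcher script: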

      1  #!/bin/bash
      2 # 
      3  export HADOOP_CONF_DIR=/etc/hadoop/conf
      4  
      5 #ANALY_DATE=20180619
      6 #ANALY_HOUR=13
      7 ANALY_DATE=`date +%Y%m%d`
      8 ANALY_HOUR="`date -d ' -1 hour' +%H`"
      9 CUR_DATE=`date +%Y%m%d`
     10 CUR_HOUR="`date +%H`"
     11 #CUR_DATE=20180619
     12 #CUR_HOUR=13
     13 if [ $CUR_HOUR = 00 ]
     14 then
     15    ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
     16 else
     17    ANALY_DATE=$CUR_DATE
     18 fi
     19  mypath="$(cd "$(dirname "$0")"; pwd)"
     20  cd $mypath
     21  #SOURCE_SVR="hdfs://nameservice1:8020/datang"
     22  SOURCE_SVR="hdfs://nameservice1:8020/ws/detail"
     23  
     24  JAR_PACKAGK="/cup/d6/datang/bin/LiaoNingFilter.jar"
     25  JAR_MAIN="cn.com.dtmobile.hadoop.biz.LiaoNingFilter.job.DataJob"
     26  
     27  HIVE_TABLES="s1mme_orgn sgs_orgn volte_rx volte_sv s1u_http_orgn volte_mos_session volte_mos_slice"
     28  
     29  GX=${SOURCE_SVR}/volte_rx/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     30  S1=${SOURCE_SVR}/s1mme_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.txt
     31  SV=${SOURCE_SVR}/volte_sv/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     32  SGS=${SOURCE_SVR}/sgs_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     33  MW=${SOURCE_SVR}/volte_sip/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     34  HTTP=${SOURCE_SVR}/s1u_http_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.txt
     35  SESSION=${SOURCE_SVR}/volte_mos_session/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     36  SLICE=${SOURCE_SVR}/volte_mos_slice/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
     37  OUTPUT=/datang/FILTER/${ANALY_DATE}${ANALY_HOUR}
     38  NODATA=/datang/nodata
     39  hdfs dfs -test -e ${GX}
     40  if [ $? -ne 0 ];then
     41  GX=${NODATA}
     42  fi
     43  hdfs dfs -test -e ${S1}
     44  if [ $? -ne 0 ];then
     45  S1=${NODATA}
     46  fi
     47  hdfs dfs -test -e ${SV}
     48  if [ $? -ne 0 ];then
     49  SV=${NODATA}
     50  fi
     51  hdfs dfs -test -e ${SGS}
     52  if [ $? -ne 0 ]; then
     53  SGS=${NODATA}
     54  fi
     55  hdfs dfs -test -e ${MW}
     56  if [ $? -ne 0 ]; then
     57  MW=${NODATA}
     58  fi
     59  hdfs dfs -test -e ${HTTP}
     60  if [ $? -ne 0 ]; then
     61  HTTP=${NODATA}
     62  fi
     63  hdfs dfs -test -e ${SESSION}
     64  if [ $? -ne 0 ];then
     65  SESSION=${NODATA}
     66  fi
     67  hdfs dfs -test -e ${SLICE}
     68  if [ $? -ne 0 ];then
     69  SLICE=${NODATA}
     70  fi
     71 
     72  T_PROCESS=/datang/t_process/t_process.csv
     73  echo "`date '+/%y/%m/%d %H:%M:%S'` INFO begni,exit..."
     74  HADOOP=`which hadoop`
     75  #${HADOOP} fs -rm -R ${OUTPUT}
     76  ${HADOOP} jar ${JAR_PACKAGK} ${JAR_MAIN} \
     77  ${GX} \
     78  ${S1} \
     79  ${SV} \
     80  ${SGS} \
     81  ${MW} \
     82  ${HTTP} \
     83  ${SESSION} \
     84  ${SLICE} \
     85  ${OUTPUT} \
     86  ${T_PROCESS} \
     87  ${ANALY_HOUR}
     88  
     89  #exit
     90  
     91  TEMP_DIR=/cup/d6/datang/TEMP
     92  echo "get data begin!------------------------------------------"
     93  
     94  for tableName in ${HIVE_TABLES}
     95   do
     96   
     97   file_name=`echo ${tableName}|sed 's/_//g'`
     98   
     99   rm -rf ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/*
    100   mkdir -p ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR
    101   
    102   hdfs dfs -get ${OUTPUT}/${file_name}* ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/
    103   echo "hdfs dfs -get ${OUTPUT}/${file_name}* ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/"
    104  done
    105  
    106  orgn=volteorgn
    107  rm -rf $TEMP_DIR/volte_orgn/$ANALY_DATE/$ANALY_HOUR/*
    108  mkdir -p ${TEMP_DIR}/volte_orgn/$ANALY_DATE/$ANALY_HOUR
    109  hdfs dfs -get ${OUTPUT}/${orgn}* ${TEMP_DIR}/volte_orgn/$ANALY_DATE/$ANALY_HOUR
    110  rm -rf $TEMP_DIR/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR/*
    111  mkdir -p ${TEMP_DIR}/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR
    112  hdfs dfs -get ${OUTPUT}/sharevolteorgn* ${TEMP_DIR}/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR
    113  
    114  rm -rf $TEMP_DIR/share_s1mme/$ANALY_DATE/$ANALY_HOUR/*
    115  mkdir -p ${TEMP_DIR}/share_s1mme/$ANALY_DATE/$ANALY_HOUR
    116  hdfs dfs -get ${OUTPUT}/shares1mme* ${TEMP_DIR}/share_s1mme/$ANALY_DATE/$ANALY_HOUR
    117 
    118  rm -rf $TEMP_DIR/share_volterx/$ANALY_DATE/$ANALY_HOUR/*
    119  mkdir -p ${TEMP_DIR}/share_volterx/$ANALY_DATE/$ANALY_HOUR
    120  hdfs dfs -get ${OUTPUT}/sharevolterx* ${TEMP_DIR}/share_volterx/$ANALY_DATE/$ANALY_HOUR
    121 
    122  DEL_HOUR="`date -d ' -6 hour' +%H`"
    123  if [ $ANALY_HOUR = 00 ]
    124  then
    125     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    126  elif [ $ANALY_HOUR = 01 ]; then
    127     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    128  elif [ $ANALY_HOUR = 02 ]; then
    129     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    130  elif [ $ANALY_HOUR = 03 ]; then
    131     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    132  elif [ $ANALY_HOUR = 04 ]; then
    133     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    134  elif [ $ANALY_HOUR = 23 ]; then
    135     ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
    136  else
    137     ANALY_DATE=$1
    138  fi
    139  hdfs dfs -rm -R -skipTrash /datang/FILTER/${ANALY_DATE}${DEL_HOUR}
    140  echo "get data end!------------------------------------------"
    141  echo "`date '+/%y/%m/%d %H:%M:%S'` INFO done ,exit..."
    142  exit
    143  

    Lines 76~87 of the hdfs2local.sh script:

    This is the hadoop command that runs the jar. From the commands manual, the format is:

    jar

    Runs a jar file. Users can bundle their MapReduce code in a jar file and execute it using this command.

    Usage: hadoop jar <jar> [mainClass] args...

    http://hadoop.apache.org/docs/r1.0.4/cn/commands_manual.html

    Now the question: which of these tokens end up as the job's arguments?

    ${HADOOP} jar ${JAR_PACKAGK} ${JAR_MAIN}  ${GX}  ${S1}  ${SV}  ${SGS}  ${MW}  ${HTTP}  ${SESSION}  ${SLICE}  ${OUTPUT}  ${T_PROCESS}  ${ANALY_HOUR}

    In principle everything after ${HADOOP} jar is an argument, but the count does not match the arguments consumed in lines 76~92 of class DataJob. Where does the difference come from? The GenericOptionsParser class is the place to look:

    34     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    The official documentation: http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html

     String[] getRemainingArgs() 
              Returns an array of Strings containing only application-specific arguments.
    One more thing to note: hadoop arguments fall into two classes, generic options and command options:
    bin/hadoop command [genericOptions] [commandOptions]

    The generic options (from the commands manual) are:

    Generic Options

    The following options are supported by dfsadmin, fs, fsck and job. Applications should implement Tool to support GenericOptions.

    GENERIC_OPTION                                  Description
    -conf <configuration file>                      Specify an application configuration file.
    -D <property=value>                             Use value for the given property.
    -fs <local|namenode:port>                       Specify a namenode.
    -jt <local|jobtracker:port>                     Specify a job tracker. Applies only to job.
    -files <comma separated list of files>          Specify comma separated files to be copied to the map reduce cluster. Applies only to job.
    -libjars <comma separated list of jars>         Specify comma separated jar files to include in the classpath. Applies only to job.
    -archives <comma separated list of archives>    Specify comma separated archives to be unarchived on the compute machines. Applies only to job.





    Everything other than the generic options is a command option (in other words, an application-specific argument).
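
    A standalone sketch of that split, using made-up arguments (no cluster needed, since GenericOptionsParser just parses the array and fills the Configuration):

      import java.util.Arrays;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.util.GenericOptionsParser;

      // Demonstrates the generic/application split with hypothetical arguments.
      public class RemainingArgsDemo {
        public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          String[] fakeArgs = {
              "-D", "mapred.reduce.tasks=50",        // generic option, absorbed into conf
              "/ws/detail/volte_rx/some_input.ok1",  // application-specific argument
              "/datang/FILTER/output"                // application-specific argument
          };
          String[] remaining = new GenericOptionsParser(conf, fakeArgs).getRemainingArgs();
          System.out.println(Arrays.toString(remaining));      // only the two paths
          System.out.println(conf.get("mapred.reduce.tasks")); // 50
        }
      }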

    So the accounting works out as follows: hadoop jar itself consumes ${JAR_PACKAGK} and ${JAR_MAIN}, and only the tokens after the main class reach main(String[] args). GenericOptionsParser then removes any generic options from args; since this command passes none, after

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()

    otherArgs holds exactly the eleven application-specific arguments:
    ${GX}  ${S1}  ${SV}  ${SGS}  ${MW}  ${HTTP}  ${SESSION}  ${SLICE}  ${OUTPUT}  ${T_PROCESS}  ${ANALY_HOUR}

    That lines up with how DataJob consumes them: lines 76~92 take otherArgs[0]~[7] as the eight input paths and otherArgs[8] as the output path, while earlier in run() otherArgs[9] (${T_PROCESS}) goes into the DistributedCache and otherArgs[10] (${ANALY_HOUR}) becomes sysHour.
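
    Those last two travel by a different channel than the input paths, so, as a closing illustration, here is a hedged sketch of how a mapper could pick them up on the task side (DataMap itself is not decompiled in this post; class and variable names are assumptions):

      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.hadoop.filecache.DistributedCache;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      // Sketch only -- the real DataMap may differ.
      public class DataMapSketch extends Mapper<LongWritable, Text, Text, Text> {
        private String sysHour;
        private final List<String> processRules = new ArrayList<String>();

        @Override
        protected void setup(Context context) throws IOException {
          // otherArgs[10] was stored by DataJob via setStrings("sys_hour", ...).
          sysHour = context.getConfiguration().get("sys_hour");
          // otherArgs[9] (t_process.csv) was registered with DistributedCache.addCacheFile;
          // the same (old-style) API exposes it as a local file on every task node.
          Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
          if (cacheFiles != null && cacheFiles.length > 0) {
            BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
            try {
              String line;
              while ((line = reader.readLine()) != null) {
                processRules.add(line);
              }
            } finally {
              reader.close();
            }
          }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          // Hypothetical: filter each record against processRules and sysHour, then
          // emit a key tagged with the record type so the reducer can route it to
          // one of the named outputs.
          context.write(new Text("s1mmeorgn" + "|" + sysHour), value);
        }
      }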


    References:

    Hadoop common commands

    https://blog.csdn.net/u011414200/article/details/50465433


    MapReduce with multiple output files (MultipleOutputs)

    https://blog.csdn.net/shujuboke/article/details/77199840

    Hadoop series: using ToolRunner and GenericOptionsParser

    https://blog.csdn.net/u011734144/article/details/60769745

    Class GenericOptionsParser

    http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html

    Hadoop 0.18 commands manual (Chinese)

    http://hadoop.apache.org/docs/r1.0.4/cn/commands_manual.html

     