zoukankan      html  css  js  c++  java
  • YARN集群的mapreduce测试(五)

    将user表计算后的结果分区存储

    测试准备:

    首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:

    master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;

    slave上:   先有DataNode;再有NodeManager;

    如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动: 

    hadoop-daemon.sh start datanode
    yarn-daemon.sh start nodemanager

    然后在集群的主机本地环境创建myinfo.txt;内容如下:

    然后将测试文件myinfo.txt上传到集群中:

    测试目标:

    hadoop集群分区及缓存:
    1、分区是必须要经历Shuffle过程的,没有Shuffle过程无法完成分区操作
    2、分区是通过MapTask输出的key来完成的,默认的分区算法是数组求模法:

    数组求模法:
    将Map的输出Key调用hashcode()函数得到的哈希吗(hashcode),此哈希吗是一个数值类型,将此哈希吗数值直接与整数的最大值(Integer.MAXVALUE)取按位与(&)操作,将与操作的结果与ReducerTask
    的数量取余数,将此余数作为当前Key落入的Reduce节点的索引;
    -------------------------
    Integer mod = (Key.hashCode()&Integer.MAXVALUE)%NumReduceTask;
    被除数=34567234
    NumReduceTas=3
    ------结果:
    0、1、2 这三个数作为Reduce节点的索引;
    数组求模法是有HashPartitioner类来实现的,也是MapReduce分区的默认算法;

    测试代码

     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.io.LongWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Mapper;
     8 
     9 public class PartitionMapper extends Mapper<LongWritable, Text,LongWritable, Text>{
    10     private LongWritable outKey;
    11     private Text outValue;
    12     
    13 
    14     @Override
    15     protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
    16             throws IOException, InterruptedException {
    17         outKey = new LongWritable();
    18         outValue= new Text();
    19     }
    20 
    21     @Override
    22     protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
    23             throws IOException, InterruptedException {
    24         outKey=null;
    25         outValue=null;
    26     }
    27 
    28     @Override
    29     protected void map(LongWritable key, Text value,
    30             Mapper<LongWritable, Text, LongWritable, Text>.Context context)
    31             throws IOException, InterruptedException {
    32         String[] fields=value.toString().split("\s+");
    33         Long userId=Long.parseLong(fields[0]);
    34         outKey.set(userId);
    35         outValue.set(new StringBuilder(fields[1]).append("	").append(fields[2]).toString());
    36         context.write(outKey, outValue);
    37         
    38     }
    39 
    40     
    41     
    42     
    43 
    44 }
    PartitionMapper
     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import org.apache.hadoop.io.LongWritable;
     4 import org.apache.hadoop.mapreduce.Partitioner;
     5 
     6 public class MyPartitoner extends Partitioner {
     7 
     8     @Override
     9     public int getPartition(Object key, Object value, int num) {
    10         LongWritable userId=(LongWritable)key;
    11         Long userCode=userId.get();
    12         //分区的依据
    13         if(userCode<6){
    14             return 0;
    15         }else if(userCode<10){
    16             return 1;
    17         }else{
    18             return 2;
    19         }
    20     }
    21 
    22 }
    MyPartitoner
     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import java.io.IOException;
     4 import java.util.Iterator;
     5 import org.apache.hadoop.io.LongWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Reducer;
     8 
     9 public class PartitionReducer extends Reducer<LongWritable, Text, LongWritable, Text>{
    10 
    11     @Override
    12     protected void cleanup(Reducer<LongWritable, Text, LongWritable, Text>.Context context)
    13             throws IOException, InterruptedException {
    14         super.cleanup(context);
    15     }
    16 
    17     @Override
    18     protected void reduce(LongWritable key, Iterable<Text> values,
    19             Reducer<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
    20             Iterator<Text> its= values.iterator();
    21             if(its.hasNext()){
    22                 context.write(key, its.next());
    23             }
    24     }
    25 
    26     @Override
    27     protected void setup(Reducer<LongWritable, Text, LongWritable, Text>.Context context)
    28             throws IOException, InterruptedException {
    29         super.setup(context);
    30     }
    31     
    32     
    33 }
    PartitionReducer
     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import java.io.IOException;
     4 import java.net.URI;
     5 import java.net.URISyntaxException;
     6 
     7 import org.apache.hadoop.conf.Configuration;
     8 import org.apache.hadoop.fs.FileSystem;
     9 import org.apache.hadoop.fs.Path;
    10 import org.apache.hadoop.io.LongWritable;
    11 import org.apache.hadoop.io.Text;
    12 import org.apache.hadoop.mapreduce.Job;
    13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    15 
    16 public class ParititionDriver {
    17 private static FileSystem fs;
    18     
    19     private static Configuration conf;
    20     
    21     static{
    22         String uri="hdfs://master01:9000/";
    23         conf=new Configuration();
    24         try {
    25             fs=FileSystem.get(new URI(uri),conf,"hadoop");
    26         } catch (IOException e) {
    27             // TODO Auto-generated catch block
    28             e.printStackTrace();
    29         } catch (InterruptedException e) {
    30             // TODO Auto-generated catch block
    31             e.printStackTrace();
    32         } catch (URISyntaxException e) {
    33             // TODO Auto-generated catch block
    34             e.printStackTrace();
    35         }
    36     }
    37     
    38     
    39     
    40     public static void main(String[] args) throws Exception{
    41         Job wcJob =getJob(args);
    42         if(null==wcJob)return;
    43                 /*
    44                  *提交Job到集群并等到Job运行完成,参数true表示将Job的运行是状态信息返回到
    45                  *客户端控制台输出,返回值的布尔值代表Job是否运行成功
    46                  */
    47                 boolean flag=wcJob.waitForCompletion(true);
    48                 System.exit(flag?0:1);
    49 
    50     }
    51     public static Job getJob(String[] args) throws Exception{
    52         if(null==args||args.length<2)return null;
    53         //放置需要处理的数据所在的HDFS路径
    54         Path inputPath=new Path(args[0]);
    55         //放置Job作业执行完成之后其处理结果的输出路径
    56         Path ouputPath=new Path(args[1]);
    57         //如果输出目录已经存在则将其删除并重建
    58         if(!fs.exists(inputPath))return null;
    59         if(fs.exists(ouputPath)){
    60             fs.delete(ouputPath,true);
    61         }
    62         //获取Job实例
    63         Job wcJob=Job.getInstance(conf, "PartitionerJob");
    64         //设置运行此jar包的入口类
    65         wcJob.setJarByClass(ParititionDriver.class);
    66         //设置job调用的Mapper类
    67         wcJob.setMapperClass(PartitionMapper.class);
    68         //设置job调用的Reducer类(如果一个job没有ReduceTask则此条语句可以不掉用)
    69         wcJob.setReducerClass(PartitionReducer.class);
    70         //设置MapTask的输出值类型
    71         wcJob.setMapOutputKeyClass(LongWritable.class);
    72         //设置MapTask的输出键类型
    73         wcJob.setMapOutputValueClass(Text.class);
    74         //设置整个Job的输出键类型
    75         wcJob.setOutputKeyClass(LongWritable.class);
    76         //设置整个Job的输出值类型
    77         wcJob.setOutputValueClass(Text.class);
    78         
    79         //设置分区类
    80         wcJob.setPartitionerClass(MyPartitoner.class);
    81         wcJob.setNumReduceTasks(3);//这个数字和MyPartitioner类中的三种分区依据相对应
    82         //如果将数字调整大了,那么只有分区依据的前三个文件有内容,多出任务对应的仅仅是个空分区、空文件;
    83         //如果将数字调整小了,那么将得不到任何一个分区结果
    84         
    85         //设置整个Job需要处理数据的输入路径
    86         FileInputFormat.setInputPaths(wcJob, inputPath);
    87         //设置整个Job需要计算结果的输出路径
    88         FileOutputFormat.setOutputPath(wcJob, ouputPath);
    89         return wcJob;
    90     }
    91     
    92 
    93 }
    94 
    95 ParititionDriver
    ParititionDriver

      

    测试结果:

    运行时传入参数是:

    如果在客户端eclipse上运行:传参需要加上集群的master的uri即 hdfs://master01:9000

    输入路径参数:  /data/partition/src

    输出路径参数:  /data/partition/dst

    深入测试:(修改PartitionDriver类的如下代码)

    //设置分区类
    wcJob.setPartitionerClass(MyPartitoner.class);
    wcJob.setNumReduceTasks(3);//这个数字和MyPartitioner类中的三种分区依据相对应

    //如果将数字调整大了(比如调整为4),那么只有分区依据的前三个文件有内容,多出任务对应的仅仅是个空分区、空文件;

    //如果将数字调整小了(比如调整为2),那么将得不到任何一个分区结果

     

    小结:

    1、数据量需要达到一定的数量级使用hadoop集群来处理才是划算的
    2、集群的计算性能取决于任务数量的多少,设置任务数量必须充分考虑到集群的计算能力(比如:物理节点数量);
    a、Map设置的任务数量作为最小值参考
    b、Reduce的任务数默认是1(使用的也是默认的Partitioner类),如果设置了则启动设置的数量;
    不管MapTask还是ReduceTask,只要任务数量越多则并发能力越强,处理效率会在一定程度上越高,但是设置的任务数量必须参考集群中的物理节点数量,如果设置的任务数量过多,会导致每个物理节点上分摊的任务数量越多,处理器并发每一个任务产生的计算开销越大,任务之间因处理负载导致相互之间的影响非常大,任务失败率上升(任务失败时会重新请求进行计算,最多重新请求3次),计算性能反而下降,因此在设计MapTask与ReduceTask任务数量时必须权衡利弊,折中考虑...

  • 相关阅读:
    一本通1018
    并查集&MST
    hdu 1875 畅通工程再续
    hdu 1811 Rank of Tetris(拓扑排序+并查集)
    hdu 1325 is it a tree?
    hdu1285拓扑排序
    hdu2063 过山车(最大二分匹配)
    最小生成树二·Kruscal算法
    hiho一下 第二十一周(线段树 离散化)
    hiho一下 第二十周(线段树模板)
  • 原文地址:https://www.cnblogs.com/mmzs/p/8059991.html
Copyright © 2011-2022 走看看