  • MapReduce Testing on a YARN Cluster (2)

    The case where there is only a MapTask and no ReduceTask:

    Test preparation:

    First synchronize the system clocks, then start the HDFS cluster and the YARN cluster; create the user file in the local directory "/home/hadoop/test/".
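    For reference, the usual way to bring the clusters up is with Hadoop's standard scripts; a minimal sketch (the NTP server address here is illustrative):

    ntpdate 0.pool.ntp.org   # synchronize the clock on every node (illustrative NTP server)
    start-dfs.sh             # start the HDFS daemons
    start-yarn.sh            # start the YARN daemons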

    user is the file holding the test data; each line contains three whitespace-separated fields, with the groupId in the third column.
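    A hypothetical example of such records (the actual file contents differ):

    u001    i001    g01
    u002    i002    g01
    u003    i003    g02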

    (Import the hadoop-2.7.3-All.jar package into the project.)

     Test goal:

     Move the groupId to the first column of the data.

    Test code:

    outValue = NullWritable.get(); // NullWritable's internal definition means an instance can only be obtained via the get() method
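    A minimal standalone sketch of why that comment holds, assuming the standard NullWritable from hadoop-common:

    import org.apache.hadoop.io.NullWritable;

    public class NullWritableDemo {
        public static void main(String[] args) {
            // NullWritable's constructor is private, so "new NullWritable()"
            // does not compile; the static factory returns a shared singleton.
            NullWritable a = NullWritable.get();
            NullWritable b = NullWritable.get();
            System.out.println(a == b); // true: same instance, and it carries no data
        }
    }

    The OnlyMapper class: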
    package com.mmzs.bigdata.yarn.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class OnlyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private Text outKey;
        private NullWritable outValue;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            outKey = new Text();
            outValue = NullWritable.get();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Split the record on whitespace; fields[2] is the groupId
            String[] fields = value.toString().split("\\s+");
            // Rebuild the record with the groupId moved to the first column
            StringBuilder builder = new StringBuilder(fields[2])
                    .append("\t").append(fields[0])
                    .append("\t").append(fields[1]);
            outKey.set(builder.toString());
            context.write(outKey, outValue);
        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            outKey = null;
        }

    }
    OnlyMapper
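    To make the transformation in map() concrete, here is a standalone sketch of the same logic applied to a single record (the sample line and field values are hypothetical):

    public class SplitDemo {
        public static void main(String[] args) {
            String value = "u001\ti001\tg01";      // one hypothetical input record
            String[] fields = value.split("\\s+"); // same regex as the mapper
            // Rebuild the record with the groupId (fields[2]) in the first column
            String out = new StringBuilder(fields[2])
                    .append("\t").append(fields[0])
                    .append("\t").append(fields[1]).toString();
            System.out.println(out); // prints: g01  u001  i001 (tab-separated)
        }
    }

    The driver that wires this mapper into a map-only Job follows.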
    package com.mmzs.bigdata.yarn.mapreduce;

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * @author hadoop
     */
    public class OnlyDriver {

        private static FileSystem fs;
        private static Configuration conf;
        static {
            String uri = "hdfs://master01:9000/";
            conf = new Configuration();
            try {
                fs = FileSystem.get(new URI(uri), conf, "hadoop");
            } catch (IOException | InterruptedException | URISyntaxException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job wcJob = getJob(args);
            if (null == wcJob) {
                return;
            }
            // Submit the Job to the cluster and wait for it to finish; the argument
            // true means the Job's runtime status is reported back to the client
            boolean flag = wcJob.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        }

        /**
         * Build the Job instance
         * @param args input path and output path
         * @return the configured Job, or null if the arguments are missing
         * @throws IOException
         */
        public static Job getJob(String[] args) throws IOException {
            if (null == args || args.length < 2) return null;
            Path localPath = new Path("/home/hadoop/test/user");
            // HDFS path holding the data to process
            Path inputPath = new Path(args[0]);
            // HDFS path where the Job writes its results
            Path outputPath = new Path(args[1]);

            // If the input path does not exist on the cluster, create it
            // and copy the local data file into it
            if (!fs.exists(inputPath)) {
                fs.mkdirs(inputPath);
                fs.copyFromLocalFile(localPath, inputPath);
            }
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true); // true means delete recursively
            }

            // Get the Job instance
            Job wcJob = Job.getInstance(conf, "OnlyMapperJob");
            // Set the entry class of this jar; the Job's entry point is OnlyDriver
            wcJob.setJarByClass(OnlyDriver.class);
            // Set the Mapper class the Job uses
            wcJob.setMapperClass(OnlyMapper.class);

            // Set the MapTask output key type
            wcJob.setMapOutputKeyClass(Text.class);
            // Set the MapTask output value type
            wcJob.setMapOutputValueClass(NullWritable.class);

            // If there is no ReduceTask, the following statement is required
            wcJob.setNumReduceTasks(0);

            // Set the input path of the data the Job processes
            FileInputFormat.setInputPaths(wcJob, inputPath);
            // Set the output path for the Job's results
            FileOutputFormat.setOutputPath(wcJob, outputPath);

            return wcJob;
        }

    }
    OnlyDriver

    Test results:

    The runtime arguments are:

    If running from Eclipse, each path argument must be prefixed with the cluster master's URI, i.e. hdfs://master01:9000

    Input path argument: /data/onlymapper/src

    Output path argument: /data/onlymapper/dst
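    When submitting from the command line instead, the same two arguments follow the driver class; a sketch (the jar file name here is illustrative):

    hadoop jar onlymapper.jar com.mmzs.bigdata.yarn.mapreduce.OnlyDriver /data/onlymapper/src /data/onlymapper/dst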

     

    // If there is no ReduceTask, the following statement is required
    // (otherwise the framework defaults to a single ReduceTask)
    wcJob.setNumReduceTasks(0);

    The output directory contains:

    _SUCCESS (an empty marker file that signals the Job completed successfully)

    part-m-00000 (the m suffix appears when there is only a MapTask)

    part-r-00000 (the r suffix appears when there is both a MapTask and a ReduceTask)
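    The results can be inspected from the command line (paths as above):

    hdfs dfs -ls /data/onlymapper/dst                  # shows _SUCCESS and part-m-00000
    hdfs dfs -cat /data/onlymapper/dst/part-m-00000    # view the reordered records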
