zoukankan      html  css  js  c++  java
  • MapReduce Java API-使用Partitioner实现输出到多个文件

    场景

    MapReduce Java API-多输入路径方式:

    https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119453275

    在上面的基础上,怎样用Partitioner的方式实现将学生的成绩数据

    分段输出到不同的文件。

    例如分为三个成绩段:

    小于60分

    大于等于60分小于等于80分

    大于80分

    Partitioner

    1、Partion发生在Map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,

    每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass设置的key

    比较函数类排序。

    2、 Partitioner的作用是对Mapper产生的中间结果进行分片,以便将同一个分组的数据交给同一个Reducer处理,

    它直接影响Reducer阶段的复杂均衡。

    3、Partitioner创建流程

    ① 先分析一下具体的业务逻辑,确定大概有多少个分区
    ② 首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类
    ③ 重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
    ④ 在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
    ⑤ 设置Reducer的数量,job.setNumReduceTasks(6);

    注:

    博客:
    https://blog.csdn.net/badao_liumang_qizhi
    关注公众号
    霸道的程序猿
    获取编程相关电子书、教程推送与免费下载。

    实现

    1、首先新建数据集score.txt,用来进行分段输出。

    1、自定义分区函数类

    通过成绩判断,用return的值为0、1、2代表三个分区。

    package com.badao.muloutput;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class StudentPartitioner extends Partitioner<IntWritable, Text> {
        @Override
        public int getPartition(IntWritable intWritable, Text text, int i) {
            //学生成绩
            int scoreInt = intWritable.get();
            //默认指定分区0
            if(i==0)
            {
                return 0;
            }
            if(scoreInt < 60)
            {
                return 0;
            }else if(scoreInt<=80)
            {
                return 1;
            }else
            {
                return 2;
            }
        }
    }

    3、定义Mapper类

    package com.badao.muloutput;
    
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    
    public class MulOutputMapper extends Mapper<LongWritable,Text,IntWritable,Text> {
    
    
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] studentArr = value.toString().split(" ");
            if(StringUtils.isNotBlank(studentArr[1]))
            {
                IntWritable pKey = new IntWritable(Integer.parseInt(studentArr[1].trim()));
                context.write(pKey,value);
            }
        }
    }

    4、定义Reduce类

    package com.badao.muloutput;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    
    public class MulOutputReducer extends Reducer<IntWritable,Text,NullWritable,Text> {
    
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for(Text value:values)
            {
                context.write(NullWritable.get(),value);
            }
        }
    }

    5、新建Job类

    package com.badao.muloutput;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
    
    import java.io.IOException;
    
    public class MulOutputJob {
        public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
            wordCountLocal();
        }
    
        public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException
        {
            Configuration conf = new Configuration();
            System.setProperty("HADOOP_USER_NAME","root");
            conf.set("fs.defaultFS","hdfs://192.168.148.128:9000");
            //实例化一个作业,word count是作业的名字
            Job job = Job.getInstance(conf, "muloutput");
    
            //指定通过哪个类找到对应的jar包
            job.setJarByClass(MulOutputJob.class);
    
            //为job设置Mapper类
            job.setMapperClass(MulOutputMapper.class);
            //为job设置reduce类
            job.setReducerClass(MulOutputReducer.class);
            //设置Partitioner类
            job.setPartitionerClass(StudentPartitioner.class);
            //设置reduce的个数为3
            job.setNumReduceTasks(3);
    
            //mapper输出格式
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            //reduce输出格式
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
    
            //为job设置输入路径,输入路径是存在的文件夹/文件
            FileInputFormat.addInputPath(job,new Path("/score.txt"));
            //为job设置输出路径
            FileOutputFormat.setOutputPath(job,new Path("/muloutput8"));
            job.waitForCompletion(true);
        }
    
    
    }

    6、将数据集上传到HDFS指定的目录下,运行job查看输出结果

    注意事项

    这里要注意坑点,因为这里在分解数据时是按照一个空格来拆分的,所以数据集中

    每个key和value之间只能有一个空格。

    并且不要再数据集的最后面添加多余的换行,不然会导致不能正常输出数据。

    比如这里查看数据时发现多了个换行

    然后找不到不出统计数据的原因,就在代码中将每步的结果输出下

    如果是上面多了换行的话,下面输出key-value时就会有异常数据,都跟上面这样是正常的。 

    博客园: https://www.cnblogs.com/badaoliumangqizhi/ 关注公众号 霸道的程序猿 获取编程相关电子书、教程推送与免费下载。
  • 相关阅读:
    Java——多线程之方法详解
    Java——深入理解Java异常体系
    Java——Java代码的执行顺序
    Java——抽象类与接口的前世今生
    存储系列之 LUN 和 LVM
    存储系列之 RAID技术原理简介
    存储系列之 硬盘接口与SCSI总线协议
    存储系列之 介质(软盘、硬盘、固态)和磁盘寻址
    Redis设计原理简介
    MySQL InnoDB索引介绍以及在线添加索引实例分析
  • 原文地址:https://www.cnblogs.com/badaoliumangqizhi/p/15110064.html
Copyright © 2011-2022 走看看