zoukankan      html  css  js  c++  java
  • Hadoop之MapReduce(二)序列化,排序及分区

    MapReduce的序列化

      序列化(Serialization)是指把结构化对象转化为字节流。

      反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。

      当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop 自己开发了一套序列化机制( Writable),精简,高效。不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。

      Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。一个类要支持可序列化只需实现这个接口即可。

    public class BeanDemo implements Writable {
    
        private long id;
        private String desc;
    
        //一定要有无参构造,不然反序列化的时候会报错
        public BeanDemo() {
        }
    
        public BeanDemo(long id, String desc) {
            this.id = id;
            this.desc = desc;
        }
    
        /**
         * 序列化方法
         *
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(desc);
        }
    
    
        /**
         * 反序列化方法
         *
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readLong();
            this.desc = in.readUTF();
        }
    }

    MapReduce的排序

    如果我们需要用某个我们自定义的JavaBean中的某个字段进行结果的排序,那么就需要把这个JavaBean放到key中传输,因为在MapReduce的过程中一定会对key进行排序,而且我们可以自己定义排序的方式,一旦我们需要把JavaBean放到key中传输的话,那么这个JavaBean需要实现Comparable接口的compareTo方法:

    public class BeanDemo implements Writable, Comparable<BeanDemo> {
    
        private long id;
        private String desc;
    
        //一定要有无参构造,不然反序列化的时候会报错
        public BeanDemo() {
        }
    
        public BeanDemo(long id, String desc) {
            this.id = id;
            this.desc = desc;
        }
    
        /**
         * 序列化方法
         *
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(desc);
        }
    
    
        /**
         * 反序列化方法
         *
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readLong();
            this.desc = in.readUTF();
        }
    
        /**
         * @param o
         * @return
         */
        @Override
        public int compareTo(BeanDemo o) {
            //按照id倒序排
            //虽然this.id 比o.id 大  依然返回-1  认为小  由于排序规则谁大谁在后  所以就形成了倒序
            return this.id > o.id ? -1 : 1;
        }
    }

    这样得出的结果就以id倒序排序了。

    MapReduce的分区

    如果有一种需求,需要将结果根据规则分别写到不同的文件中去,那么我们就需要多个reduce来执行,既然需要多个reduce那么就需要多个分区,让每个reduce拉取属于自己分区的数据进行操作和输出

    Mapreduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask。
    默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发;
    所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner,自定义一个 CustomPartitioner 继承抽象类:Partitioner,然后在job 对象中,设置自定义 partitioner: job.setPartitionerClass(CustomPartitioner.class)。

    案例:以 上一篇的简单案例 为基础,在此基础上实现:a开头的写到一个文件中,b开头的写到一个文件中,其他的写到另外一个文件中

    首先编写Partitioner类:

    public class WordPartitioner extends Partitioner<Text, IntWritable> {
    
        public static HashMap<String, Integer> map = new HashMap<String, Integer>();
    
        static {
            map.put("a", 0);
            map.put("b", 1);
        }
    
        @Override
        public int getPartition(Text key, IntWritable intWritable, int numPartitions) {
            //获取每个词的第一个字母 在 map中对应的数字
            Integer code = map.get(key.toString().substring(0, 1));
            if (code != null) {
                return code;
            }
            return 2;
        }
    }

    修改执行类,修改reduce个数和添加自定义分区组件:

    public class WordCountRunner {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            //指定mr采用本地模式运行 本地测试用
            conf.set("mapreduce.framework.name", "local");
    
            //使用job构建本次mr程序
            Job job = Job.getInstance(conf);
    
            //指定本次mr程序运行的主类
            job.setJarByClass(WordCountRunner.class);
    
            //指定本次mr程序的mapper reducer
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //指定本次mr程序map阶段的输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //指定本次mr程序reduce阶段的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //设置使用几个Reduce执行 要和下面的WordPartitioner内的分区数相同
            job.setNumReduceTasks(3);
            //设置自定义分区组件
            job.setPartitionerClass(WordPartitioner.class);
    
            //指定本次mr程序处理的数据目录 输出结果的目录
    //        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
    //        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
    
            //本地测试用
            FileInputFormat.setInputPaths(job, new Path("D:\wordcount\input"));
            FileOutputFormat.setOutputPath(job, new Path("D:\wordcount\output"));//输出的文件夹不能提前创建 否则会报错
    
            //提交本次mr的job
            //job.submit();
    
            //提交任务 并且追踪打印job的执行情况
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : -1);
        }
    }
  • 相关阅读:
    HashSet源码分析
    Mysql的体系结构和存储引擎
    触发器
    存储过程和函数
    索引
    SpringBoot 中的日志使用
    log4j2
    Logback
    slf4j
    日志门面
  • 原文地址:https://www.cnblogs.com/blazeZzz/p/9681177.html
Copyright © 2011-2022 走看看