zoukankan      html  css  js  c++  java
  • 分区(partition)和全排序(WritableComparable)

    一、概念

      1、分区:

        Hadoop默认分区是根据key的hashCode对ReduceTask个数取模得到的,用户无法控制哪个key存储到哪个分区。

       想要控制哪个key存储到哪个分区,需要自定义类继承Partitioner<KEY, VALUE>,
       泛型
    KEY, VALUE分别对应Mapper里的输出key,value,因为分区是在map()之后,环形缓冲区溢写时完成的。
        提示:如果ReduceTask的数量大于自定义类中重写的
    getPartition()设置的分区数时,会产生空的输出文件part-r-00000
           如果ReduceTask的数量小于自定义类中重写的getPartition()设置的分区数时,有一部分分区数据无处安放,就会报错
           如果ReduceTask的数量等于1,则不会走自定义的分区方法,系统默认分区就是1,最终只会输出一个分区文件
           分区号必须从0开始,逐一增加

      2、全排序:
        
    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,
        因为一台机器
    处理所有文件,完全丧失了MapReduce所提供的并行架构。
    二、项目举例
      1、待处理文本
      
          data.txt
       2、需求:
        分别统计出各市感染人员信息,输出到对应文件中(说明:武汉的人员信息统一输出到一个文件,十堰的人员信息统一输出到一个文件),
        输出结果按照感染人员的年龄做倒叙排列。
    输出结果举例:
        地区 姓名 年龄 性别  
       
    武汉 张三 70 女

        武汉 李四 50 男
          武汉  王五   60   女 
          武汉  赵六   55   男
      3、
    Person2Bean.java    
          
    package com.jh.work8;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class Person2Bean implements WritableComparable<Person2Bean> {
        private String area; // 感染地区
        private String name; // 感染姓名
        private Integer age; // 感染年龄
        private String sex; // 感染性别
    
        public Person2Bean() {
        }
    
        @Override
        public String toString() {
            return area + "	" + name + "	" + age + "	" + sex;
        }
    
        public String getArea() {
            return area;
        }
    
        public void setArea(String area) {
            this.area = area;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Integer getAge() {
            return age;
        }
    
        public void setAge(Integer age) {
            this.age = age;
        }
    
        public String getSex() {
            return sex;
        }
    
        public void setSex(String sex) {
            this.sex = sex;
        }
    
        // 排序
        @Override
        public int compareTo(Person2Bean o) {
            // 按感染年龄倒序排序(这里主要是区内排序)
            return o.getAge().compareTo(this.age);
        }
    
        // 序列化
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(area);
            out.writeUTF(name);
            out.writeUTF(sex);
            out.writeInt(age);
        }
    
        // 反序列化
        @Override
        public void readFields(DataInput in) throws IOException {
            area = in.readUTF();
            name = in.readUTF();
            sex = in.readUTF();
            age = in.readInt();
        }
    }
    Person2Bean
      4、Person2Mapper.java
          
    package com.jh.work8;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class Person2Mapper extends Mapper<LongWritable,Text,Person2Bean,NullWritable> {
        private Person2Bean bean = new Person2Bean();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 取文本每行内容,并分割
            String[] split = value.toString().split("	");
    
            // 赋值
            bean.setArea(split[0]);
            bean.setAge(Integer.parseInt(split[1]));
            bean.setSex(split[2]);
            bean.setName(split[3]);
            
            context.write(bean,NullWritable.get());
        }
    }
    Person2Mapper
    
    
    
      5、PersonPartition.java
          
    package com.jh.work8;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class PersonPartition extends Partitioner<Person2Bean,NullWritable> {
        /*
            自定义分区,继承Partitioner
            泛型对应Mapper端的输出
         */
        @Override
        public int getPartition(Person2Bean person2Bean, NullWritable nullWritable, int numPartitions) {
            // 根据感染地区做三个分区
            switch (person2Bean.getArea()){
                case "武汉市":
                    return 0;
                case "黄石市":
                    return 1;
                default:
                    return 2;
            }
        }
    }
    PersonPartition
    
    
    
      6、Person2Reduce.java
          
    package com.jh.work8;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class Person2Reduce extends Reducer<Person2Bean,NullWritable,Person2Bean,NullWritable> {
        @Override
        protected void reduce(Person2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // 输出所有
            for (NullWritable value : values) {
                context.write(key,NullWritable.get());
            }
        }
    }
    Person2Reduce
    
    
    
      7、Person2Driver.java
          
    package com.jh.work8;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class Person2Driver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 获取配置文件和job对象
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 设置jar的路径
            job.setJarByClass(Person2Driver.class);
    
            // 设置mapper类和reducer类
            job.setMapperClass(Person2Mapper.class);
            job.setReducerClass(Person2Reduce.class);
    
            // 设置mapper输出的key和value的数据类型
            job.setMapOutputKeyClass(Person2Bean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            // 设置reduce输出的key和value的数据类型
            job.setOutputKeyClass(Person2Bean.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 设置自定义分区的类
            job.setPartitionerClass(PersonPartition.class);
            // 设置ReduceTask的数量
            job.setNumReduceTasks(3);
    
            // 设置要处理文件的输入路径
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            // 设置计算完毕后的数据文件的输出路径
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            // 提交计算任务(job)
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0:1);
        }
    }
    Person2Driver
    
    
    
      8、最终输出为三个文件:
            
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    






     



     
     
    
    
    
    
     
    
    
    
    
    
    
    
    
  • 相关阅读:
    关于使用Java Mail进行邮件发送,抛出Could not connect to SMTP host: xx@xxx.com, port: 25的异常可能
    百度地图和solr展示资源和附近等功能的实现 四
    Python爬虫入门-3
    Python爬虫入门-2
    Python爬虫入门-1
    Python装饰器专题-限制函数调用次数(10s调用一次)
    32个Python爬虫项目让你一次吃到撑
    时间复杂度趣图分析
    各类数据库默认端口总结
    ansible使用三(ansible roles)
  • 原文地址:https://www.cnblogs.com/si-137/p/13415188.html
Copyright © 2011-2022 走看看