  • Secondary Sort and the Overall MapReduce Flow

    I. Secondary Sort

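      Requirement: given an order data file containing the order id, product id, and product price, sort the records by order id ascending and by price descending. The number of result files must equal the number of order ids, and each result file must contain only one record: the most expensive product of that order.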

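      Approach: 1. Wrap each order in an OrderBean class that implements the WritableComparable interface;

         2. Write a custom Mapper class, fix its input and output types, and implement the business logic;

         3. Write a custom partitioner that returns a different partition number for each order id;

         4. Write a custom Reducer class;

         5. Write the secondary-sort class (spelled OrderGrouptingComparator in the code below) that extends WritableComparator, defines a no-argument constructor, and overrides the compare method;

         6. Write the Driver class;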
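
      Step 1 depends on the bean writing and reading its fields in the same fixed order. The sketch below imitates that with plain java.io streams instead of Hadoop, so it runs without a cluster. The class name WritableRoundTrip and the sample values are made up for illustration and are not part of the original job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper: mimics the write/readFields pair of a Writable
// using only in-memory streams.
public class WritableRoundTrip {

    /** Serialize the fields in a fixed order: the int first, then the double. */
    static byte[] write(int orderId, double orderPrice) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(orderId);
            out.writeDouble(orderPrice);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return bytes.toByteArray();
    }

    /** Read the fields back in exactly the order they were written. */
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int orderId = in.readInt();
            double orderPrice = in.readDouble();
            return orderId + "\t" + orderPrice;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(1001, 99.5);
        // 4 bytes for the int plus 8 bytes for the double
        System.out.println(data.length + " bytes -> " + read(data));
    }
}
```

      If the read side swapped the two calls, it would interpret the bytes of the int as part of the double, which is why the two methods have to mirror each other.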

      The code is as follows:

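    // OrderBean.java
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * @author: PrincessHug
     * @date: 2019/3/25, 21:42
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class OrderBean implements WritableComparable<OrderBean> {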
        private int orderId;
        private double orderPrice;
    
        public OrderBean() {
        }
    
        public OrderBean(int orderId, double orderPrice) {
            this.orderId = orderId;
            this.orderPrice = orderPrice;
        }
    
        public int getOrderId() {
            return orderId;
        }
    
        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }
    
        public double getOrderPrice() {
            return orderPrice;
        }
    
        public void setOrderPrice(double orderPrice) {
            this.orderPrice = orderPrice;
        }
    
        @Override
        public String toString() {
            return orderId + "\t" + orderPrice;
        }
    
        @Override
        public int compareTo(OrderBean o) {
            // Primary key: order id ascending
            int rs = Integer.compare(this.orderId, o.getOrderId());
            if (rs == 0) {
                // Secondary key: price descending (returns 0 when the prices are equal)
                rs = Double.compare(o.getOrderPrice(), this.orderPrice);
            }
            return rs;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeDouble(orderPrice);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            orderId = in.readInt();
            orderPrice = in.readDouble();
        }
    }
    
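    // OrderMapper.java
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
    
            // Split the line on tabs: order id, product id, price
            String[] fields = line.split("\t");
    
            // Wrap the order id and price in an OrderBean
            int orderId = Integer.parseInt(fields[0]);
            double orderPrice = Double.parseDouble(fields[2]);
            OrderBean orderBean = new OrderBean(orderId, orderPrice);
    
            // Emit the bean as the key; no value is needed
            context.write(orderBean, NullWritable.get());
        }
    }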
    
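    // OrderPartitioner.java
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
        @Override
        public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
            // numReduceTasks is the number of reduce tasks configured for the job;
            // masking with Integer.MAX_VALUE keeps the result non-negative
            return (orderBean.getOrderId() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }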
    
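    // OrderReducer.java
    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Keys arrive sorted by price descending within an order, so the key
            // seen here is the most expensive item of the group; write only that one
            context.write(key, NullWritable.get());
        }
    }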
    
    // OrderGrouptingComparator.java
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class OrderGrouptingComparator extends WritableComparator {
        // The super constructor must be called to register OrderBean as the
        // compared class; true tells it to create key instances for deserialization
        public OrderGrouptingComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean aBean = (OrderBean) a;
            OrderBean bBean = (OrderBean) b;
    
            // Only the order id decides whether two keys belong to the same group
            return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
        }
    }
    
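    // OrderDriver.java
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Configuration and Job object
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // Class used to locate the job jar
            job.setJarByClass(OrderBean.class);
    
            // Mapper and Reducer classes
            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);
    
            // Mapper output types
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            // Reducer output types
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            // Grouping comparator for the secondary sort
            job.setGroupingComparatorClass(OrderGrouptingComparator.class);
    
            // Partitioner class
            job.setPartitionerClass(OrderPartitioner.class);
    
            // Number of reduce tasks
            job.setNumReduceTasks(3);
    
            // Input and output paths (backslashes must be escaped in Java string literals)
            FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\order\\in"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\order\\out"));
    
            // Submit the job and wait for it to finish
            if (job.waitForCompletion(true)) {
                System.out.println("Job completed!");
            } else {
                System.out.println("Job failed!");
            }
        }
    }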
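
      To see why the reducer above ends up writing only the most expensive item per order, the interplay of the sort order and the grouping rule can be imitated in plain Java. This is a minimal sketch, not Hadoop's actual shuffle code: the class GroupingSketch, its nested Order class, and the sample records are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-alone imitation of "sort by full key, group by order id".
public class GroupingSketch {

    /** Stand-in for OrderBean: only the two key fields. */
    static final class Order {
        final int orderId;
        final double orderPrice;

        Order(int orderId, double orderPrice) {
            this.orderId = orderId;
            this.orderPrice = orderPrice;
        }

        @Override
        public String toString() {
            return orderId + "\t" + orderPrice;
        }
    }

    /** Same ordering as the sort step: order id ascending, then price descending. */
    static final Comparator<Order> SORT_ORDER = (a, b) -> {
        int byId = Integer.compare(a.orderId, b.orderId);
        return byId != 0 ? byId : Double.compare(b.orderPrice, a.orderPrice);
    };

    /** Same rule as the grouping comparator: only the order id matters. */
    static final Comparator<Order> GROUP_ORDER =
            (a, b) -> Integer.compare(a.orderId, b.orderId);

    /** Sort the records, then keep the first record of every group. */
    static List<Order> firstOfEachGroup(List<Order> input) {
        List<Order> sorted = new ArrayList<>(input);
        sorted.sort(SORT_ORDER);

        List<Order> result = new ArrayList<>();
        Order previous = null;
        for (Order current : sorted) {
            // A new group starts whenever the grouping rule sees a difference
            if (previous == null || GROUP_ORDER.compare(previous, current) != 0) {
                result.add(current);
            }
            previous = current;
        }
        return result;
    }

    /** Made-up sample records (order id, price). */
    static List<Order> sampleOrders() {
        return Arrays.asList(
                new Order(1, 222.8), new Order(2, 522.8), new Order(1, 25.8),
                new Order(3, 232.8), new Order(2, 122.4), new Order(3, 33.8));
    }

    public static void main(String[] args) {
        for (Order top : firstOfEachGroup(sampleOrders())) {
            System.out.println(top);
        }
    }
}
```

      Because the highest price sorts first within each order id, taking the first record of each group is enough; no explicit maximum search is needed.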
    

      I have typed this code many times, so I did not add many comments; please bear with me!
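
      One detail worth checking is how the partition formula spreads order ids over the 3 reduce tasks. The sketch below repeats the formula from OrderPartitioner without the Hadoop types; the class name PartitionSketch and the order ids are made up for illustration.

```java
// Hypothetical check of the partition formula used by OrderPartitioner.
public class PartitionSketch {

    /** Same expression as getPartition, with plain ints instead of Hadoop types. */
    static int partitionFor(int orderId, int numReduceTasks) {
        // The mask clears the sign bit so the result is never negative
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        int[] orderIds = {1, 2, 3, 4}; // made-up ids
        for (int orderId : orderIds) {
            System.out.println("order " + orderId + " -> partition "
                    + partitionFor(orderId, numReduceTasks));
        }
    }
}
```

      With these ids, orders 1 and 4 share partition 1, so they would land in the same output file. The one-file-per-order result therefore holds only when every order id maps to its own partition, for example three ids with distinct remainders modulo 3. Within a shared partition, the grouping comparator still yields one line per order.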

    II. The Overall MapReduce Flow

      1. Suppose there is a 200 MB text file; the data to be processed is first handed to the client;

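      2. The client computes the input splits and submits the split information to YARN together with the job; one map task is started per split, which gives 2 map tasks here (assuming a split size of 128 MB);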

      3. By default the job uses TextInputFormat, a subclass of FileInputFormat, to read the file data into the map tasks line by line;

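      4. Each map task runs the business logic and then writes its output through the context (context.write, defined on TaskInputOutputContext) into the ring buffer;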

      5. The ring buffer is simply a region of memory; when the data in it reaches 80% of the default size of 100 MB, a spill to disk is triggered;

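      6. The spilled data is partitioned and sorted, possibly many times, once per spill (this is the shuffle mechanism, explained in detail in the next post);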

      7. After partitioning and sorting, the data can optionally be merged by a Combiner and is then written to the local disk;

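      8. Once the map tasks have finished, the reduce tasks fetch the map output from disk (copying may start earlier depending on configuration, but the reduce logic runs only after all map tasks are done), then merge the files and merge-sort them (this is where the secondary sort above takes effect);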

      9. The Reducer reads one group of data at a time and then writes the results to the output files using the default TextOutputFormat.
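
      The numbers in steps 2 and 5 can be reproduced with a little arithmetic. The sketch below is an approximation under stated assumptions: a split size of 128 MB (the real value depends on the file system's block size and the configured minimum and maximum split sizes), the 1.1 slack factor that FileInputFormat applies to the last split, and the default buffer size and spill percentage from step 5. The class name FlowArithmetic and its methods are hypothetical.

```java
// Hypothetical helper reproducing the arithmetic behind steps 2 and 5.
public class FlowArithmetic {

    static final long MB = 1024L * 1024L;

    // A final chunk up to 10% larger than the split size is not split again
    static final double SPLIT_SLOP = 1.1;

    /** Count the splits for one file, given the file size and split size in bytes. */
    static int countSplits(long fileBytes, long splitBytes) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitBytes > SPLIT_SLOP) {
            splits++;
            remaining -= splitBytes;
        }
        if (remaining > 0) {
            splits++; // the last, possibly smaller, split
        }
        return splits;
    }

    /** Buffer fill level in bytes at which a spill starts. */
    static long spillThresholdBytes(long bufferBytes, int spillPercent) {
        return bufferBytes * spillPercent / 100;
    }

    public static void main(String[] args) {
        // 200 MB file with an assumed 128 MB split size: 128 MB + 72 MB
        System.out.println("splits: " + countSplits(200 * MB, 128 * MB));
        // 100 MB buffer at 80%
        System.out.println("spill at: " + spillThresholdBytes(100 * MB, 80) / MB + " MB");
    }
}
```

      With these assumptions the file yields 2 splits, hence 2 map tasks, and each map task starts spilling once its buffer holds 80 MB.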

  • Original article: https://www.cnblogs.com/HelloBigTable/p/10617937.html