  • Secondary Sort and the Overall MapReduce Flow

    I. Secondary Sort

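      Requirement: given an order data file containing the order id, product id, and product price, sort the records by order id ascending and by price descending. The number of result files must equal the number of order ids, and each result file must contain exactly one record: the most expensive product of that order.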
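
      As a point of reference, the expected result can be computed in plain Java without Hadoop. This is only a sketch: the class name ExpectedResultSketch, the product ids, and the prices are made up for illustration, and the tab-separated layout (order id, product id, price) is assumed from the field indexes used in the Mapper.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java reference for the requirement (no Hadoop involved).
// All sample data below is hypothetical.
class ExpectedResultSketch {

    // Each line is assumed to be "orderId<TAB>productId<TAB>price".
    // Returns the highest price per order id, with order ids ascending.
    static Map<Integer, Double> maxPricePerOrder(String[] lines) {
        Map<Integer, Double> best = new TreeMap<>(); // TreeMap keeps keys sorted
        for (String line : lines) {
            String[] fields = line.split("\t");
            int orderId = Integer.parseInt(fields[0]);
            double price = Double.parseDouble(fields[2]);
            // Keep the larger of the stored price and the new one
            best.merge(orderId, price, Double::max);
        }
        return best;
    }

    public static void main(String[] args) {
        String[] lines = {
            "1\tp1\t10.5",
            "2\tp2\t7.0",
            "1\tp3\t99.0",
            "3\tp4\t42.0",
            "2\tp5\t8.5"
        };
        for (Map.Entry<Integer, Double> e : maxPricePerOrder(lines).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

      The MapReduce job has to produce the same records, but spread over one output file per order id.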

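      Approach: 1. Wrap an order in a class OrderBean that implements the WritableComparable interface;

         2. Write a custom Mapper class, decide its input and output types, and implement the business logic;

         3. Write a custom partitioner that returns a partition number based on the order id;

         4. Write a custom Reducer class;

         5. Write the secondary-sort class OrderGroupingComparator (spelled OrderGrouptingComparator in the code below), which extends WritableComparator, defines a no-argument constructor, and overrides the compare method;

         6. Write the Driver class.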

      The code is as follows:

    // Each public class below belongs in its own .java file.
    // Taken together, the classes need the following imports.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * @author: PrincessHug
     * @date: 2019/3/25, 21:42
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class OrderBean implements WritableComparable<OrderBean> {
        private int orderId;
        private double orderPrice;
    
        public OrderBean() {
        }
    
        public OrderBean(int orderId, double orderPrice) {
            this.orderId = orderId;
            this.orderPrice = orderPrice;
        }
    
        public int getOrderId() {
            return orderId;
        }
    
        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }
    
        public double getOrderPrice() {
            return orderPrice;
        }
    
        public void setOrderPrice(double orderPrice) {
            this.orderPrice = orderPrice;
        }
    
        @Override
        public String toString() {
            return orderId + "\t" + orderPrice;
        }
    
        @Override
        public int compareTo(OrderBean o) {
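            // Primary key: order id, ascending
            int rs = Integer.compare(this.orderId, o.getOrderId());
            if (rs == 0) {
                // Secondary key: price, descending. Equal prices return 0,
                // as the compareTo contract requires.
                rs = Double.compare(o.getOrderPrice(), this.orderPrice);
            }
            return rs;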
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeDouble(orderPrice);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            orderId = in.readInt();
            orderPrice = in.readDouble();
        }
    }
    
    public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
    
            // Split on tabs: order id, product id, price
            String[] fields = line.split("\t");
    
            // Build the key from the order id and the price
            int orderId = Integer.parseInt(fields[0]);
            double orderPrice = Double.parseDouble(fields[2]);
            OrderBean orderBean = new OrderBean(orderId, orderPrice);
    
            // Emit the bean as the key; no value is needed
            context.write(orderBean,NullWritable.get());
        }
    }
    
    public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
        @Override
        public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
            // The parameter i is the number of reduce tasks
            return (orderBean.getOrderId() & Integer.MAX_VALUE ) % i;
        }
    }
    
    public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
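            // Within a group the keys arrive sorted by price descending, so the key
            // seen here is the most expensive item of the order; write it once.
            context.write(key,NullWritable.get());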
        }
    }
    
    public class OrderGrouptingComparator extends WritableComparator {
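        // The super call is required: it registers OrderBean as the key class,
        // and true tells the parent to create key instances for deserialization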
        protected OrderGrouptingComparator(){
            super(OrderBean.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean aBean = (OrderBean)a;
            OrderBean bBean = (OrderBean)b;
    
            // Compare by order id only, so all beans of one order form a single group
            return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
        }
    }
    
    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Configuration and Job object
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // Class used to locate the job jar
            job.setJarByClass(OrderBean.class);
    
            // Mapper and Reducer classes
            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);
    
            // Map output types
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            // Final (reducer) output types
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            // Grouping comparator for the secondary sort
            job.setGroupingComparatorClass(OrderGrouptingComparator.class);
    
            // Custom partitioner
            job.setPartitionerClass(OrderPartitioner.class);
    
            // Number of reduce tasks, which is also the number of output files
            job.setNumReduceTasks(3);
    
            // Input and output paths (backslashes must be escaped in Java string literals)
            FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\order\\in"));
            FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\order\\out"));
    
            // Submit the job and wait for it to finish
            if (job.waitForCompletion(true)){
                System.out.println("Job completed!");
            }else {
                System.out.println("Job failed!");
            }
        }
    }
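
      To see how the sort order of OrderBean and the grouping comparator work together, the following plain-Java sketch imitates the reduce side without a cluster. It is a simplified model, not Hadoop's actual implementation: GroupingSketch and Key are hypothetical stand-ins, and the sample keys are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of "sort, then group" on the reduce side.
class GroupingSketch {

    // Stand-in for OrderBean
    static final class Key {
        final int orderId;
        final double price;

        Key(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Same ordering as OrderBean.compareTo: id ascending, price descending
    static final Comparator<Key> SORT = (a, b) -> {
        int c = Integer.compare(a.orderId, b.orderId);
        return c != 0 ? c : Double.compare(b.price, a.price);
    };

    // Same rule as the grouping comparator: only the order id matters
    static final Comparator<Key> GROUP =
            (a, b) -> Integer.compare(a.orderId, b.orderId);

    // Returns the first key of every group, which is what a reducer
    // emits when it writes its key once per reduce() call.
    static List<Key> firstKeyPerGroup(List<Key> mapOutput) {
        List<Key> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT);
        List<Key> out = new ArrayList<>();
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || GROUP.compare(previous, k) != 0) {
                out.add(k); // a new group starts here
            }
            previous = k;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
                new Key(2, 10.0), new Key(1, 5.5),
                new Key(1, 99.0), new Key(2, 30.0));
        for (Key k : firstKeyPerGroup(keys)) {
            System.out.println(k.orderId + "\t" + k.price);
        }
    }
}
```

      Because the keys are already sorted by price descending inside each order, the first key of a group is the most expensive item, which is why the Reducer only has to write its key once.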
    

      I have typed this code many times, so I did not add many comments; please bear with me!
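
      One point that is worth checking by hand is the partition formula. The sketch below repeats the expression from OrderPartitioner in a hypothetical helper class (PartitionSketch is not part of the job) and prints the partition for a few made-up order ids.

```java
// Stand-alone check of the partition formula used in OrderPartitioner.
class PartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so the result
    // of the modulo is never negative, even for a negative order id.
    static int partitionFor(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // same value as in the Driver
        for (int orderId = 1; orderId <= 4; orderId++) {
            System.out.println("order " + orderId + " -> partition "
                    + partitionFor(orderId, numReduceTasks));
        }
    }
}
```

      With 3 reduce tasks, order ids 1 and 4 both land in partition 1. So the "one file per order id" requirement only holds when the number of reduce tasks matches the number of distinct order ids and no two ids share a remainder; otherwise one output file contains the top record of several orders.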

    II. The Overall MapReduce Flow

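      1. Suppose there is a 200 MB text file; the data to be processed is first handed to the client.

      2. The client submits the split information to YARN, and one map task is started per split, which gives 2 map tasks here (assuming the default 128 MB block size).

      3. By default the program uses TextInputFormat, a subclass of FileInputFormat, to read the file data into the map tasks.

      4. Each map task runs the business logic and then writes its output through the Context (a TaskInputOutputContext) into the ring buffer.

      5. The ring buffer is simply a region of memory. When the data in it reaches 80% of its default size of 100 MB, a spill to disk is triggered.

      6. The data is partitioned and sorted on every spill, so this can happen many times (this is the shuffle mechanism, which the next post explains in detail).

      7. After partitioning and sorting, the data can optionally be merged by a Combiner and is then written to local disk.

      8. The reduce tasks fetch the output that the map tasks wrote to disk (the reduce phase proper starts only after all map tasks have finished), then merge the files and merge-sort them. This is the point at which the secondary sort above is applied.

      9. The Reducer reads one group of data at a time and then writes the results to the output files using the default TextOutputFormat.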
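
      The two numbers in the steps above (2 map tasks, spill at 80% of 100 MB) can be reproduced with a little arithmetic. The sketch below is a simplified model, assuming a 128 MB split size and the 1.1 slack factor that FileInputFormat applies when cutting splits; it ignores block locations and zero-length files, and the class and method names are hypothetical.

```java
// Back-of-the-envelope model of split counting and the spill threshold.
class SplitAndSpillSketch {

    static final long MB = 1024L * 1024L;

    // A trailing piece is kept in the last split as long as the remaining
    // bytes are at most 1.1 times the split size.
    static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileBytes, long splitBytes) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitBytes > SPLIT_SLOP) {
            splits++;
            remaining -= splitBytes;
        }
        if (remaining != 0) {
            splits++; // the last, possibly shorter, split
        }
        return splits;
    }

    // Number of buffered bytes at which a spill starts
    static long spillThresholdBytes(long bufferBytes, int spillPercent) {
        return bufferBytes * spillPercent / 100;
    }

    public static void main(String[] args) {
        System.out.println("splits for 200 MB: " + countSplits(200 * MB, 128 * MB));
        System.out.println("spill threshold in MB: "
                + spillThresholdBytes(100 * MB, 80) / MB);
    }
}
```

      For the 200 MB file this gives one 128 MB split plus one 72 MB split, and the spill starts once 80 MB of the 100 MB buffer is in use.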

  • Original article: https://www.cnblogs.com/HelloBigTable/p/10617937.html