zoukankan      html  css  js  c++  java
  • 二次排序

    一:二次排序自定义GroupingComparator

        1.问题:现有如下订单数据,需要求出每一个订单中成交金额最大的一笔交易

    订单id

    商品id

    成交金额

    Order_0000001

    Pdt_01

    222.8

    Order_0000001

    Pdt_05

    25.8

    Order_0000002

    Pdt_03

    522.8

    Order_0000002

    Pdt_04

    122.4

    Order_0000002

    Pdt_05

    722.4

    Order_0000003

    Pdt_01

    222.8

    Order_0000003

    Pdt_02

    22.8

    Order_0000004

    Pdt_03

    522.8

    Order_0000004

    Pdt_04

    122.4

    Order_0000004

    Pdt_05

    1034.4

       2.分析

        a、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照订单id分区,在同一订单内按照成交金额降序排序,然后发送到reduce

        b、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

      3.实现

         

     1 package com.oracle.www.secondarySort_Max2;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.fs.FileSystem;
     7 import org.apache.hadoop.fs.Path;
     8 import org.apache.hadoop.io.LongWritable;
     9 import org.apache.hadoop.io.NullWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Job;
    12 import org.apache.hadoop.mapreduce.Mapper;
    13 import org.apache.hadoop.mapreduce.Reducer;
    14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;;
    16 
    17 public class SecondarySort {
    18     static class MyMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    19         @Override
    20         protected void map(LongWritable key, Text value,
    21                 Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
    22                 throws IOException, InterruptedException {
    23             String[] datas = value.toString().split("	");
    24             OrderBean bean = new OrderBean(datas[0], datas[1], Double.parseDouble(datas[2]));
    25             context.write(bean, NullWritable.get());
    26         }
    27     }
    28 
    29     static class MyReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    30         @Override
    31         protected void reduce(OrderBean key, Iterable<NullWritable> value,
    32                 Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
    33                 throws IOException, InterruptedException {
    34             context.write(key, NullWritable.get());
    35         }
    36     }
    37 
    38     public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
    39         Configuration conf = new Configuration();
    40         try {
    41             Job job = Job.getInstance();
    42 
    43             job.setJarByClass(SecondarySort.class);
    44             job.setMapperClass(MyMapper.class);
    45             job.setReducerClass(MyReducer.class);
    46             job.setPartitionerClass(MyPartition.class);
    47             job.setGroupingComparatorClass(MyGroup.class);
    48 
    49             job.setMapOutputKeyClass(OrderBean.class);
    50             job.setMapOutputValueClass(NullWritable.class);
    51 
    52             job.setOutputKeyClass(OrderBean.class);
    53             job.setOutputValueClass(NullWritable.class);
    54 
    55             job.setNumReduceTasks(3);
    56 
    57             Path outPath = new Path("hdfs://192.168.9.13:8020/Order_item_data2");
    58             FileSystem fs = outPath.getFileSystem(conf);
    59             if (fs.exists(outPath)) {
    60                 fs.delete(outPath, true);
    61             }
    62             FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/Order_item"));
    63             FileOutputFormat.setOutputPath(job, outPath);
    64             job.waitForCompletion(true);
    65 
    66         } catch (IOException e) {
    67             // TODO Auto-generated catch block
    68             e.printStackTrace();
    69         }
    70 
    71     }
    72 
    73 }
     1 package com.oracle.www.secondarySort_Max2;
     2 
     3 import org.apache.hadoop.io.NullWritable;
     4 import org.apache.hadoop.mapreduce.Partitioner;
     5 
     6 public class MyPartition extends Partitioner<OrderBean, NullWritable> {
     7 
     8     @Override
     9     public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
    10         return key.getOrderId().hashCode() & Integer.MAX_VALUE % numPartitions;
    11     }
    12 
    13 }
     1 package com.oracle.www.secondarySort_Max2;
     2 
     3 import org.apache.hadoop.io.WritableComparable;
     4 import org.apache.hadoop.io.WritableComparator;
     5 
     6 public class MyGroup extends WritableComparator {
     7     public MyGroup() {
     8          super();
     9     }
    10 
    11     @Override
    12     public int compare(WritableComparable a, WritableComparable b) {
    13         OrderBean bean1 = (OrderBean) a;
    14         OrderBean bean2 = (OrderBean) b;
    15         return bean1.getOrderId().compareTo(bean2.getOrderId());
    16 
    17     }
    18 
    19 }
     1 package com.oracle.www.secondarySort_Max2;
     2 
     3 import java.io.DataInput;
     4 import java.io.DataOutput;
     5 import java.io.IOException;
     6 
     7 import org.apache.hadoop.io.WritableComparable;
     8 
     9 public class OrderBean implements WritableComparable<OrderBean> {
    10     private String orderId;
    11     private String productId;
    12     private double price;
    13 
    14     public OrderBean() {
    15 
    16     }
    17 
    18     public OrderBean(String orderId, String productId, double price) {
    19         this.orderId = orderId;
    20         this.productId = productId;
    21         this.price = price;
    22     }
    23 
    24     public String getOrderId() {
    25         return orderId;
    26     }
    27 
    28     public void setOrderId(String orderId) {
    29         this.orderId = orderId;
    30     }
    31 
    32     public String getProductId() {
    33         return productId;
    34     }
    35 
    36     public void setProductId(String productId) {
    37         this.productId = productId;
    38     }
    39 
    40     public double getPrice() {
    41         return price;
    42     }
    43 
    44     public void setPrice(double price) {
    45         this.price = price;
    46     }
    47 
    48     /*
    49      * (non-Javadoc)
    50      * 
    51      * @see java.lang.Object#toString()
    52      */
    53     @Override
    54     public String toString() {
    55         return "OrderBean [orderId=" + orderId + ", productId=" + productId + ", price=" + price + "]";
    56     }
    57 
    58     @Override
    59     public void write(DataOutput out) throws IOException {
    60         out.writeUTF(orderId);
    61         out.writeUTF(productId);
    62         out.writeDouble(price);
    63     }
    64 
    65     @Override
    66     public void readFields(DataInput in) throws IOException {
    67         this.orderId = in.readUTF();
    68         this.productId = in.readUTF();
    69         this.price = in.readDouble();
    70 
    71     }
    72 
    73     @Override
    74     public int compareTo(OrderBean o) {
    75         // 先按订单id进行排序,再按销售进行排序
    76         int temp = this.orderId.compareTo(o.orderId);
    77         if (temp == 0) {
    78             // this.-o.
    79             // 0:顺序不发生改变(set集合会覆盖,list集合顺序不发生改变)
    80             // 1(>0):当前对象大,顺序往后放
    81             // -1(<0):当前对象小,顺序往前放
    82             double error = this.getPrice() - o.getPrice();
    83             if (error > 0) {
    84                 return -1;
    85             } else {
    86                 return 1;
    87             }
    88         }
    89         return temp;
    90     }
    91 
    92 }
  • 相关阅读:
    极光推送
    ModelAndView跳转页面的时候,显示了页面的源码问题
    关于字符串比较时候出现的空指针问题的坑
    JAVA的extends用法
    C# 平面文件批量导数据到DB(二)
    C# 平面文件批量导数据到DB(一)
    C#操作文件和文件夹的类介绍
    C# 实现监控文件夹和里面文件的变化
    SQL Server 2012 Throw关键字
    SQL SERVER EXCEPT 和 INTERSECT
  • 原文地址:https://www.cnblogs.com/le-ping/p/7783519.html
Copyright © 2011-2022 走看看