MapReduce Sorting
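Overview:
1. Both MapTask and ReduceTask sort data by key. This is Hadoop's default behavior.
2. In every job that has a reduce phase, the data is sorted whether or not the application logic needs it.
3. The default order is dictionary (lexicographic) order, as for Text keys. Strictly, the order comes from the key type's comparator, so numeric types such as IntWritable sort numerically. The in-memory sort of map output uses quicksort.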
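To make "dictionary order" concrete, here is a minimal plain-Java sketch with no Hadoop dependency. `compareBytes` is a hypothetical helper that mimics an unsigned byte-by-byte comparison, which is how text keys are ordered. `Arrays.sort` is used only to drive the demo; it is not Hadoop's quicksort.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LexicographicKeyOrder {
    // Hypothetical helper: compare two keys byte by byte (unsigned);
    // if one key is a prefix of the other, the shorter one sorts first.
    static int compareBytes(String a, String b) {
        byte[] x = a.getBytes(StandardCharsets.UTF_8);
        byte[] y = b.getBytes(StandardCharsets.UTF_8);
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int diff = (x[i] & 0xff) - (y[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return x.length - y.length;
    }

    // Returns a sorted copy; the sort algorithm here is just Arrays.sort
    static String[] sortLexicographically(String... keys) {
        String[] copy = keys.clone();
        Arrays.sort(copy, LexicographicKeyOrder::compareBytes);
        return copy;
    }

    public static void main(String[] args) {
        // Numbers stored as text keys are NOT ordered numerically
        System.out.println(Arrays.toString(sortLexicographically("9", "10", "2", "100")));
        // prints [10, 100, 2, 9]
    }
}
```

If numeric order is wanted, the key should be a numeric type (or a custom comparator) rather than text.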
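Sorting in the copy phase:
The ReduceTask remotely copies its share of the output from every MapTask. A copied segment is kept in memory unless its size exceeds a threshold, in which case it is spilled to disk.
When the number of files on disk reaches a threshold, they are merge-sorted into one larger file.
Once all data has been copied, the ReduceTask runs a final merge sort over everything in memory and on disk.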
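Merging works because every copied segment is already sorted, so the ReduceTask only has to interleave sorted runs. The sketch below is a generic heap-based k-way merge in plain Java, assuming integers stand in for serialized keys. It illustrates the technique and is not Hadoop's own merge code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {
    // Merge individually sorted runs (standing in for map-output segments
    // or spill files) into one sorted list using a min-heap.
    static List<Integer> merge(List<List<Integer>> runs) {
        // Heap entry: {value, runIndex, positionInRun}, ordered by value
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[]{runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();           // smallest head among all runs
            out.add(top[0]);
            List<Integer> run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.size()) {           // refill from the same run
                heap.add(new int[]{run.get(next), top[1], next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 10),
                Arrays.asList(5, 6));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 9, 10]
    }
}
```

Each output element costs one heap operation, so merging n records from k runs takes O(n log k) time.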
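Types of sort:
1. Partial sort: MapReduce sorts the records by key, so each output file is sorted internally.
2. Total sort: the final output is a single file that is sorted internally, which means the job runs with only one ReduceTask. This is very inefficient for large inputs, because one reducer processes all the data and the job loses its parallelism.
3. Auxiliary sort (grouping): keys are grouped on the reduce side. Use it when the key is a bean object and you want keys that agree on one or a few fields to go into the same reduce() call.
4. Secondary sort: in a custom sort, when compareTo compares on two conditions, it is called a secondary sort.
5. Custom sort with WritableComparable
5.1 How it works: when a bean object is transmitted as a key, it must implement the WritableComparable interface (so it also implements write/readFields for serialization) and override compareTo.
5.2 Code: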
@Override
public int compareTo(FlowBean bean) {
    int result;
    // Sort by total flow in descending order
    if (sumFlow > bean.getSumFlow()) {
        result = -1;
    } else if (sumFlow < bean.getSumFlow()) {
        result = 1;
    } else {
        result = 0;
    }
    return result;
}
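The descending compareTo above can be checked without a cluster. This sketch assumes a stripped-down, hypothetical `FlowBean` that implements plain `Comparable` instead of `WritableComparable`, with an invented `phone` label field for readable output. It expresses the same ordering with `Long.compare` by swapping the arguments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowSortDemo {
    // Hypothetical stand-in for FlowBean: a real one implements
    // WritableComparable and carries more fields.
    static class FlowBean implements Comparable<FlowBean> {
        final String phone;   // hypothetical label field
        final long sumFlow;   // total flow, the sort field

        FlowBean(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }

        long getSumFlow() {
            return sumFlow;
        }

        @Override
        public int compareTo(FlowBean bean) {
            // Arguments swapped => larger sumFlow sorts first (descending)
            return Long.compare(bean.getSumFlow(), sumFlow);
        }

        @Override
        public String toString() {
            return phone + " " + sumFlow;
        }
    }

    // Sorts a copy by compareTo, i.e. the order these keys would reach a reducer in
    static List<FlowBean> sortDescending(List<FlowBean> beans) {
        List<FlowBean> copy = new ArrayList<>(beans);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(new FlowBean("A", 300));
        beans.add(new FlowBean("B", 1200));
        beans.add(new FlowBean("C", 750));
        System.out.println(sortDescending(beans)); // [B 1200, C 750, A 300]
    }
}
```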
Example:
Requirement: find the most expensive product in each order.
Sample data (fields: order id, product id, price):
00000001 Pdt_01 222.8
00000002 Pdt_05 722.4
00000001 Pdt_02 33.8
00000003 Pdt_06 232.8
00000003 Pdt_02 33.8
00000002 Pdt_03 522.8
00000002 Pdt_04 122.4
Design the order JavaBean. Its fields are the order id and the order price.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id;   // order id
    private double price;   // order price

    // Hadoop creates keys by reflection, so a no-arg constructor is required
    public OrderBean() {
    }

    public OrderBean(int order_id, double price) {
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean bean) {
        // Sort by order id ascending; for equal ids, by price descending
        int result;
        if (order_id > bean.getOrder_id()) {
            result = 1;
        } else if (order_id < bean.getOrder_id()) {
            result = -1;
        } else {
            if (price > bean.getPrice()) {
                result = -1;
            } else if (price < bean.getPrice()) {
                result = 1;
            } else {
                result = 0;
            }
        }
        return result;
    }

    // Serialization
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(order_id);
        output.writeDouble(price);
    }

    // Deserialization: read fields in the same order they were written
    @Override
    public void readFields(DataInput input) throws IOException {
        order_id = input.readInt();
        price = input.readDouble();
    }

    @Override
    public String toString() {
        return order_id + " " + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
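To see the secondary sort that OrderBean.compareTo defines, this plain-Java sketch sorts the sample data. `Order` is a hypothetical stand-in without Hadoop types. It keeps the same two-level ordering (id ascending, then price descending), written with `Integer.compare`/`Double.compare`, which agree with the if/else version for ordinary prices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OrderSortOrderDemo {
    // Hypothetical stand-in for OrderBean with the same compareTo ordering
    static class Order implements Comparable<Order> {
        final int orderId;
        final double price;

        Order(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public int compareTo(Order o) {
            int byId = Integer.compare(orderId, o.orderId);           // primary: id ascending
            return byId != 0 ? byId : Double.compare(o.price, price); // secondary: price descending
        }

        @Override
        public String toString() {
            return orderId + " " + price;
        }
    }

    // The sample records, with order ids already parsed to int
    static List<Order> sampleData() {
        List<Order> orders = new ArrayList<>();
        orders.add(new Order(1, 222.8));
        orders.add(new Order(2, 722.4));
        orders.add(new Order(1, 33.8));
        orders.add(new Order(3, 232.8));
        orders.add(new Order(3, 33.8));
        orders.add(new Order(2, 522.8));
        orders.add(new Order(2, 122.4));
        return orders;
    }

    public static void main(String[] args) {
        List<Order> orders = sampleData();
        Collections.sort(orders);
        System.out.println(orders);
        // [1 222.8, 1 33.8, 2 722.4, 2 522.8, 2 122.4, 3 232.8, 3 33.8]
    }
}
```

Within each order the most expensive item comes first, which is what the reducer relies on.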
Mapper class: split each line and populate the key object
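import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // Reused across map() calls; context.write serializes it immediately
    private final OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Split on any whitespace, so both space- and tab-separated input work
        String[] fields = line.trim().split("\\s+");
        // Populate the key: order id and price (fields[1], the product id, is unused)
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));
        // Emit
        context.write(k, NullWritable.get());
    }
}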
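The mapper's parsing step can be tried in isolation. In this sketch `OrderLineParser` and `Parsed` are hypothetical names, and splitting on `\\s+` assumes the input may be separated by tabs or spaces. Note that `Integer.parseInt` drops the leading zeros, so order `00000002` is emitted as `2`.

```java
public class OrderLineParser {
    // Hypothetical holder for the two fields the mapper keeps
    static final class Parsed {
        final int orderId;
        final double price;

        Parsed(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Parse "orderId productId price"; the product id is skipped
    static Parsed parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        return new Parsed(Integer.parseInt(fields[0]), Double.parseDouble(fields[2]));
    }

    public static void main(String[] args) {
        Parsed p = parse("00000002\tPdt_05\t722.4");
        System.out.println(p.orderId + " " + p.price); // 2 722.4
    }
}
```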
Reducer class: write out the first key of each group, which is the order's most expensive item
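import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // One call per order id (grouping comparator). Keys arrive sorted by price
        // descending, so the key seen here is the most expensive item of the order.
        context.write(key, NullWritable.get());
    }
}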
OrderGroupingComparator: grouping comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // true: create key instances so compare() receives deserialized OrderBeans
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Keys with the same order id count as the same key (one reduce() call)
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }
        return result;
    }
}
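The effect of the grouping comparator can be simulated in plain Java. This is a sketch under the assumption that the keys are already in compareTo order (id ascending, price descending). A new group starts whenever the id-only comparison says adjacent keys differ. Emitting the first key of each group mirrors OrderSortReducer. `GroupingSimulation` and `Key` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class GroupingSimulation {
    // Hypothetical stand-in for OrderBean
    static final class Key {
        final int orderId;
        final double price;

        Key(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public String toString() {
            return orderId + " " + price;
        }
    }

    // Grouping comparator: only the order id matters
    static final Comparator<Key> GROUPING = Comparator.comparingInt(k -> k.orderId);

    // Walk sorted keys; each group boundary is where a new reduce() call would begin
    static List<Key> firstKeyPerGroup(List<Key> sortedKeys) {
        List<Key> out = new ArrayList<>();
        Key prev = null;
        for (Key k : sortedKeys) {
            if (prev == null || GROUPING.compare(prev, k) != 0) {
                out.add(k); // the reducer writes only the group's first key
            }
            prev = k;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Key> sorted = Arrays.asList(
                new Key(1, 222.8), new Key(1, 33.8),
                new Key(2, 722.4), new Key(2, 522.8), new Key(2, 122.4),
                new Key(3, 232.8), new Key(3, 33.8));
        System.out.println(firstKeyPerGroup(sorted)); // [1 222.8, 2 722.4, 3 232.8]
    }
}
```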
Driver class:
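import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths; the output directory must not exist yet
        args = new String[]{"E:/input/group", "E:/output"};

        // Get the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Specify the grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}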
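The driver leaves the number of reduce tasks at its default of 1, so every order reaches the same reducer. With more reduce tasks, Hadoop's default HashPartitioner picks partition `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. OrderBean does not override hashCode, so records of one order could be split across reducers and the grouping above would no longer see them together. A common remedy is to partition on the order id alone. The plain-Java sketch below shows only that arithmetic; `partitionFor` is a hypothetical helper, not a Hadoop API.

```java
public class OrderPartitionSketch {
    // Same formula as HashPartitioner, applied to the order id only,
    // so all records of an order map to the same reduce task.
    static int partitionFor(int orderId, int numReduceTasks) {
        return (Integer.hashCode(orderId) & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 2;
        for (int id : new int[]{1, 2, 3, 1}) {
            System.out.println("order " + id + " -> partition " + partitionFor(id, reducers));
        }
    }
}
```

In a real job this logic would live in a Partitioner subclass registered with `job.setPartitionerClass`.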