众所周知,因为shuffle的存在,mr要求传入的key必须能够实现排序这个功能,那么最简单有效的方式就是在bean中实现writeableComparable接口,从而实现compareTo方法
那么还有没有其他的办法呢,这几天在追Hadoop的源码时发现
public RawComparator getOutputKeyComparator() { //尝试获取参数中配置的mapreduce.job.output.key.comparator.class,作为比较器, //如果没有定义,默认为null,定义的话必须是RawComparator类型 Class<? extends RawComparator> theClass = getClass( JobContext.KEY_COMPARATOR, null, RawComparator.class); //如果用户配置,就实例化此类型的一个对象 if (theClass != null) return ReflectionUtils.newInstance(theClass, this); // 判断Mapper输出的key是否是writableComparable类型的子类, //如果是,就默认由系统提供比较器,如果不是就抛异常! return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); }
简而言之,就是用户可以通过设置比较器的方法来自定义排序
job.setSortComparatorClass(MyComparator.class);
注意:定义的比较器必须是RowComparator的实现类!具体实现方式为
public class MyRowCompare implements RawComparator<FlowBean2> { FlowBean2 o1 = new FlowBean2(); FlowBean2 o2 = new FlowBean2(); DataInputBuffer buffer = new DataInputBuffer(); @Override public int compare(FlowBean2 o1, FlowBean2 o2) { return o2.getSumFlow() - o1.getSumFlow(); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { buffer.reset(b1,s1,l1); //载入缓冲区 try { o1.readFields(buffer); //反序列化为对象 } catch (IOException e) { e.printStackTrace(); } buffer.reset(b2,s2,l2); try { o2.readFields(buffer); } catch (IOException e) { e.printStackTrace(); } return compare(o1, o2); // 将对象传入比较器比较 } }