zoukankan      html  css  js  c++  java
  • MapReduce的自定义排序、分区和分组

    自定义排序(WritableComparable)

    当写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输

    而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。

    操作:

    自定义bean来封装处理后的信息,可以自定义排序规则用bean中的某几个属性来作为排序的依据

    代码节段:

    自定义的bean实现WritableComparable接口,并对compareTo的实现

     //先按照年龄排序,再按性别(年龄小,sex大的在前)

    @Override
    public int compareTo(Person o) {
      if(o.age==this.age){
        if(o.sex==this.sex){
          return 0;
        }else{
          return o.sex-this.sex;
        }
      }else{
      return this.age-o.age;
      }
    }

    注意: 1.hadoop开发了一套自己的序列化和反序列化策略(Writable),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化。

        2.如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。

    自定义分区(Partitioner)

    Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。 

    操作:

    在job对象中,设job.setPartitionerClass(自定义分区类.class)

    节选代码:

    public class CustomPartitioner extends Partitioner<Text,Text>{

      /*
      * numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。
      * 假如job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。
      */
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        Integer hash = numMap.get(key.toString().substring(0, 1));
        //将没有匹配到的数据放入3号分区
        return hash==null?3:hash;
      }
    }

    自定义分组(GroupingComparator)

    假设把bean作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。

    1.自定义分组要继承WritableComparator,然后重写compare方法。

    2.定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);
    代码节选

    public class CustomGroupingComparator extends WritableComparator{
      protected CustomGroupingComparator() {
        super(Person.class, true);
      }
      @Override
      public int compare(WritableComparable a, WritableComparable b) {
        Bean abean = (Bean) a;
        Bean bbean = (Bean) b;
        //将item_id相同的bean都视为相同,从而聚合为一组
        return abean.getAge()-bbean.getAge();
      }
    }

     

    自定义分组排序(SortGroupingComparator)尽快补充上

    自定义分组排序(SortComparator)尽快补充上

     每个分区内都会调用job.setSortComparatorClass()设置的key比较函数类排序;

    如果没有通过job.setSortComparatorClass()设置key比较函数类,则使用key的实现的compareTo方法

     

  • 相关阅读:
    Cocos Creator 镜头跟随(cc.follow)
    Cocos Creator 虚拟摇杆
    Creator Cocos 获取舞台尺寸 (屏幕大小)
    Cocos Creator中的计时器 (setTimeOut ,setInterval,Schedule )
    Cocos Creator Touch_End的触发条件 (Mouse事件)
    Cocos Creator Spine骨骼动画 (局部换装、全局换装)
    Cocos Creator 粒子效果
    Cocos Creator 的Http和WebSocket
    Android中图表AChartEngine学习使用与例子
    Android应用加入微信分享
  • 原文地址:https://www.cnblogs.com/zyanrong/p/10884239.html
Copyright © 2011-2022 走看看