zoukankan      html  css  js  c++  java
  • MapReduce源码Mapper和Reducer


    1、Mapper类,有setup()、map()、run()、cleanup()这4个方法,如下:
        setup():在run方法执行前首先被调用,且只调用1次,通常用于初始化。
        map():需要重写的方法,此方法中实现业务逻辑。run方法会循环遍历,为每个key、value调用一次这个方法。
        run():此方法由框架控制执行。
        cleanup():map函数处理完所有的key、value会停止遍历,接下来调用此方法1次用于清理。
        
        Mapper类源码如下:    
            public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

              public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
              
              protected void setup(Context context) throws IOException, InterruptedException {}

              @SuppressWarnings("unchecked")
              protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
                context.write((KEYOUT) key, (VALUEOUT) value);
              }

              protected void cleanup(Context context) throws IOException, InterruptedException {}

              public void run(Context context) throws IOException, InterruptedException {
                setup(context);
                try {
                  while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                  }
                } finally {
                  cleanup(context);
                }
              }
              
            }

    2、Reducer类,有setup()、reduce()、run()、cleanup()这4个方法,如下:
        setup():在run方法执行前首先被调用,且只调用1次,通常用于初始化。
        reduce():需要重写的方法,此方法中实现业务逻辑。run方法会循环遍历,为每个key、value调用一次这个方法。
        run():此方法由框架控制执行。
        cleanup():reduce函数处理完所有的key、value会停止遍历,接下来调用此方法1次用于清理。
        
        Reducer类源码如下:    
            public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

              public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}

              protected void setup(Context context) throws IOException, InterruptedException {}

              @SuppressWarnings("unchecked")
              protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
                for(VALUEIN value: values) {
                  context.write((KEYOUT) key, (VALUEOUT) value);
                }
              }

              protected void cleanup(Context context) throws IOException, InterruptedException {}

              public void run(Context context) throws IOException, InterruptedException {
                setup(context);
                try {
                  while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<VALUEIN> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                      ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
                    }
                  }
                } finally {
                  cleanup(context);
                }
              }
              
            }

  • 相关阅读:
    如何在java类中读取Properties配置文件
    常用网址 转
    我的读书计划
    制作一个半透明遮罩层的库——TipView
    RxJava 中的 subscribeOn 和 observeOn 的区别
    一个操作SQLite数据库的例子
    Java ThreadLocal
    3. 请求与响应
    Java的Volatile关键字
    排序
  • 原文地址:https://www.cnblogs.com/mengyao/p/4869454.html
Copyright © 2011-2022 走看看