zoukankan      html  css  js  c++  java
  • MapReduce工作流多种实现方式

     学习 hadoop,必不可少的就是编写 MapReduce 程序。当然,对于简单的分析程序,我们只需一个 MapReduce 任务就能搞定,然而对于比较复杂的分析程序,我们可能需要多个Job或者多个Map或者Reduce进行分析计算。 本课程我们主要学习多个 Job 或者多个 MapReduce 的编程形式。

      MapReduce 的主要有以下几种编程形式。

    迭代式 MapReduce

            MapReduce 迭代方式,通常是将上一个 MapReduce 任务的输出作为下一个 MapReduce 任务的输入,可只保留 MapReduce 任务的最终结果,中间数据可以删除或保留,可根据业务需要自行决定。   迭代式 MapReduce 的示例代码如下所示。

    Configuration conf = new Configuration();
        
    //第一个 MapReduce 任务
    Job job1 = new Job(conf,"job1");
    .....
    FileInputFormat.addInputPath(job1,input);//job1的输入
    FileOutputFromat.setOutputPath(job1,out1);//job1的输出
    job1.waitForCompletion(true);
    
    //第二个 Mapreduce 任务
    Job job2 = new Job(conf,"job2");
    .....
    FileInputFormat.addInputPath(job2,out1);//job1的输出作为job2的输入
    FileOutputFromat.setOutputPath(job2,out2);//job2 的输出
    job2.waitForCompletion(true);
    
    //第三个 Mapreduce 任务
    Job job3 = new Job(conf,"job3");
    .....
    FileInputFormat.addInputPath(job3,out2);//job2的输出作为job3的输入
    FileOutputFromat.setOutputPath(job3,out3);//job3 的输出
    job3.waitForCompletion(true);
    .....

    虽然 MapReduce 的迭代可实现多任务的执行,但是它具有如下两个缺点:

    1、每次迭代,如果所有 Job 对象重复创建,代价将非常高。

    2、每次迭代,数据都要写入本地,然后从本地读取,I/O和网络传输的代价比较大。

    依赖关系式 MapReuce

            依赖关系式 MapReduce主要是由 org.apache.hadoop.mapred.jobcontrol 包中的 JobControl 类来实现。JobControl 的实例表示一个作业的运行图, 你可以加入作业配置,然后告知 JobControl 实例作业之间的依赖关系。在一个线程中运行 JobControl 时,它将按照依赖顺序来执行这些作业。也可以查看进程, 在作业结束后,可以查询作业的所有状态和每个失败相关的错误信息。如果一个作业失败,JobControl 将不执行与之有依赖关系的后续作业。

            依赖关系式 MapReuce 的示例代码如下所示。

    Configuration conf1 = new Configuration();
    Job job1 = new Job(conf1,"Job1");
    .........//job1 其它设置
    
    Configuration conf2 = new Configuration();
    Job job2 = new Job(conf2,"Job2");
    .........//job2 其它设置
    
    Configuration conf3 = new Configuration();
    Job job3 = new Job(conf3,"Job3");
    .........//job3 其它设置
    
    ControlledJob cJob1 = new ControlledJob(conf1);//构造一个 Job
    cJob1.setJob(job1);//设置 MapReduce job
    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    ControlledJob cJob3 = new ControlledJob(conf3);
    cJob3.setJob(job3);
    
    cJob3.addDependingJob(cJob1);//设置job3和job1的依赖关系
    cJob3.addDependingJob(cJob2);//设置job3和job2的依赖关系
    
    JobControl JC = new JobControl("123");
    //把三个构造的job加入到JobControl中
    JC.addJob(cJob1);
    JC.addJob(cJob2);
    JC.addJob(cJob3);
    Thread t = new Thread(JC);
    t.start();
    while (true) {
        if (jobControl.allFinished()) {
            jobControl.stop();
            break;
        }
    }

    注意:hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来启动它。直接调用JobControl的run()方法,线程将无法结束。

    线性链式 MapReduce

            大量的数据处理任务涉及对记录的预处理和后处理。

            例如:在处理信息检索的文档时,可能一步是移除 stop words(像a、the和is这样经常出现但不太有意义的词),另一步做stemming(转换一个词的不同形式为相同的形式,例如转换finishing和finished为finish)。

            你可以为预处理与后处理步骤各自编写一个 MapReduce 作业,并把它们链接起来。在这些步骤中可以使用IdentityReducer(或完全不同的 Reducer)。 由于过程中每一个步骤的中间结果都需要占用I/O和存储资源,这种做法是低效的。另一种方法是自己写 mapper去预先调用所有的预处理步骤,再让reducer调用所有的后处理步骤。这将强制你采用模块化和可组合的方式来构建预处理和后处理。因此Hadoop引入了ChainMapper 和ChainReducer类来简化预处理和后处理的构成。

            hadoop提供了专门的链式ChainMapper和ChainReducer来处理线性链式MapReduce任务。在Map或者Reduce阶段存在多个Mapper,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到后一个Mapper的输入,形成流水线。 其调用形式如下:

    ...        //预处理
    ChainMapper.addMapper(...);
    ChainReducer.setReducer(...);
    ChainReducer.addMapper(...);
    ...            //后处理
    //addMapper()调用的方法形式如下:
    public static void addMapper(Job job,
    Class< extends Mapper> mclass,
    Class< extends K1> inputKeyClass,
    Class< extends V1> inputValueClass,
    Class< extends K2> outputKeyClass,
    Class< extends V2> outputValueClass,
    Configuration conf
    )

      addMapper()方法有8个参数。第一个和最后一个分别为全局的Job和本地的configuration对象。第二个参数是 Mapper类,负责数据处理。余下4个参数 inputKeyClass、inputValueClass、outputKeyClass和outputValueClass是这个Mapper类中输入/输出类的类型。ChainReducer专门提供了一个setReducer()方法来设置整个作业唯一的Reducer,语法与addMapper()方法类似。

            线性链式 MapReduce 的示例代码如下所示。

    public void function throws IOException {
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    
    job.setJobName("chainjob");
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    //在作业中添加 Map1 阶段
    Configuration map1conf = new Configuration(false);
    ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,Text.class, Text.class, true, map1conf);
    //在作业中添加 Map2 阶段
    Configuration map2conf = new Configuration(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, Text.class,LongWritable.class, Text.class, true, map2conf);
    //在作业中添加 Reduce 阶段
    Configuration reduceconf = new Configuration(false);
    ChainReducer.setReducer(job,Reduce.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduceconf);
    //在作业中添加 Map3 阶段
    Configuration map3conf = new Configuration(false);
    ChainReducer.addMapper(job,Map3.class,Text.class,Text.class,LongWritable.class,Text.class,true,map3conf);
    //在作业中添加 Map4 阶段
    Configuration map4conf = new Configuration(false);
    ChainReducer.addMapper(job,Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4conf);
    
    job.waitForCompletion(true);
    }

    注意:对于任意一个MapReduce作业,Map和Reduce阶段可以有无限个Mapper,但是Reduce只能有一个。所以包含多个Reduce的作业,不能使用 ChainMapper/ChainReduce来完成。

  • 相关阅读:
    Windows Server 2008 R2 实现多用户连接远程桌面
    增加远程登录用户登陆个数
    Win2008R2PHP5.4环境加载Zend模块
    Windows 和  Linux 下 禁止ping的方法
    Windows 2003 FastCgi安装环境
    Windows2008下搭建NFS实现windows空间提供linux使用
    Spring + JdbcTemplate + JdbcDaoSupport examples
    Spring Object/XML mapping example
    Spring AOP + AspectJ in XML configuration example
    Spring AOP + AspectJ annotation example
  • 原文地址:https://www.cnblogs.com/qiaoyihang/p/6235903.html
Copyright © 2011-2022 走看看