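1. Sequential composition
Run one MapReduce job to completion, then run the next MapReduce job. Here the second job reads the first job's output directory.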
// Body of T2G.main(); Map/Reduce and Map2/Reduce2 are the two jobs' Mapper/Reducer classes
// First MapReduce job: T2G_input -> T2G_output1
Configuration conf1 = new Configuration();
conf1.set("mapred.job.tracker", "192.168.1.164:9001");
String[] ars = new String[]{"T2G_input", "T2G_output1"};
String[] otherArgs = new GenericOptionsParser(conf1, ars).getRemainingArgs();
if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
}
Job job1 = Job.getInstance(conf1, "Job1");
job1.setJarByClass(T2G.class);
job1.setMapperClass(Map.class);
job1.setReducerClass(Reduce.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));
// Block until job1 finishes; stop if it failed, because job2 reads its output
if (!job1.waitForCompletion(true)) {
    System.exit(1);
}
// Second MapReduce job: T2G_output1 (job1's output) -> T2G_output2
Configuration conf2 = new Configuration();
conf2.set("mapred.job.tracker", "192.168.1.164:9001");
ars = new String[]{"T2G_output1", "T2G_output2"};
otherArgs = new GenericOptionsParser(conf2, ars).getRemainingArgs();
if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
}
Job job2 = Job.getInstance(conf2, "Job2");
job2.setJarByClass(T2G.class);
job2.setMapperClass(Map2.class);
job2.setReducerClass(Reduce2.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job2, new Path(otherArgs[1]));
System.exit(job2.waitForCompletion(true) ? 0 : 1);
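The control flow of this sequential pattern does not depend on Hadoop itself: the second job simply consumes what the first job wrote. The sketch below is a minimal illustration, not Hadoop code. It models each job as an in-memory function. It assumes a word count for the first job and a hypothetical "group words by count" step for the second.

```java
import java.util.*;

// Hadoop-free sketch: each "job" is a function from an input dataset to an output
// dataset, standing in for a MapReduce job that reads one directory and writes another.
class SequentialJobs {

    // Job 1 (assumed): word count over lines of text.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Job 2 (hypothetical): reads job 1's output and groups words by their count.
    static Map<Integer, List<String>> groupByCount(Map<String, Integer> counts) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            groups.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return groups;
    }

    // Run job 1 to completion, then feed its complete output to job 2.
    static Map<Integer, List<String>> run(List<String> input) {
        Map<String, Integer> output1 = wordCount(input); // like T2G_input -> T2G_output1
        return groupByCount(output1);                    // like T2G_output1 -> T2G_output2
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a b a", "c b a"))); // {1=[c], 2=[b], 3=[a]}
    }
}
```

On a real cluster, the hand-off between the two functions corresponds to job1 writing its output to HDFS and job2 reading it back. That round trip is the extra I/O cost of chaining whole jobs.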
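2. Composition with job dependencies
Suppose a MapReduce application consists of three sub-jobs, job1, job2, and job3. Job1 and job2 are independent of each other, but job3 can only run after both job1 and job2 have finished. This is called composite MapReduce with complex data dependencies. Hadoop provides a mechanism for running and controlling such a composition through a job wrapper class and the JobControl class. The wrapper is org.apache.hadoop.mapred.jobcontrol.Job in the old API and ControlledJob in the new org.apache.hadoop.mapreduce.lib.jobcontrol package. Besides holding each sub-job's configuration, the wrapper also records that sub-job's dependencies. JobControl drives the whole workflow: add all sub-jobs to a JobControl and run it. JobControl implements Runnable, and its run() loop keeps polling until stop() is called. It is therefore usually started in its own thread and stopped once allFinished() returns true.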
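// New API: org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob and JobControl
Configuration job1conf = new Configuration();
Job job1 = Job.getInstance(job1conf, "Job1");
.........// other job1 settings
Configuration job2conf = new Configuration();
Job job2 = Job.getInstance(job2conf, "Job2");
.........// other job2 settings
Configuration job3conf = new Configuration();
Job job3 = Job.getInstance(job3conf, "Job3");
.........// other job3 settings
ControlledJob cj1 = new ControlledJob(job1, null);
ControlledJob cj2 = new ControlledJob(job2, null);
ControlledJob cj3 = new ControlledJob(job3, null);
cj3.addDependingJob(cj1); // job3 depends on job1
cj3.addDependingJob(cj2); // and on job2
JobControl JC = new JobControl("123");
JC.addJob(cj1); // add all three jobs to the JobControl
JC.addJob(cj2);
JC.addJob(cj3);
// run() loops until stop() is called, so run it in a thread and poll
new Thread(JC).start();
while (!JC.allFinished()) {
    Thread.sleep(500);
}
JC.stop();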
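The decision of what to launch can be approximated in plain Java. On each polling pass, every waiting job whose dependencies have all completed becomes ready. The sketch below assumes every job succeeds and uses a hypothetical schedule() helper, not Hadoop's JobControl. It groups jobs into rounds. Jobs in the same round, here job1 and job2, do not depend on each other and could run concurrently.

```java
import java.util.*;

// Hadoop-free sketch of dependency scheduling in the style of JobControl.
// The "rounds" model and all names here are illustrative assumptions.
class DependencyScheduler {

    // deps maps each job to the jobs it depends on (like addDependingJob calls).
    static List<List<String>> schedule(Map<String, List<String>> deps) {
        Set<String> done = new HashSet<>();
        Set<String> waiting = new LinkedHashSet<>(deps.keySet());
        List<List<String>> rounds = new ArrayList<>();
        while (!waiting.isEmpty()) {
            List<String> ready = new ArrayList<>();
            for (String job : waiting) {
                if (done.containsAll(deps.get(job))) {
                    ready.add(job);
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("Cyclic or missing dependency: " + waiting);
            }
            // Jobs in the same round are independent and could run in parallel.
            rounds.add(ready);
            waiting.removeAll(ready);
            done.addAll(ready); // assumption: every job succeeds
        }
        return rounds;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("Job1", List.of());
        deps.put("Job2", List.of());
        deps.put("Job3", List.of("Job1", "Job2"));
        System.out.println(schedule(deps)); // [[Job1, Job2], [Job3]]
    }
}
```

Real JobControl also tracks failures: a job whose dependency fails is not run. This sketch omits that case.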
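3. Chained composition
Let's start with an example that shows why chained MapReduce is needed. Suppose that when counting words we encounter forms such as make, made, and making. They are all the same word, so their counts should be accumulated under one word. A separate MapReduce job could handle this. However, every extra MapReduce job lengthens the overall processing time and adds I/O, because each job writes its results out and the next job reads them back in. The approach is therefore inefficient.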
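The normalization itself is a per-word mapping. The minimal sketch below shows how counting after normalization merges make, made, and making into one key. It assumes a hard-coded lookup table, the hypothetical BASE_FORM map, in place of a real stemmer or lemmatizer.

```java
import java.util.*;

// Sketch of the auxiliary "normalize" step that would run before counting.
// BASE_FORM is a hypothetical stand-in for a real stemmer or lemmatizer.
class WordNormalizer {

    private static final Map<String, String> BASE_FORM = Map.of(
            "made", "make",
            "making", "make",
            "makes", "make");

    // Return the base form of a word, or the lower-cased word if it is not in the table.
    static String normalize(String word) {
        String lower = word.toLowerCase(Locale.ROOT);
        return BASE_FORM.getOrDefault(lower, lower);
    }

    // Count words after normalization, so make/made/making land on one key.
    static Map<String, Integer> countNormalized(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String w : words) {
            counts.merge(normalize(w), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countNormalized(List.of("make", "made", "making", "cake"))); // {cake=1, make=3}
    }
}
```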
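A better approach is to add an auxiliary map step alongside the core MapReduce. This auxiliary map step and the core MapReduce are then combined into a single chained MapReduce job that completes the whole task. Hadoop provides the dedicated ChainMapper and ChainReducer classes for chained jobs. ChainMapper allows several Mapper subtasks to run within one map task. ChainReducer allows several more Mapper subtasks to run after the Reducer. They are called as follows: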
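ChainMapper.addMapper(...);
ChainReducer.addMapper(...);
// Signature of addMapper() in the new org.apache.hadoop.mapreduce API:
public static void addMapper(Job job,
        Class<? extends Mapper> mclass,
        Class<?> inputKeyClass,
        Class<?> inputValueClass,
        Class<?> outputKeyClass,
        Class<?> outputValueClass,
        Configuration conf) throws IOException
// The old org.apache.hadoop.mapred.lib API takes a JobConf instead of a Job,
// and has an extra boolean byValue argument before the final JobConf.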
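ChainReducer also provides a setReducer() method, which sets the single Reducer of the whole job.
Note: the key and value types passed between these Mappers and the Reducer must be consistent. Each step's input key/value classes must match the output key/value classes of the step before it.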
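The type rule in the note can be stated as a check over adjacent steps. The Step descriptor and the mismatches() helper below are hypothetical, and Long/String stand in for LongWritable/Text. Hadoop runs its own validation when chain steps are added, and that validation may not match this sketch exactly.

```java
import java.util.*;

// Sketch of the chain type rule: each step's input key/value classes must equal
// the previous step's output key/value classes. Step is not a Hadoop class.
class ChainTypeCheck {

    static final class Step {
        final String name;
        final Class<?> inKey, inValue, outKey, outValue;

        Step(String name, Class<?> inKey, Class<?> inValue, Class<?> outKey, Class<?> outValue) {
            this.name = name;
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    // Describe every mismatch between adjacent steps; an empty list means the chain is consistent.
    static List<String> mismatches(List<Step> chain) {
        List<String> problems = new ArrayList<>();
        for (int i = 1; i < chain.size(); i++) {
            Step prev = chain.get(i - 1);
            Step cur = chain.get(i);
            if (!cur.inKey.equals(prev.outKey) || !cur.inValue.equals(prev.outValue)) {
                problems.add(prev.name + " -> " + cur.name);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Reduce declares a Long input key, but Map1 emits String keys: a mismatch.
        List<Step> chain = List.of(
                new Step("Map1", Long.class, String.class, String.class, String.class),
                new Step("Reduce", Long.class, String.class, String.class, String.class),
                new Step("Map2", String.class, String.class, String.class, String.class));
        System.out.println(mismatches(chain)); // [Map1 -> Reduce]
    }
}
```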
Here is an example. ChainMapper adds Map1 to the map phase, and ChainReducer then adds Reduce and Map2 to the reduce phase. Map1 must implement the map method. The code is as follows:
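// Map1, Reduce and Map2 are the user's own classes (new org.apache.hadoop.mapreduce API);
// the input and output paths are supplied by the caller
public boolean runChainJob(Path input, Path output) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "ChainJob");
    job.setJarByClass(getClass());
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    // Add Map1 to the ChainMapper: <LongWritable, Text> -> <Text, Text>
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
            Text.class, Text.class, map1Conf);
    // Set the job's only Reducer: its input types must match Map1's output <Text, Text>
    Configuration reduceConf = new Configuration(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, Text.class,
            Text.class, Text.class, reduceConf);
    // Map2 runs after Reduce, inside the reduce task: <Text, Text> -> <Text, Text>
    Configuration map2Conf = new Configuration(false);
    ChainReducer.addMapper(job, Map2.class, Text.class, Text.class,
            Text.class, Text.class, map2Conf);
    return job.waitForCompletion(true);
}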
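The following Hadoop-free sketch shows what the chain does inside the tasks. Records pass through the map chain, are grouped by key, are reduced, and then pass through the post-reduce mappers. All of this happens in memory, with no intermediate job output. MapStep, ReduceStep, and the example steps in demo() are hypothetical stand-ins for Map1, Reduce, and Map2, not Hadoop classes.

```java
import java.util.*;
import java.util.function.Function;

// Sketch of a chained job's data flow: MAP+ -> shuffle (group by key) -> REDUCE -> MAP*.
class InMemoryChain {

    // A mapper turns one record into zero or more records.
    interface MapStep extends Function<Map.Entry<String, String>, List<Map.Entry<String, String>>> {}

    // A reducer turns one key and all of its values into zero or more records.
    interface ReduceStep {
        List<Map.Entry<String, String>> reduce(String key, List<String> values);
    }

    static Map.Entry<String, String> kv(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Push every record through each mapper in order, without writing anything out.
    static List<Map.Entry<String, String>> applyAll(List<Map.Entry<String, String>> records,
                                                    List<MapStep> steps) {
        for (MapStep step : steps) {
            List<Map.Entry<String, String>> next = new ArrayList<>();
            for (Map.Entry<String, String> r : records) {
                next.addAll(step.apply(r));
            }
            records = next;
        }
        return records;
    }

    static List<Map.Entry<String, String>> run(List<Map.Entry<String, String>> input,
                                               List<MapStep> mapChain, ReduceStep reducer,
                                               List<MapStep> postReduceChain) {
        // Shuffle: group the map chain's output by key, in sorted key order
        SortedMap<String, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, String> r : applyAll(input, mapChain)) {
            groups.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        List<Map.Entry<String, String>> reduced = new ArrayList<>();
        for (Map.Entry<String, List<String>> g : groups.entrySet()) {
            reduced.addAll(reducer.reduce(g.getKey(), g.getValue()));
        }
        return applyAll(reduced, postReduceChain);
    }

    // Example chain with hypothetical Map1 / Reduce / Map2 logic.
    static List<Map.Entry<String, String>> demo() {
        // Map1: (offset, word) -> (lower-cased word, "1")
        MapStep map1 = r -> List.of(kv(r.getValue().toLowerCase(Locale.ROOT), "1"));
        // Reduce: sum the "1"s for each word
        ReduceStep reduce = (word, ones) ->
                List.of(kv(word, String.valueOf(ones.stream().mapToInt(Integer::parseInt).sum())));
        // Map2, run after the reducer: keep only words seen at least twice
        MapStep map2 = r -> {
            List<Map.Entry<String, String>> out = new ArrayList<>();
            if (Integer.parseInt(r.getValue()) >= 2) {
                out.add(r);
            }
            return out;
        };
        List<Map.Entry<String, String>> input =
                List.of(kv("0", "Make"), kv("5", "make"), kv("10", "cake"));
        return run(input, List.of(map1), reduce, List.of(map2));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [make=2]
    }
}
```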
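In other words, the ChainReducer class lets you chain multiple Mapper classes after the Reducer, all within the reduce task.
ChainMapper, likewise, lets you combine multiple Mapper classes into a single map task.
The following example has no practical use, but it clearly demonstrates what ChainMapper does. It is written against the old org.apache.hadoop.mapred API.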
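Source file (each line is key<TAB>value, because KeyValueTextInputFormat splits a line at the first tab):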
100 tom 90
101 mary 85
102 kate 60
Result of Map00 (the record with key 100 is filtered out):
101 mary 85
102 kate 60
Result of Map01 (the record with key 101 is filtered out):
102 kate 60
Reduce result:
102 kate 60
package org.myorg;
import java.io.IOException;
import java.util.*;
import java.lang.String;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
public class WordCount
{
    // Map00: drops the record whose key is "100" and passes all others through
    public static class Map00 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Text ft = new Text("100");
            if (!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }

    // Map01: drops the record whose key is "101"
    public static class Map01 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Text ft = new Text("101");
            if (!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }

    // Reduce: identity reducer that emits every value unchanged
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            while (values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount00");
        // KeyValueTextInputFormat splits each line into key and value at the first tab
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // addMapper() is static: Map00 and then Map01 run inside the same map task
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf);
        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf);
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
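The expected results listed before the code can be checked without a cluster by replaying the same chain in plain Java. This sketch assumes tab-separated input, as KeyValueTextInputFormat expects, and treats the identity Reduce as "sort by key, emit unchanged". ChainFilterReplay and dropKey() are hypothetical names.

```java
import java.util.*;
import java.util.function.Predicate;

// Plain-Java replay of the ChainMapper example: Map00 drops key "100", Map01 drops
// key "101", and the identity Reduce passes the remaining records through.
class ChainFilterReplay {

    // One chained map step: keep the record unless its key equals the given key.
    static Predicate<String[]> dropKey(String key) {
        return kv -> !kv[0].equals(key);
    }

    static List<String> run(List<String> lines) {
        List<String[]> records = new ArrayList<>();
        for (String line : lines) {
            // Split at the first tab; a line without a tab becomes (line, "")
            String[] parts = line.split("\t", 2);
            records.add(parts.length == 2 ? parts : new String[]{parts[0], ""});
        }
        List<Predicate<String[]>> chain = List.of(dropKey("100"), dropKey("101")); // Map00, Map01
        for (Predicate<String[]> step : chain) {
            records.removeIf(step.negate());
        }
        // Identity reduce: sort by key (as the shuffle does) and emit each value unchanged
        records.sort(Comparator.comparing((String[] kv) -> kv[0]));
        List<String> out = new ArrayList<>();
        for (String[] kv : records) {
            out.add(kv[0] + "\t" + kv[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("100\ttom 90", "101\tmary 85", "102\tkate 60")));
    }
}
```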