分布式编程相对复杂,而Hadoop本身蒙上大数据、云计算等各种面纱,让很多初学者望而却步。可事实上,Hadoop是一个很易用的分布式编程框架,经过良好封装屏蔽了很多分布式环境下的复杂问题,因此,对普通开发者来说很容易,容易到可以照葫芦画瓢。
大多数Hadoop程序的编写可以简单的依赖于一个模板及其变种。当编写一个新的MapReduce程序时,我们通常采用一个现有的MapReduce程序,通过修改达到我们希望的功能就行了。对于写大部分的Hadoop程序来说几乎就是照葫芦画瓢。这个瓢到底是什么样子呢?还是和小讲一起看看吧。
使用 Java 语言编写 MapReduce 非常方便,因为 Hadoop 的 API 提供了 Mapper 和 Reducer 抽象类,对开发人员来说,只需要继承这两个抽象类,然后实现抽象类里面的方法就可以了。
有一份CSV格式专利引用数据,超过1600万行,某几行如下:
"CITING(引用)","CITED(被引用)"
3858241,956203
3858241,1324234
3858241,3398406
3858242,1515701
3858242,3319261
3858242,3707004
3858243,1324234
2858244,1515701
...
对每个专利,我们希望找到引用它的专利并合并,输出如下:
1324234 3858243,3858241
1515701 2858244,3858242
3319261 3858242
3398406 3858241
3707004 3858242
956203 3858241
...
下边的程序就实现了一个这样的功能。很强大的功能,代码就这么少,没想到吧???
下面是一个典型的Hadoop程序模板
package com.dajiangtai.hadoop.junior; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Hadoop程序基础模板 */ public class HadoopTpl extends Configured implements Tool { public static class MapClass extends Mapper< Text,Text,Text,Text> { public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(value, key); } } public static class ReduceClass extends Reducer< Text, Text, Text, Text> { public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException { String csv = ""; for(Text val:values) { if(csv.length() > 0) csv += ","; csv += val.toString(); } context.write(key, new Text(csv)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); //读取配置文件 conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); Job job = new Job(conf, "HadoopTpl");//新建一个任务 job.setJarByClass(HadoopTpl.class);//主类 Path in = new Path(args[0]); Path out = new Path(args[1]); FileSystem hdfs = out.getFileSystem(conf); if (hdfs.isDirectory(out)) { hdfs.delete(out, true); } FileInputFormat.setInputPaths(job, in);//文件输入 FileOutputFormat.setOutputPath(job, out);//文件输出 job.setMapperClass(MapClass.class);//Mapper job.setReducerClass(ReduceClass.class);//Reducer job.setInputFormatClass(KeyValueTextInputFormat.class);//文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);//文件输出格式 job.setOutputKeyClass(Text.class);//设置作业输出值 Key 的类 job.setOutputValueClass(Text.class);//设置作业输出值 Value 的类 System.exit(job.waitForCompletion(true)?0:1);//等待作业完成退出 return 0; } /** * @param args 输入文件、输出路径,可在Eclipse的Run Configurations中配Arguments如: * hdfs://single.hadoop.dajiangtai.com:9000/junior/patent.txt * hdfs://single.hadoop.dajiangtai.com:9000/junior/patent-out/ */ public static void main(String[] args) { try { int res = ToolRunner.run(new Configuration(), new HadoopTpl(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }
可以想像,一份超过1600万的数据,实现这样一个功能,如果我们自己写算法处理,效率和资源耗费很难想像。可使用Hadoop处理起来就是这么简单。是不是很强大?加紧学习吧,少年!