A MapReduce word count example
Mapper class:
package main.java.worldClient;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * are the data types of the key/value pairs the map phase consumes and emits.
 * By default the map input key is the byte offset of the current line in the file,
 * and the value is the content of that line.
 * @author Lenovo
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * Split the line into words, then emit each word.
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        String[] words = line.split(" ");
        LongWritable writable = new LongWritable(1);
        for (String word : words) {
            // Write the pair to the context;
            // in write(a, b), a and b correspond to KEYOUT and VALUEOUT of Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
            context.write(new Text(word), writable);
        }
    }
}
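To make the mapper's behavior concrete, here is a small worked example with a hypothetical input line (not taken from the original post). For the line "hello hadoop hello", map() is called once with the line's byte offset as the key and the line text as the value, and it emits one (word, 1) pair per token produced by split(" "):

(hello, 1) (hadoop, 1) (hello, 1)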
Reducer class:
package main.java.worldClient;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * are the key/value types of the reduce input and output.
 * The input key and value types must match the map output key and value types.
 * @author Lenovo
 */
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word
        int sum = 0;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable value = iter.next();
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
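Continuing the hypothetical example above: before reduce() is called, the framework sorts and groups the map output by key, so the reducer receives

hello  -> [1, 1]
hadoop -> [1]

and reduce() sums each list of counts, writing (hello, 2) and (hadoop, 1) to the final output.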
Runner class:
package main.java.worldClient;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJobName("wc MR");
            job.setJarByClass(WCRunner.class);

            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);

            /*
             * If the map output types are the same as the reduce output types,
             * the map output types do not need to be set explicitly.
             */
            // Key/value types of the map output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // Key/value types of the reduce (final) output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            // The output directory must not exist yet
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
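One optional refinement, not present in the original code: because summing counts is associative and commutative, the same WCReducer class could also be registered as a combiner, so that partial sums are computed on the map side and less data is shuffled. This is only a sketch of a possible addition to the job setup above:

job.setCombinerClass(WCReducer.class);

The job runs correctly without it; the combiner only reduces the amount of intermediate data.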
Running this on Windows throws a null pointer exception, and I am still looking for a fix. So instead I exported the finished Java code as a jar and used Xshell and Xftp to transfer it, together with the file the program needs, to a Linux virtual machine (loading the input file into the Hadoop directory with Linux commands makes the later commands easier to write), and ran the test on Linux. As far as I understand it (I am not sure this is correct), hadoop jar picks up a local jar package and cannot find a jar file on HDFS.
Main steps:
1: bin/hadoop fs -mkdir -p /MRTest/input — create the input directory in HDFS.
2: bin/hadoop fs -put ~/WCTest.txt.txt /MRTest/input — put the file the job will process into the input directory.
3: bin/hadoop jar ~/wctest.jar main.java.worldClient.WCRunner /MRTest/input /MRTest/output — run the jar; the output directory must be one that does not yet exist.
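Once the job finishes, the result can be checked directly in HDFS. Assuming the default settings with a single reducer, the output lands in a file such as /MRTest/output/part-r-00000, with one word and its count per line separated by a tab, and can be viewed with, for example:

bin/hadoop fs -cat /MRTest/output/part-r-00000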