1 package com.laowang.mapreduce;
2
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.fs.Path;
5 import org.apache.hadoop.io.IntWritable;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Job;
9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13
14 import java.io.IOException;
15 import java.util.StringTokenizer;
16
17 public class MR {
18 /**
19 * @author laowang
20 * @version v1.0.0
21 * @apiNote Mapper
22 * @since 2018/4/27 10:44
23 * <p>
24 * KEYIN, VALUEIN, KEYOUT, VALUEOUT 输入key类型,输入value类型,输出KEY类型,输出value类型
25 */
26 static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
27 private final static IntWritable ONE = new IntWritable(1);
28 private Text word = new Text();
29
30 @Override
31 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
32 //获取每一行的数据
33 String lineStr = value.toString();
34 //以 空格、/t、/n、/r、/f 分割
35 StringTokenizer stringTokenizer = new StringTokenizer(lineStr);
36 //遍历
37 while (stringTokenizer.hasMoreTokens()) {
38 //获取截取后的每一个字符串
39 String wordValue = stringTokenizer.nextToken();
40 //拼接到word里面去
41 word.set(wordValue);
42 //写入到输出中
43 context.write(word, ONE);
44 }
45 }
46 }
47
48 /**
49 * @author laowang
50 * @version v1.0.0
51 * @apiNote Reducer
52 * @since 2018/4/27 10:44
53 * <p>
54 * KEYIN, VALUEIN, KEYOUT, VALUEOUT 输入key类型,输入value类型,输出KEY类型,输出value类型
55 */
56 static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
57 private IntWritable result = new IntWritable();
58 @Override
59 protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
60 int sum = 0;
61 for (IntWritable value : values) {
62 sum += value.get();
63 }
64 result.set(sum);
65 context.write(key, result);
66 }
67 }
68
69 /**
70 * @author laowang
71 * @version v1.0.0
72 * @apiNote Client
73 * @since 2018/4/27 10:47
74 */
75 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
76 //获取配置信息
77 Configuration configuration = new Configuration();
78 //创建job
79 Job job = new Job(configuration,"wc");
80 //设置JOB运行的类
81 job.setJarByClass(MR.class);
82 //设置Mapper和Reducer
83 job.setMapperClass(MyMapper.class);
84 job.setReducerClass(MyReducer.class);
85 //设置输入和输出路径
86 FileInputFormat.addInputPath(job,new Path(args[0]));
87 FileOutputFormat.setOutputPath(job,new Path(args[1]));
88 //设置输出key和value的类型
89 job.setOutputKeyClass(Text.class);
90 job.setOutputValueClass(IntWritable.class);
91 //提交job
92 boolean b = job.waitForCompletion(true);
93 //结束程序
94 System.exit(b ? 0 : 1);
95 }
96 }