(一)MapReduce简单介绍
- MapReduce是一种分布式的计算模型,主要用于搜索领域,解决海量数据的计算问题
- 它主要由两个阶段组成:Map和Reduce,用户只要实现map()和reduce()两个函数,就可以实现分布式计算
- 这两个函数的形参是key,value对,表示函数的输入信息
(二)步骤分析
map过程
1.1 读取输入文件内容,解析成key、value对。对输入的文件的每一行,解析成key、value对。每一个键值对调用一次map函数。 2.2 写出自己的算法逻辑,对输入的key、value对进行处理,转换成新的key、value输出。
reduce过程
2.1 在reduce之前,shuffle会对多个map任务的输出进行合并、排序。 2.2 在reduce函数中写出自己的算法逻辑,对输入的key、value对进行处理,转换成新的key、value输出。 2.3 把输出保存到文件当中。
(三)WordCount例子学习
准备需要的文件
在这里,我写了一个source.txt文件并且通过
hdfs dfs -put /home/txp/桌面/source.txt /wc/srcdata
上传到HDFS中去(当然此时已经启动hadoop集群)
- 定义一个WcMapper类继承Mapper
public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//接受数据
String line=value.toString();
//切分数据
String [] words=StringUtils.split(line," ");
//循环
for (String w :words) {
//出现一次 记个1
context.write(new Text(w), new LongWritable(1));
}
}
}
对文件中的数据是一行一行的处理。
- 定义一个WcReducer类继承Reducer
public class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> value,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
//定义一个计算器
long count=0;
//循环
for(LongWritable v:value) {
count+=v.get();
}
//输出
context.write(key, new LongWritable(count));
}
}
4.创建WordCount 类具体实现
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
//conf.set("mapreduce.job.jar", "wc.jar");
//新建一个job对象
Job job = Job.getInstance(conf);
//加载main方法所在的类
job.setJarByClass(WordCount.class);
//设置mapper与reducer类
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);
//Combiner组件
//指定job所使用的combiner类定义
job.setCombinerClass(WcReducer.class);
//设置mapper输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reducer输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://Master:9000/wc/srcdata"));
//如果已经有了output了 删除
FileSystem fs=FileSystem.get(conf);
Path output=new Path("/wc/output");
if(fs.exists(output)) {
fs.delete(output,true);
}
//指定处理之后的结果输出到那个路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://Master:9000/wc/output"));
boolean res = job.waitForCompletion(true);
System.out.println(res?1:0);
}
}
5.运行模式
5.1提交到集群中运行
将开发好的该程序打成jar包,上传到服务器,执行命令
hadoop jar 包名.主类 输入目录 输出目录
5.2在本地运行
直接在WordCount中启动,这种也可以在yarn集群中运行