简介:这里先手写一个MR程序,大致实现一个单词计数程序。帮助后面学习MapReduce组件。
1:先自定义一个Mapper接口
package it.dawn.HDFSPra.HandWritingMR;
/**
* @author Dawn
* @date 2019年4月30日23:28:00
* @version 1.0
*
* 思路?
* 接口设计
*/
public interface Mapper {
//通用方法
public void map(String line,Context context);
}
2:定义一个Context类:
该类主要实现数据的传输,和数据的封装(这里用的一个HashMap进行封装的)
package it.dawn.HDFSPra.HandWritingMR;
/**
* @author Dawn
* @date 2019年4月30日23:18:13
* @version 1.0
*
* 思路?
* 数据传输的类
* 封装数据
* 集合
* <单词,1>
*/
import java.util.HashMap;
public class Context {
//数据封装
private HashMap<Object, Object> contextMap=new HashMap<>();
//写数据
public void write(Object key,Object value) {
//放数据到map中
contextMap.put(key, value);
}
//定义根据key拿到值方法
public Object get(Object key) {
return contextMap.get(key);
}
//拿到map当中的数据内容
public HashMap<Object, Object> getContextMap(){
return contextMap;
}
}
3:实现Mapper类(其实这里就是简化的Map和Reduce阶段)
package it.dawn.HDFSPra.HandWritingMR;
/**
* @author Dawn
* @date 2019年4月30日23:22:35
* @version 1.0
*
* 思路:
* 添加一个map方法 单词切分 相同key的value ++
*/
public class WordCountMapper implements Mapper{
@Override
public void map(String line, Context context) {
//1.拿到这行数据 切分
String[] words=line.split(" ");
//2.拿到单词 相同的key value++ hello 1 itstar 1
for(String word:words) {
Object value=context.get(word);
//相对于插入数据到HashMap中
if(null==value) {
context.write(word, 1);
}else {
//HashMap不为空
int v=(int) value;
context.write(word, v+1);
}
}
}
}
4:写一个总程序将这几个串起来(相当于是一个MR中的那个Driver程序,指定Map和Reduce的类。总程序入口)
package it.dawn.HDFSPra.HandWritingMR;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
/**
* @author Dawn
* @date 2019年4月30日23:07:18
* @version 1.0
*
* 需求:文件(hello itstar hello hunter hello hunter henshuai ) 统计每个单词出现的次数?
* 数据存储在hdfs、统计出来的结果存储到hdfs
*
* 2004google:dfs/bigtable/mapreduce
*
* 大数据解决的问题?
* 1.海量数据的存储
* hdfs
* 2.海量数据的计算
* mapreduce
*
* 思路?
* hello 1
* itstar 1
* hello 1
* ...
*
* 基于用户体验:
* 用户输入数据(hdfs)
* 用户处理的方式
* 用户指定结果数据存储位置
*
*/
public class HdfsWordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException {
//反射
Properties pro=new Properties();
//加载配置文件
pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));
Path inpath=new Path(pro.getProperty("IN_PATH"));
Path outpath=new Path(pro.getProperty("OUT_PATH"));
Class<?> mapper_class=Class.forName(pro.getProperty("MAPPER_CLASS"));
//实例化
Mapper mapper=(Mapper) mapper_class.newInstance();
Context context=new Context();
//1.构建hdfs客户端对象
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://bigdata11:9000"), conf, "root");
//2.读取用户输入的文件
//读取到的是改文件下的所有的txt文件
RemoteIterator<LocatedFileStatus> iter=fs.listFiles(inpath, false);
while(iter.hasNext()) {
LocatedFileStatus file=iter.next();
//打开路径 获取输入流
FSDataInputStream in=fs.open(file.getPath());
BufferedReader br=new BufferedReader(new InputStreamReader(in, "utf-8"));
String line = null;
while((line=br.readLine()) != null) {
//调用map方法执行业务逻辑
mapper.map(line, context);
}
br.close();
in.close();
}
//如果用户输入的结果路径不存在 则创建一个
Path out = new Path("/wc/out/");
if(!fs.exists(out))
fs.mkdirs(out);
//将缓存的结果放入hdfs中存储
HashMap<Object, Object> contextMap=context.getContextMap();
FSDataOutputStream out1=fs.create(outpath);
//遍历HashMap
Set<Entry<Object, Object>> entrySet = contextMap.entrySet();
for(Entry<Object, Object> entry:entrySet) {
//写数据
out1.write((entry.getKey().toString()+" "+entry.getValue()+"
").getBytes());
}
out1.close();
fs.close();
System.out.println("数据统计结果完成....");
}
}
配置文件(job.properties)如下:
IN_PATH=/dawn
OUT_PATH=/wc/out/rs.txt
MAPPER_CLASS=it.dawn.HDFSPra.HandWritingMR.WordCountMapper