hadoop-04
1. Traffic flow case study
- Count each phone number's total traffic
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Flow {
static class FlowMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
Text k = new Text();
LongWritable v = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// split the line into fields
String[] split = line.split("\\s+");
String tel = split[0];
String url = split[1];
long upFlow = Long.parseLong(split[2]);
long downFlow = Long.parseLong(split[3]);
long sumFlow = upFlow+downFlow;
k.set(tel);
v.set(sumFlow);
// emit the map output
context.write(k, v);
}
}
static class FlowReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
// reused LongWritable holding the aggregated flow for a key
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> value,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sumFlow = 0;
for (LongWritable longWritable : value) {
sumFlow += longWritable.get();
}
v.set(sumFlow);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/http.txt"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/flow/"));
boolean b = job.waitForCompletion(true);
// exit code: 0 on success, non-zero on failure
System.exit(b?0:-1);
}
}
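For reference, the mapper above expects each line of http.txt to hold four whitespace-separated fields: phone number, URL, upstream flow, downstream flow. A made-up illustration (the numbers are invented): input lines such as
13800000000 http://example.com/a 200 3000
13800000000 http://example.com/b 100 2000
13900000001 http://example.com/c 50 900
would produce the following totals in the output file (key and total separated by a tab):
13800000000	5300
13900000001	950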
2. Traffic flow case study using a bean object
- FlowBean.java
package com.xjk.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable {
private String tel;
private String url;
private long upFlow;
private long downFlow;
@Override
public String toString() {
return "FlowBean [tel=" + tel + ", url=" + url + ", upFlow=" + upFlow + ", downFlow=" + downFlow + "]";
}
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
// serialization: write the fields
public void write(DataOutput out) throws IOException {
out.writeUTF(tel);
out.writeUTF(url);
out.writeLong(upFlow);
out.writeLong(downFlow);
}
// deserialization: read the fields back
public void readFields(DataInput in) throws IOException {
// the read order must match the write order
this.tel = in.readUTF() ;
this.url = in.readUTF() ;
this.upFlow = in.readLong() ;
this.downFlow = in.readLong() ;
}
}
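FlowBean only needs write and readFields (plus the implicit no-argument constructor) for Hadoop to serialize it between map and reduce. Below is a minimal local round-trip sketch to check that the two methods agree on field order; the class name and the values are made up for illustration.
package com.xjk.mr;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setTel("13800000000");             // made-up values
        in.setUrl("http://example.com/a");
        in.setUpFlow(200);
        in.setDownFlow(3000);
        // serialize with write()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));
        // deserialize with readFields(); fields come back in the same order
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out); // FlowBean [tel=13800000000, url=..., upFlow=200, downFlow=3000]
    }
}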
- Flow2.java
package com.xjk.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Flow2 {
static class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// split the line into fields
String[] split = line.split("\\s+");
String tel = split[0];
String url = split[1];
long upFlow = Long.parseLong(split[2]);
long downFlow = Long.parseLong(split[3]);
// long sumFlow = upFlow+downFlow;
k.set(tel);
v.setUrl(url);
v.setTel(tel);
v.setUpFlow(upFlow);
v.setDownFlow(downFlow);
// emit the map output
context.write(k, v);
}
}
static class FlowReducer extends Reducer<Text,FlowBean,Text,LongWritable>{
// reused LongWritable holding the total flow for a key
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
Reducer<Text, FlowBean, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0;
for (FlowBean flowBean : values) {
long upFlow = flowBean.getUpFlow();
long downFlow = flowBean.getDownFlow();
sum += (upFlow + downFlow);
}
v.set(sum);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(conf);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/http.txt"));
FileOutputFormat.setOutputPath(job, new Path("d:/data/flow2/"));
boolean b = job.waitForCompletion(true);
// exit code: 0 on success, non-zero on failure
System.exit(b?0:-1);
}
}
3. Movie case study: the N highest-rated movies
- RateTopN.java
package MovieDemo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.google.gson.Gson;
public class RateTopN {
/*
* Read the input:
* emit the movie id as the key and the whole record (MoiveBean) as the value
*
* */
static class RateTopNMapper extends Mapper<LongWritable, Text, Text, MoiveBean>{
Gson gs = new Gson();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MoiveBean>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// parse the JSON line into a MoiveBean
MoiveBean mb = gs.fromJson(line, MoiveBean.class);
// use the movie id as the key
k.set(mb.getMovie());// records with the same movie id go to the same group and the same reduce task
context.write(k, mb);
}
}
static class RateTopNReducer extends Reducer<Text, MoiveBean, MoiveBean, NullWritable>{
@Override
protected void reduce(Text arg0, Iterable<MoiveBean> values,
Reducer<Text, MoiveBean, MoiveBean, NullWritable>.Context context)
throws IOException, InterruptedException {
// collect the beans for this movie into a list, sort by rate, and keep the top N
try {
List<MoiveBean> list = new ArrayList<>();
for (MoiveBean moiveBean : values) {
MoiveBean m = new MoiveBean();
// copy moiveBean's fields into m (the framework reuses the value object while iterating)
BeanUtils.copyProperties(m, moiveBean);
list.add(m);
}
// sort the list
Collections.sort(list, new Comparator<MoiveBean>() {
@Override
public int compare(MoiveBean o1, MoiveBean o2) {
// descending order, comparing double values
return Double.compare(o2.getRate(), o1.getRate());
}
});
// Math.min(1, 4) returns the smaller value; Math.max(2, 10) returns the larger
for (int i = 0; i < Math.min(3, list.size()); i++) {
MoiveBean moiveBean = list.get(i);
// emit the record
context.write(moiveBean, NullWritable.get());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
Job job = Job.getInstance(configuration);
// set the mapper and reducer classes
job.setMapperClass(RateTopNMapper.class);
job.setReducerClass(RateTopNReducer.class);
// map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MoiveBean.class);
// reduce output key/value types (the reducer writes MoiveBean keys and NullWritable values)
job.setOutputKeyClass(MoiveBean.class);
job.setOutputValueClass(NullWritable.class);
// input path
FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/simple"));
// output path
FileOutputFormat.setOutputPath(job, new Path("d:/data/ratetopn"));
// number of reduce tasks
job.setNumReduceTasks(2);
// submit the job; true prints progress to the console
job.waitForCompletion(true);
}
}
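Each line of rating.json is a small JSON object whose field names match MoiveBean's fields, which is why gs.fromJson can map it directly. A standalone sketch of that conversion; the sample line and class name are made up for illustration and the real file may differ.
package MovieDemo;
import com.google.gson.Gson;
public class RatingLineDemo {
    public static void main(String[] args) {
        // a made-up line in the assumed format of rating.json
        String line = "{\"movie\":\"1193\",\"rate\":9.5,\"timeStamp\":\"978300760\",\"uid\":1}";
        // Gson fills the private fields by name via reflection
        MoiveBean mb = new Gson().fromJson(line, MoiveBean.class);
        System.out.println(mb); // MoiveBean [movie=1193, rate=9.5, timeStamp=978300760, uid=1]
    }
}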
- MoiveBean.java
package MovieDemo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class MoiveBean implements Writable {
private String movie;
private double rate;
private String timeStamp;
private int uid;
@Override
public String toString() {
return "MoiveBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
}
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
// serialization: write the fields
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(movie);
out.writeDouble(rate);
out.writeUTF(timeStamp);
out.writeInt(uid);
}
// deserialization: read the fields back
@Override
public void readFields(DataInput in) throws IOException {
this.movie = in.readUTF();
this.rate = in.readDouble();
this.timeStamp = in.readUTF();
this.uid = in.readInt();
}
}
4. Top 5 users by number of ratings
Use uid as the key and the movie record as the value.
- Besides reduce, a Reducer also has a cleanup method, which is called exactly once after the task has processed all of its input; likewise, setup is called once when the task starts. cleanup can therefore be used to sort everything the reduce calls of this task have accumulated.
Code:
package MovieDemo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.google.gson.Gson;

public class HitTopN {
    /*
     * uid as the key, the movie record as the value
     */
    static class HitTopNMapper extends Mapper<LongWritable, Text, Text, MoiveBean> {
        Gson gs = new Gson();
        Text k = new Text();
        MoiveBean v = new MoiveBean();
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MoiveBean>.Context context)
                throws IOException, InterruptedException {
            try {
                String line = value.toString();
                // parse the JSON line
                v = gs.fromJson(line, MoiveBean.class);
                k.set(v.getUid() + " ");
                context.write(k, v);
            } catch (Exception e) {
                // skip malformed lines
            }
        }
    }
    static class HitTopNReducer extends Reducer<Text, MoiveBean, Text, IntWritable> {
        Map<String, Integer> map = new HashMap<>();
        @Override
        protected void reduce(Text key, Iterable<MoiveBean> values,
                Reducer<Text, MoiveBean, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // count this uid's ratings and remember the count for cleanup
            int count = 0;
            for (MoiveBean moiveBean : values) {
                count++;
            }
            map.put(key.toString(), count);
        }
        // cleanup runs only once, after all reduce() calls of this task
        @Override
        protected void cleanup(Reducer<Text, MoiveBean, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // turn the map into an entry set
            Set<Entry<String, Integer>> entrySet = map.entrySet();
            // and then into a list
            ArrayList<Entry<String, Integer>> list = new ArrayList<>(entrySet);
            // sort, descending by count
            Collections.sort(list, new Comparator<Entry<String, Integer>>() {
                @Override
                public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                    return o2.getValue() - o1.getValue();
                }
            });
            // emit the top 5
            for (int i = 0; i < Math.min(5, list.size()); i++) {
                Entry<String, Integer> entry = list.get(i);
                context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
        Job job = Job.getInstance(configuration);
        // set the mapper and reducer classes
        job.setMapperClass(HitTopNMapper.class);
        job.setReducerClass(HitTopNReducer.class);
        // map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MoiveBean.class);
        // reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input path
        FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/rating.json"));
        // output path
        FileOutputFormat.setOutputPath(job, new Path("d:/data/hittopn"));
        // number of reduce tasks
        // job.setNumReduceTasks(2);
        // submit the job; true prints progress to the console
        job.waitForCompletion(true);
    }
}
Here the job runs with a single reduce task. If you set two reduce tasks instead, you get two output files, and each one only contains the top 5 of the data that went to that reducer; cleanup only sorts its own reduce task's data. To get the true global top 5 across several reducers, the reducers' outputs have to be combined and sorted once more.
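If the job is run with several reducers anyway, one simple option is a small local post-processing step that merges the part files and takes the global top 5. A minimal sketch, assuming the reducer output under d:/data/hittopn is tab-separated uid/count lines (the class name is made up):
package MovieDemo;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
public class MergeTopN {
    public static void main(String[] args) throws IOException {
        List<String[]> rows = new ArrayList<>();
        File[] files = new File("d:/data/hittopn").listFiles();
        if (files == null) return; // output directory does not exist
        for (File f : files) {
            if (!f.getName().startsWith("part-")) continue; // skip _SUCCESS etc.
            for (String line : Files.readAllLines(f.toPath())) {
                rows.add(line.split("\t")); // [uid, count]
            }
        }
        // sort all reducers' results together, descending by count
        rows.sort(Comparator.comparingInt((String[] r) -> Integer.parseInt(r[1])).reversed());
        for (int i = 0; i < Math.min(5, rows.size()); i++) {
            System.out.println(rows.get(i)[0] + "\t" + rows.get(i)[1]);
        }
    }
}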
5. Count how many times each word appears in each file
Suppose we have three files: a.html, b.html, c.html. The per-file counts look like:
a.html ---> hello 1, hello 1 ---> hello 2
b.html ---> hello 1, hello 1 ---> hello 2
c.html ---> hello 1 ---> hello 1
The first MapReduce job computes, for each file, how many times each word occurs. Its output files then serve as the input of a second MapReduce job, which aggregates the per-file counts for each word (a driver that chains the two jobs is sketched after index2.java below):
- index1.java
package index;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class index1 {
    static class Index1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        String fileName = null;
        // setup runs once per task: get the name of the file this split comes from
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            FileSplit f = (FileSplit) context.getInputSplit();
            fileName = f.getPath().getName();
        }
        Text k = new Text();
        IntWritable v = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                k.set(word + "-" + fileName);
                v.set(1);
                context.write(k, v); // emits e.g.: hello-a.html 1
            }
        }
    }
    static class Index1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable v = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // each value is a 1, so counting the values gives this word-file pair's count
            int count = 0;
            for (IntWritable intWritable : values) {
                count++;
            }
            v.set(count);
            context.write(key, v);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
        Job job = Job.getInstance(configuration);
        // set the mapper and reducer classes
        job.setMapperClass(Index1Mapper.class);
        job.setReducerClass(Index1Reducer.class);
        // map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input path
        FileInputFormat.setInputPaths(job, new Path("d:/data/index/input"));
        // output path
        FileOutputFormat.setOutputPath(job, new Path("d:/data/index/output"));
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // submit the job; true prints progress to the console
        job.waitForCompletion(true);
    }
}
- index2.java
package index;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// input line from job 1, e.g. "hello-a.html\t2"  --->  mapper output: hello   a.html-->2
public class index2 {
    static class Index2Mapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // the first job's reducer output separates key and value with a tab
            String[] split = line.split("\t");
            String word = split[0].split("-")[0];
            String fileName = split[0].split("-")[1];
            String count = split[1];
            k.set(word);
            v.set(fileName + "-->" + count);
            context.write(k, v);
        }
    }
    // receives: hello  [a.html-->2, b.html-->2, ...]
    static class Index2Reducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // concatenate all "file-->count" entries for this word
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text.toString()).append(" ");
            }
            v.set(sb.toString().trim());
            context.write(key, v);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
        Job job = Job.getInstance(configuration);
        // set the mapper and reducer classes
        job.setMapperClass(Index2Mapper.class);
        job.setReducerClass(Index2Reducer.class);
        // map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // input path: the output of the first job is the input of this one
        FileInputFormat.setInputPaths(job, new Path("d:/data/index/output"));
        // output path
        FileOutputFormat.setOutputPath(job, new Path("d:/data/index/output2"));
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // submit the job; true prints progress to the console
        job.waitForCompletion(true);
    }
}
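The two mains above are run by hand, one after the other. A single driver can also submit the jobs back to back, only starting the second one once the first has succeeded. A minimal sketch reusing the nested mapper/reducer classes and the same local paths (the class name IndexDriver is made up):
package index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class IndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
        // job 1: per-file word counts
        Job job1 = Job.getInstance(conf);
        job1.setMapperClass(index1.Index1Mapper.class);
        job1.setReducerClass(index1.Index1Reducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job1, new Path("d:/data/index/input"));
        FileOutputFormat.setOutputPath(job1, new Path("d:/data/index/output"));
        // only start job 2 if job 1 succeeded
        if (!job1.waitForCompletion(true)) {
            System.exit(-1);
        }
        // job 2: aggregate per-word lists over job 1's output
        Job job2 = Job.getInstance(conf);
        job2.setMapperClass(index2.Index2Mapper.class);
        job2.setReducerClass(index2.Index2Reducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, new Path("d:/data/index/output"));
        FileOutputFormat.setOutputPath(job2, new Path("d:/data/index/output2"));
        System.exit(job2.waitForCompletion(true) ? 0 : -1);
    }
}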
6. How MapReduce processes data internally
- Before any records are read, the input is divided into task splits based on the input paths and file sizes.
Map side:
FileInputFormat
TextInputFormat
LineRecordReader
getCurrentKey -> LongWritable (the byte offset of the current line)
getCurrentValue -> Text (the contents of the current line)
A while loop calls nextKeyValue() to check whether another k,v pair is available.
Inside the loop the map(ki, vi) method runs, and context.write(ko, vo) writes its output into an in-memory buffer (MapOutputBuffer); collect(k, v, p) stores each pair together with its partition number. The buffer is a ring buffer (a 100 MB byte[]): when it fills up, writing pauses and its contents are spilled. Before each spill the records are partitioned (hash partitioning) and sorted within each partition (quick sort). The spill files are afterwards merged and sorted (Merger); the merged data is written to local disk in binary form as a temporary intermediate file divided into regions, one per partition.
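The partition p mentioned above comes from the job's Partitioner; by default Hadoop uses HashPartitioner, which maps a key to a reduce task roughly as in the sketch below. A custom partitioner (the class name here is made up, typed for the flow job's Text/LongWritable output) is registered with job.setPartitionerClass:
package com.xjk.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Essentially what the default HashPartitioner does: spread keys over the
// reduce tasks by hash code.
public class TelPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // mask off the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
// In the driver:
// job.setPartitionerClass(TelPartitioner.class);
// job.setNumReduceTasks(2); // numPartitions equals the number of reduce tasks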
Reduce side:
During shuffle, each reduce task's Fetcher downloads the map output belonging to its partition and merges and sorts it (Merger, sort). The sorted records are then grouped by the grouping comparator, which compares the previous key with the next key (prevKey, nextKey); records whose keys compare as equal are delivered in the same Iterator. reduce(key, iter) iterates over each group and aggregates it, and finally context.write(ko, vo) writes the result through the output format (TextOutputFormat) into the output directory, in files such as part-r-00000. For a word count, for example, the final output might look like:
a 6
b 4
c 3
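The grouping comparison on the reduce side can also be customized: by default consecutive keys are compared with the key type's own comparator, but a RawComparator registered via job.setGroupingComparatorClass decides which keys end up in the same reduce() call. A sketch assuming Text keys (the class name and the 3-character prefix rule are made up):
package com.xjk.mr;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// Groups keys by their first three characters, so all keys sharing that
// prefix are handed to a single reduce() call.
public class PrefixGroupingComparator extends WritableComparator {
    protected PrefixGroupingComparator() {
        super(Text.class, true); // true: instantiate key objects for comparison
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return prefix(a.toString()).compareTo(prefix(b.toString()));
    }
    private String prefix(String s) {
        return s.substring(0, Math.min(3, s.length()));
    }
}
// In the driver:
// job.setGroupingComparatorClass(PrefixGroupingComparator.class);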