Word count
- For a single article, a hash table is enough.
- To count word frequencies across every web page on the internet (what the Google search engine needs), the pages cannot all be loaded into memory.
- map: extract the words and emit a <word,1> key-value pair for each one; identical keys are then grouped together into <word,<1,1,1,...>> collections (a worked data-flow example follows this list).
- reduce: sum the 1s in each collection and output <word,sum>.
- Each map function processes only the data in a single HDFS block, which is what makes the computation distributable over big data.
- The framework that schedules and runs MapReduce programs on a distributed cluster is also called MapReduce.
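For example, take the single input line that also appears in the mapper examples below; the two phases transform it roughly like this (the shuffle ordering shown is illustrative):

    input line : I love Beijing and love Shanghai
    map output : <I,1> <love,1> <Beijing,1> <and,1> <love,1> <Shanghai,1>
    shuffle    : <Beijing,<1>> <I,<1>> <Shanghai,<1>> <and,<1>> <love,<1,1>>
    reduce out : <Beijing,1> <I,1> <Shanghai,1> <and,1> <love,2>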
WordCountMain.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // Create the job and set the entry point
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);

        // Mapper and its output types (k2, v2)
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer and its output types (k4, v4)
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
WordCountMapper.java

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String data = value1.toString();
        String[] words = data.split(" ");
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
WordCountReducer.java

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//                                            k3    v3           k4    v4
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : v3) {
            total = total + v.get();
        }
        context.write(k3, new IntWritable(total));
    }
}
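A minimal sketch of packaging and running this job (the jar name wc.jar is a placeholder; it assumes WordCountMain was chosen as the main class when exporting the jar, and that /input/data.txt already exists on HDFS):

    hadoop jar wc.jar /input/data.txt /output/wc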
Total salary per department
- SQL: select deptno,sum(sal) from emp group by deptno;
- Analyze the data types, then reuse the template and override map() and reduce().
- Export the jar and specify the main class.
- Store the data in HDFS (a command sketch follows this list).
- hadoop jar s1.jar /input/emp.csv /output/0910/s1
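A minimal sketch of the "store the data in HDFS" step (assuming emp.csv sits in the current local directory):

    hdfs dfs -mkdir -p /input
    hdfs dfs -put emp.csv /input/emp.csv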
SalaryTotalMain.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

    public static void main(String[] args) throws Exception {
        // 1. Create the job and set the entry point
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SalaryTotalMain.class);

        // 2. Mapper and its output types
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3. Reducer and its output types
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 4. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job
        job.waitForCompletion(true);
    }
}
SalaryTotalMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split on commas
        String[] words = data.split(",");

        // Emit: k2 = department number, v2 = the employee's salary
        context.write(new IntWritable(Integer.parseInt(words[7])),
                      new IntWritable(Integer.parseInt(words[5])));
    }
}
SalaryTotalReducer.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Sum the salaries in v3
        int total = 0;
        for (IntWritable v : v3) {
            total = total + v.get();
        }

        // Emit: k4 = department number, v4 = total salary of the department
        context.write(k3, new IntWritable(total));
    }
}
Data deduplication
- select distinct job from emp;
- Implement it with MapReduce.
- The deduplication itself is done by the shuffle phase, which sorts and groups identical keys; the Reducer then simply emits each distinct key once (see the trace after this list).
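A rough trace over the job column (the exact set of titles depends on the data; CLERK is assumed here):

    map output : <SALESMAN,null> <SALESMAN,null> <MANAGER,null> <CLERK,null> ...
    shuffle    : <CLERK,<null,..>> <MANAGER,<null,..>> <SALESMAN,<null,..>>
    reduce out : CLERK   MANAGER   SALESMAN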
DistinctMain.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctMain {

    public static void main(String[] args) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DistinctMain.class); // entry point of the job

        // 2. Mapper and its output types
        job.setMapperClass(DistinctMapper.class);
        job.setMapOutputKeyClass(Text.class);           // type of k2
        job.setMapOutputValueClass(NullWritable.class); // type of v2

        // 3. Reducer and its output types
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);           // type of k4
        job.setOutputValueClass(NullWritable.class); // type of v4

        // 4. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job
        job.waitForCompletion(true);
    }
}
DistinctMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2 is the job title
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String data = value1.toString();

        // Split on commas
        String[] words = data.split(",");

        // Emit the job title as key2
        context.write(new Text(words[2]), NullWritable.get());
    }
}
DistinctReducer.java

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text k3, Iterable<NullWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Each distinct key arrives exactly once, so just emit k3
        context.write(k3, NullWritable.get());
    }
}
Multi-table queries
- select dname,ename from dept, emp where emp.deptno=dept.deptno;
- In a relational database, subqueries are rewritten into multi-table queries (check the execution plan to see how a SQL statement is executed and how efficient it is).
- Cartesian product: the column counts add up and the row counts multiply, producing the full set of combinations.
- The join condition (e.g. emp.deptno=dept.deptno) removes the wrong combinations from that full set.
- At least N-1 join conditions are needed (N is the number of tables).
- Depending on the join condition, joins are classified as:
  - equijoin / non-equijoin
  - outer join / self join
- Analyze input and output --> decide the data types --> write the mapper and reducer.
- Implement the join with MapReduce (a key/value sketch follows this list).
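A sketch of the reduce-side join implemented below; the dept record (30,SALES,CHICAGO) is assumed, and the emp records are taken from the mapper comments:

    dept record 30,SALES,CHICAGO     -> map emits <30, "*SALES">
    emp record  7499,ALLEN,...,30    -> map emits <30, "ALLEN">
    emp record  7654,MARTIN,...,30   -> map emits <30, "MARTIN">
    reduce for key 30                -> <SALES, "ALLEN;MARTIN;"> (name order may vary)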
EqualJoinMain.java

package day0917.mr.equaljoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EqualJoinMain {

    public static void main(String[] args) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EqualJoinMain.class); // entry point of the job

        // 2. Mapper and its output types
        job.setMapperClass(EqualJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // type of k2
        job.setMapOutputValueClass(Text.class);      // type of v2

        // 3. Reducer and its output types
        job.setReducerClass(EqualJoinReducer.class);
        job.setOutputKeyClass(Text.class);   // type of k4
        job.setOutputValueClass(Text.class); // type of v4

        // 4. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job
        job.waitForCompletion(true);
    }
}
EqualJoinMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // The record may come from the dept table or from the emp table
        String data = value1.toString();

        // Split on commas
        String[] words = data.split(",");

        // Decide by the number of columns
        if (words.length == 3) {
            // Department record: emit <deptno, "*" + department name>
            context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*" + words[1]));
        } else {
            // Employee record: emit <employee's deptno, employee name>
            context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1]));
        }
    }
}
EqualJoinReducer.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // v3 may contain the department name (tagged with "*") and employee names
        String dname = "";
        String empNameList = "";

        for (Text value : v3) {
            String str = value.toString();
            // The "*" tag marks the department name
            int index = str.indexOf("*");
            if (index >= 0) {
                // Department name (strip the tag)
                dname = str.substring(1);
            } else {
                // Employee name
                empNameList = str + ";" + empNameList;
            }
        }

        // Emit <department name, list of employee names>
        context.write(new Text(dname), new Text(empNameList));
    }
}
Self join
- With table aliases, the same table is treated as multiple tables.
- A person is the boss of their subordinates and, at the same time, an employee of their own boss.
- Each record is emitted twice, once as a boss and once as an employee (as if it came from two tables); see the sketch after this list.
- All values with the same k2 are handled by the same reducer.
- Invalid data must be cleaned first (e.g. set the top boss's manager number to -1).
- Output only when both the boss and at least one employee exist (so the root and the leaf nodes of the employee tree are not shown).
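A sketch using the record from the mapper comment (7566,JONES,MANAGER,7839,...):

    as a boss       -> <7566, "*JONES">
    as an employee  -> <7839, "JONES">   (JONES works for employee 7839)
    reduce for 7566 -> <JONES, list of all names whose mgr field is 7566>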
SelfJoinMain.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SelfJoinMain {

    public static void main(String[] args) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SelfJoinMain.class); // entry point of the job

        // 2. Mapper and its output types
        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // type of k2
        job.setMapOutputValueClass(Text.class);      // type of v2

        // 3. Reducer and its output types
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);   // type of k4
        job.setOutputValueClass(Text.class); // type of v4

        // 4. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job
        job.waitForCompletion(true);
    }
}
SelfJoinMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7566,JONES,MANAGER,7839,1981/4/2,2975,0,20
        String data = value1.toString();

        // Split on commas
        String[] words = data.split(",");

        // Emit the record twice
        // 1. As the "boss table": <own empno, "*" + name>
        context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*" + words[1]));

        // 2. As the "employee table": <manager's empno, name>
        context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1]));
        /*
         * Note: if the data contains invalid records, clean them first
         * (e.g. set the top boss's manager number to -1),
         * and catch any exception that may be thrown.
         */
    }
}
SelfJoinReducer.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Variables for the boss's name and the employees' names
        String bossName = "";
        String empNameList = "";

        for (Text t : v3) {
            String str = t.toString();

            // The "*" tag marks the boss's name
            int index = str.indexOf("*");
            if (index >= 0) {
                // Boss's name (strip the tag)
                bossName = str.substring(1);
            } else {
                // Employee's name
                empNameList = str + ";" + empNameList;
            }
        }

        // Emit only when both a boss and at least one employee exist
        if (bossName.length() > 0 && empNameList.length() > 0) {
            context.write(new Text(bossName), new Text(empNameList));
        }
    }
}
Inverted index
- Compare with indexes in relational databases.
- After data is stored on HDFS, an index can be built over it to speed up lookups.
- Implement an inverted index with MapReduce.
- Record how many times a word appears in each file.
- The Combiner sums the occurrences of a word within the same file.
- The Reducer merges the per-file counts of a word across different files.
- Make sure the data types stay the same whether or not the Combiner is present (the Combiner's output types must match the Reducer's input types); a data-flow sketch follows this list.
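A data-flow sketch for the word love in data01.txt (the line and file name are taken from the mapper below; a second file data02.txt is assumed):

    map output     : <love:data01.txt,"1"> <love:data01.txt,"1"> ...
    combiner output: <love,"data01.txt:2"> ...
    reduce output  : <love,"(data02.txt:...)(data01.txt:2)">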
RevertedIndexMain.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RevertedIndexMain {

    public static void main(String[] args) throws Exception {
        // 1. Create the job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(RevertedIndexMain.class); // entry point of the job

        // 2. Mapper and its output types
        job.setMapperClass(RevertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);   // type of k2
        job.setMapOutputValueClass(Text.class); // type of v2

        // Combiner (its output must match the Reducer's input types)
        job.setCombinerClass(RevertedIndexCombiner.class);

        // 3. Reducer and its output types
        job.setReducerClass(RevertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);   // type of k4
        job.setOutputValueClass(Text.class); // type of v4

        // 4. Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job
        job.waitForCompletion(true);
    }
}
RevertedIndexMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input path, e.g. /indexdata/data01.txt
        // Get the path of the file this split belongs to
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();

        // Extract the file name (everything after the last slash)
        int index = path.lastIndexOf("/");
        String fileName = path.substring(index + 1);

        // Input line, e.g. I love Beijing and love Shanghai
        String data = value1.toString();
        String[] words = data.split(" ");

        // Emit <word:fileName, "1"> for each word
        for (String word : words) {
            context.write(new Text(word + ":" + fileName), new Text("1"));
        }
    }
}
RevertedIndexCombiner.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text k21, Iterable<Text> v21, Context context)
            throws IOException, InterruptedException {
        // Sum the occurrences of the word within a single file
        int total = 0;
        for (Text v : v21) {
            total = total + Integer.parseInt(v.toString());
        }

        // k21 looks like: love:data01.txt
        String data = k21.toString();
        // Find the colon
        int index = data.indexOf(":");

        String word = data.substring(0, index);      // the word
        String fileName = data.substring(index + 1); // the file name

        // Emit <word, fileName:count>
        context.write(new Text(word), new Text(fileName + ":" + total));
    }
}
RevertedIndexReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Concatenate the per-file counts for this word
        String str = "";

        for (Text t : v3) {
            str = "(" + t.toString() + ")" + str;
        }

        context.write(k3, new Text(str));
    }
}
Hadoop built-in examples
- start-all.sh
- /root/training/hadoop-2.7.3/share/hadoop/mapreduce
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar
- wordcount: A map/reduce program that counts the words in the input files.
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount
- Usage: wordcount <in> [<in>...] <out>
- hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/0402/wc
References
https://dpb-bobokaoya-sm.blog.csdn.net/article/details/88984195