求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。具体原理如下图所示:
系统环境
Linux Centos 7
jdk 1.8
hadoop-3.2
IDEA代码编译器
实验任务:
求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。具体原理如下图所示:
商品分类 商品点击次数 52127 5 52120 93 52092 93 52132 38 52006 462 52109 28 52109 43 52132 0 52132 34 52132 9 52132 30 52132 45 52132 24 52009 2615 52132 25 52090 13 52132 6 52136 0 52090 10 52024 347
要求使用mapreduce统计出每类商品的平均点击次数。
结果数据如下:
商品分类 商品平均点击次数 52006 462 52009 2615 52024 347 52090 11 52092 93 52109 35 52120 93 52127 5 52132 23 52136 0
步骤:
1.切换到指定目录,启动集群
开启hadoop集群,本地安装的为高可用主从二节点的hadoop集群,集成了各项大数据组件。
先开启zookeeper,再开启hdfs,再开启yarn。
倘若本地安装的是普通分布式或伪分布式集群,直接./start-all.sh启动集群即可。
2.在linux将数据集上传到hdfs中
hadoop fs -mkdir -p /mymapreduce4/in hadoop fs -put /data/mapreduce4/goods_click /mymapreduce4/in
10181 1000481 2010-04-04 16:54:31 20001 1001597 2010-04-07 15:07:52 20001 1001560 2010-04-07 15:08:27 20042 1001368 2010-04-08 08:20:30 20067 1002061 2010-04-08 16:45:33 20056 1003289 2010-04-12 10:50:55 20056 1003290 2010-04-12 11:57:35 20056 1003292 2010-04-12 12:05:29 20054 1002420 2010-04-14 15:24:12 20055 1001679 2010-04-14 19:46:04 20054 1010675 2010-04-14 15:23:53 20054 1002429 2010-04-14 17:52:45 20076 1002427 2010-04-14 19:35:39 20054 1003326 2010-04-20 12:54:44 20056 1002420 2010-04-15 11:24:49 20064 1002422 2010-04-15 11:35:54 20056 1003066 2010-04-15 11:43:01 20056 1003055 2010-04-15 11:43:06 20056 1010183 2010-04-15 11:45:24 20056 1002422 2010-04-15 11:45:49 20056 1003100 2010-04-15 11:45:54 20056 1003094 2010-04-15 11:45:57 20056 1003064 2010-04-15 11:46:04 20056 1010178 2010-04-15 16:15:20 20076 1003101 2010-04-15 16:37:27 20076 1003103 2010-04-15 16:37:05 20076 1003100 2010-04-15 16:37:18 20076 1003066 2010-04-15 16:37:31 20054 1003103 2010-04-15 16:40:14 20054 1003100 2010-04-15 16:40:16
3.创建java工程,将jar包导入进去
为了避免版本冲突,和不必要的麻烦,可将hadoop目录下share/hadoop文件中的所有jar包导入进去。
Mapper代码<<<<
public static class Map extends Mapper<Object , Text , Text , IntWritable>{ private static Text newKey=new Text(); //实现map函数 public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ // 将输入的纯文本文件的数据转化成String String line=value.toString(); System.out.println(line); String arr[]=line.split(" "); newKey.set(arr[0]); int click=Integer.parseInt(arr[1]); context.write(newKey, new IntWritable(click)); } }
Reduce代码<<<<<
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ //实现reduce函数 public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int num=0; int count=0; for(IntWritable val:values){ num+=val.get(); //每个元素求和num count++; //统计元素的次数count } int avg=num/count; //计算平均数 context.write(key,new IntWritable(avg)); } }
完整代码如下:
package mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyAverage{ public static class Map extends Mapper<Object , Text , Text , IntWritable>{ private static Text newKey=new Text(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); System.out.println(line); String arr[]=line.split(" "); newKey.set(arr[0]); int click=Integer.parseInt(arr[1]); context.write(newKey, new IntWritable(click)); } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int num=0; int count=0; for(IntWritable val:values){ num+=val.get(); count++; } int avg=num/count; context.write(key,new IntWritable(avg)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job =new Job(conf,"MyAverage"); job.setJarByClass(MyAverage.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://localhost:9000/mymapreduce4/in/goods_click"); Path out=new Path("hdfs://localhost:9000/mymapreduce4/out"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
4.执行
执行方式有两种
①直接在本地运行,前提要在本地配置好hadoop环境变量,直接运行即可。
②将此文件打包成jar包,上传到linux中再,用命令运行。
hadoop jar /apps/hadoop/hadoop-mapreduce.jar wordcount /in /out
查看运行结果:
hadoop fs -ls /mymapreduce4/out hadoop fs -cat /mymapreduce4/out/part-r-00000