zoukankan      html  css  js  c++  java
  • Hadoop基础-Map端链式编程之MapReduce统计TopN示例

             Hadoop基础-Map端链式编程之MapReduce统计TopN示例

                                        作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.项目需求

      对“temp.txt”中的数据进行分析,统计出各个年份(第15~19列)总排行前十的最高气温(第87~92列)。由于博客园无法上传大文件的文本,因此我把该文本的内容放在博客园的另一个链接了(需要的戳我)。如果网页打不开的话,也可以去百度云盘里下载副本,链接:https://pan.baidu.com/s/12aZFcO2XoegUGMAbS--n6Q 密码:7n91。

    二.代码实现

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.WritableComparable;
     9 
    10 import java.io.DataInput;
    11 import java.io.DataOutput;
    12 import java.io.IOException;
    13 
    14 public class CompKey implements WritableComparable<CompKey> {
    15 
    16     private String year;
    17     private int temp;
    18     /**
    19      * 重写CompKey对年份和气温排序
    20      */
    21     public int compareTo(CompKey o) {
    22         if(this.getYear().equals(o.getYear())){
    23             return o.getTemp() - this.getTemp();
    24         }
    25         return this.getYear().compareTo(o.getYear());
    26 
    27     }
    28 
    29     public void write(DataOutput out) throws IOException {
    30         out.writeUTF(year);
    31         out.writeInt(temp);
    32 
    33     }
    34 
    35     public void readFields(DataInput in) throws IOException {
    36         year = in.readUTF();
    37         temp = in.readInt();
    38 
    39     }
    40 
    41     public String getYear() {
    42         return year;
    43     }
    44 
    45     public void setYear(String year) {
    46         this.year = year;
    47     }
    48 
    49     public int getTemp() {
    50         return temp;
    51     }
    52 
    53     public void setTemp(int temp) {
    54         this.temp = temp;
    55     }
    56 
    57     @Override
    58     public String toString() {
    59         return year + '	' +temp ;
    60     }
    61 }
    CompKey.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.WritableComparable;
     9 import org.apache.hadoop.io.WritableComparator;
    10 
    11 public class MyGroupComparator extends WritableComparator {
    12 
    13     public MyGroupComparator() {
    14         super(CompKey.class,true);
    15     }
    16 
    17     public int compare(WritableComparable a, WritableComparable b) {
    18         CompKey ck1 = (CompKey) a;
    19         CompKey ck2 = (CompKey) b;
    20         return ck1.getYear().compareTo(ck2.getYear());
    21     }
    22 }
    MyGroupComparator.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.LongWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 
    13 import java.io.IOException;
    14 
    15 public class ChainMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    16 
    17     @Override
    18     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    19 
    20         String line = value.toString();
    21 
    22         //得到年份
    23         String year = line.substring(15, 19);
    24 
    25         //得到气温
    26         int temp = Integer.parseInt(line.substring(87, 92));
    27 
    28         context.write(new Text(year), new IntWritable(temp));
    29 
    30     }
    31 }
    ChainMapper1.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.NullWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 
    13 import java.io.IOException;
    14 
    15 public class ChainMapper2 extends Mapper<Text,IntWritable,CompKey,NullWritable> {
    16 
    17     @Override
    18     protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
    19 
    20         int i = value.get();
    21 
    22         if( i != 9999){
    23             CompKey ck = new CompKey();
    24             ck.setYear(key.toString());
    25             ck.setTemp(i);
    26             context.write(ck,NullWritable.get());
    27         }
    28     }
    29 }
    ChainMapper2.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.NullWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Reducer;
    12 
    13 import java.io.IOException;
    14 import java.util.Iterator;
    15 
    16 public class ChainReducer1 extends Reducer<CompKey, NullWritable, Text, IntWritable> {
    17 
    18     //由于分组对比器设定,相同的year放在一个分组,因此,在一个reduce循环中,得到的数据均为同一年份的数据
    19     protected void reduce(CompKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    20         String year = key.getYear();
    21         Iterator<NullWritable> it = values.iterator();
    22         int i = 0;
    23         while (it.hasNext()){
    24             System.out.println(key.toString());
    25             int temp = key.getTemp();
    26             context.write(new Text(year), new IntWritable(temp));
    27             it.next();
    28             i++;
    29             if(i >= 10){
    30                 break;
    31             }
    32         }
    33     }
    34 }
    ChainReducer1.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.io.IntWritable;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Mapper;
    11 
    12 import java.io.IOException;
    13 
    14 public class ChainReducer2 extends Mapper<Text, IntWritable, Text,IntWritable> {
    15     protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
    16         int temp = value.get();
    17         //取得奇数气温
    18         if( temp % 2 == 1 ){
    19             context.write(key, new IntWritable(temp));
    20         }
    21 
    22     }
    23 }
    ChainReducer2.java 文件内容
     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.mrchain;
     7 
     8 import org.apache.hadoop.conf.Configuration;
     9 import org.apache.hadoop.fs.FileSystem;
    10 import org.apache.hadoop.fs.Path;
    11 import org.apache.hadoop.io.IntWritable;
    12 import org.apache.hadoop.io.LongWritable;
    13 import org.apache.hadoop.io.NullWritable;
    14 import org.apache.hadoop.io.Text;
    15 import org.apache.hadoop.mapreduce.Job;
    16 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    17 import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
    18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    20 
    21 public class ChainApp {
    22 
    23 
    24     public static void main(String[] args) throws Exception {
    25 
    26         Configuration conf = new Configuration();
    27         conf.set("fs.defaultFS","file:///");
    28 
    29         Job job = Job.getInstance(conf);
    30 
    31         FileSystem fs = FileSystem.get(conf);
    32 
    33         job.setJobName("Chain");
    34 
    35         job.setJarByClass(ChainApp.class);
    36         //在MapChain中,一个Map后面可以跟n多Map
    37         ChainMapper.addMapper(job,ChainMapper1.class,LongWritable.class, Text.class,
    38                 Text.class, IntWritable.class,conf);
    39 
    40         ChainMapper.addMapper(job,ChainMapper2.class,Text.class,IntWritable.class,
    41                 CompKey.class,NullWritable.class,conf);
    42 
    43         //在ReduceChain中,一个Reduce中不能跟reduce,只能跟map
    44         ChainReducer.setReducer(job,ChainReducer1.class,CompKey.class,NullWritable.class,
    45                 Text.class,IntWritable.class,conf);
    46 
    47         ChainReducer.addMapper(job,ChainReducer2.class, Text.class, IntWritable.class,
    48                 Text.class,IntWritable.class, conf);
    49 
    50         job.setGroupingComparatorClass(MyGroupComparator.class);
    51 
    52         //判断是否存在,如果存在则删除
    53         Path outPath = new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\out");
    54         if(fs.exists(outPath)){
    55             fs.delete(outPath,true);
    56         }
    57 
    58         //输入路径
    59         FileInputFormat.addInputPath(job,new Path("D:\10.Java\IDE\yhinzhengjieData\MyHadoop\temp"));
    60 
    61         //输出路径
    62         FileOutputFormat.setOutputPath(job,outPath);
    63 
    64         job.waitForCompletion(true);
    65     }
    66 }
  • 相关阅读:
    Android_AsyncTask
    table隔行变色【转】
    添加对WCF的调用(内网状态下)。
    【转】IDEA 2017破解 license server激活
    C# LIst去重
    框架内事务的近期发现,以后再研究
    启动、停止、删除Windows服务
    软件项目总结中的经验总结
    iis最大连接数和队列长度
    在一个千万级的数据库查寻中,如何提高查询效率?
  • 原文地址:https://www.cnblogs.com/yinzhengjie/p/9333164.html
Copyright © 2011-2022 走看看