zoukankan      html  css  js  c++  java
  • 多个map对应一个reduce记录

    /* 说明：当数据来源的格式不一致，或者数据来源不止一处时，我们可以采用多路输入（多个map），再由一个reduce统一处理。*/

    以单词统计（WordCount）为例：

    第一步：App（驱动类）

    /**
     * Driver for the multi-input word-count job. Two mappers, one per input format, feed a
     * single reduce stage; MultipleInputs routes each input path to its own mapper.
     *
     * <p>Usage: {@code WCApp <outputDir> [textInputDir] [seqInputDir]}. When the optional input
     * directories are omitted, the original hard-coded local paths are used.
     *
     * <p>The process exit status is 0 if the job succeeds, 1 if it fails, and 2 on bad usage.
     */
    public class WCApp {
        private static final String DEFAULT_TEXT_INPUT = "file:///d:/mr/txt";
        private static final String DEFAULT_SEQ_INPUT = "file:///d:/mr/seq";

        public static void main(String[] args) throws Exception {
            // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
            if (args.length < 1) {
                System.err.println("Usage: WCApp <outputDir> [textInputDir] [seqInputDir]");
                System.exit(2);
            }
            String textInput = args.length > 1 ? args[1] : DEFAULT_TEXT_INPUT;
            String seqInput = args.length > 2 ? args[2] : DEFAULT_SEQ_INPUT;

            Configuration conf = new Configuration();
            // Run against the local filesystem rather than HDFS.
            conf.set("fs.defaultFS", "file:///");

            Job job = Job.getInstance(conf);
            job.setJobName("WCAppMulti");
            job.setJarByClass(WCApp.class);

            // Multiple inputs: each path gets its own InputFormat and Mapper.
            MultipleInputs.addInputPath(job, new Path(textInput),
                    TextInputFormat.class, WCTextMapper.class);
            MultipleInputs.addInputPath(job, new Path(seqInput),
                    SequenceFileInputFormat.class, WCSeqMapper.class);

            FileOutputFormat.setOutputPath(job, new Path(args[0]));

            job.setReducerClass(WCReducer.class);
            job.setNumReduceTasks(3);

            // Both mappers emit (Text, IntWritable), the same as the final output types, so no
            // separate map-output classes need to be configured.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Propagate the job outcome to the caller instead of always exiting normally.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    第二步:map的处理

    /**

    *文本类型的文件的读取
    * WCTextMapper
    */
    public class WCSeqMapper extends Mapper<IntWritable, Text, Text, IntWritable>{
    protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
    Text keyOut = new Text();
    IntWritable valueOut = new IntWritable();
    String[] arr = value.toString().split(" ");
    for(String s : arr){
    keyOut.set(s);
    valueOut.set(1);
    context.write(keyOut,valueOut);
    }
    }

    }

    /**

    * hadoop的压缩文件读取
    *SeqMapper
    */
    public class WCSeqMapper extends Mapper<IntWritable, Text, Text, IntWritable>{
    protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
    Text keyOut = new Text();
    IntWritable valueOut = new IntWritable();
    String[] arr = value.toString().split(" ");
    for(String s : arr){
    keyOut.set(s);
    valueOut.set(1);
    context.write(keyOut,valueOut);
    }
    }
    }

    第三步：对读取的数据进行聚合（reduce）

    /**
    * Reducer
    */
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
    * reduce
    */
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int count = 0 ;
    for(IntWritable iw : values){
    count = count + iw.get() ;
    }
    String tno = Thread.currentThread().getName();
    System.out.println(tno + " : MaxTempReducer :" + key.toString() + "=" + count);
    context.write(key,new IntWritable(count));
    }
    }

  • 相关阅读:
    MySQL 获得当前日期时间 函数
    Jquery 将表单序列化为Json对象
    Eclipse远程调试(远程服务器端监听)
    使用Eclipse进行远程调控
    Java基础教程(3)--回顾HelloWorld
    Java基础教程(2)--Java开发环境
    Java基础教程(1)--概述
    4.9上机
    4.2上机
    第四周作业
  • 原文地址:https://www.cnblogs.com/zyanrong/p/10828794.html
Copyright © 2011-2022 走看看