zoukankan      html  css  js  c++  java
  • MapReduce Java API-多输入路径方式

    场景

    MapReduce Java API实例-统计单词出现频率:

    https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169

    在上面实现统计单次出现的频率的基础上。

    数据集只是单路径,如果有多个数据集文件,即有多个txt文件,要怎么实现。

    多文件输入采用MultipleInputs.addInputPath方法即可完成。

    注:

    博客:
    https://blog.csdn.net/badao_liumang_qizhi
    关注公众号
    霸道的程序猿
    获取编程相关电子书、教程推送与免费下载。

    实现

    map和reduce的代码基本和上面的一致

    1、map类

    package com.badao.multinput;
    
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    public class MultInputMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        //1、编写map函数,通过继承Mapper类实现里面的map函数
        //   Mapper类当中的第一个函数是Object,也可以写成Long
        //   第一个参数对应的值是行偏移量
    
        //2、第二个参数类型通常是Text类型,Text是Hadoop实现的String 类型的可写类型
        //   第二个参数对应的值是每行字符串
    
        //3、第三个参数表示的是输出key的数据类型
    
        //4、第四个参数表示的是输出value的数据类型,IntWriable 是Hadoop实现的int类型的可写数据类型
    
        public final static IntWritable one = new IntWritable(1);
        public Text word = new Text();
    
        //key 是行偏移量
        //value是每行字符串
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
            while (stringTokenizer.hasMoreTokens())
            {
                //stringTokenizer.nextToken()是字符串类型,使用set函数完成字符串到Text数据类型的转换
                word.set(stringTokenizer.nextToken());
                //通过write函数写入到本地文件
                context.write(word,one);
            }
        }
    }

    2、reduce类

    package com.badao.multinput;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    
    //第一个参数类型是输入值key的数据类型,map中间输出key的数据类型
    //第二个参数类型是输入值value的数据类型,map中间输出value的数据类型
    //第三个参数类型是输出值key的数据类型,他的数据类型要跟job.setOutputKeyClass(Text.class) 保持一致
    //第四个参数类型是输出值value的数据类型,它的数据类型要跟job.setOutputValueClass(IntWriable.class) 保持一致
    
    public class MultInputReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    
        public IntWritable result = new IntWritable();
    
    
        //key就是单词  values是单词出现频率列表
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(IntWritable val:values)
            {
                //get就是取出IntWriable的值
                sum += val.get();
            }
            result.set(sum);
            context.write(key,result);
        }
    }

    3、job类

    job这里不同,单路径时

    FileInputFormat.addInputPath(job,new Path("D:\words.txt"));

    多路径时

            Path path1 = new Path("D:\words.txt");
            Path path2 = new Path("D:\words2.txt");
    
            MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);
            MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);

    完整代码

    package com.badao.multinput;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
    
    import java.io.IOException;
    
    public class MultInputJob {
        public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
            wordCountLocal();
        }
    
        public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException
        {
            Configuration conf = new Configuration();
            //实例化一个作业,word count是作业的名字
            Job job = Job.getInstance(conf, "multinputwordcount");
            //指定通过哪个类找到对应的jar包
            job.setJarByClass(MultInputJob.class);
    
            //为job设置Mapper类
            job.setMapperClass(MultInputMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            //为job设置reduce类
            job.setReducerClass(MultInputReducer.class);
    
            //为job的输出数据设置key类
            job.setOutputKeyClass(Text.class);
            //为job输出设置value类
            job.setOutputValueClass(IntWritable.class);
    
            //多个输入路径
            Path path1 = new Path("D:\words.txt");
            Path path2 = new Path("D:\words2.txt");
    
            MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);
            MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);
    
            //为job设置输出路径
            FileOutputFormat.setOutputPath(job,new Path("D:\mulinputOut"));
            job.waitForCompletion(true);
        }
    
    }

    运行job类查看效果

    博客园: https://www.cnblogs.com/badaoliumangqizhi/ 关注公众号 霸道的程序猿 获取编程相关电子书、教程推送与免费下载。
  • 相关阅读:
    zeromq学习记录(五)vc下多线程
    zeromq学习记录(七)订阅发布消息封装
    Mozilla研究—从输入URL到显示内容的基本过程
    Mozilla研究—深入理解mozilla所需的背景知识
    Mozilla研究—组件加载机制
    Mozilla研究—传输协议
    Mozilla研究—mozilla中的设计亮点
    GTK+主循环(main loop)的工作原理
    Mozilla研究—mozilla能为我们做什么
    Mozilla研究—组件的创建过程
  • 原文地址:https://www.cnblogs.com/badaoliumangqizhi/p/15108177.html
Copyright © 2011-2022 走看看