  • Multi-input wordcount

    1. Mapper for sequence-file input

    package com.cr.hdfs;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class Seqmap extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * Turns each record of the input sequence file into <word, 1> pairs.
         * The input key/value types (LongWritable/Text) must match the types
         * the sequence file was written with.
         * @param key the record key stored in the sequence file
         * @param value the text of the current record
         * @param context context used to emit <k,v> pairs
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text keyOut = new Text();
            IntWritable valueOut = new IntWritable(1);
            // split on runs of whitespace rather than a single space,
            // so tabs and repeated spaces do not produce empty "words"
            String[] arr = value.toString().split("\\s+");
            for (String s : arr) {
                if (s.isEmpty()) continue;   // skip the empty token a leading delimiter leaves behind
                keyOut.set(s);
                context.write(keyOut, valueOut);
            }
            System.out.println("come into mapper...");
    
        }
    }
    
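    The LongWritable/Text input types above only work if the sequence file at D:/wordcout/seq/1.seq was written with exactly those key/value classes. A minimal sketch for generating such a test file (the SeqFileWriter class name and the sample records are assumptions, not part of the original post):

    package com.cr.hdfs;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    // Hypothetical helper (not in the original post): writes a few
    // <LongWritable, Text> records so Seqmap's declared input types match.
    public class SeqFileWriter {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            Path path = new Path("file:///D:/wordcout/seq/1.seq");
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(LongWritable.class),
                    SequenceFile.Writer.valueClass(Text.class))) {
                writer.append(new LongWritable(1), new Text("hello world"));
                writer.append(new LongWritable(2), new Text("hello hadoop"));
            }
        }
    }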

    2. Mapper for plain-text input (TextInputFormat always delivers each line's byte offset as a LongWritable key, so that is the input key type this mapper must declare)

    package com.cr.hdfs;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class Textmap extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * Turns each line of the input text file into <word, 1> pairs.
         * @param key byte offset of the current line within the file
         * @param value the text of the current line
         * @param context context used to emit <k,v> pairs
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text keyOut = new Text();
            IntWritable valueOut = new IntWritable(1);
            // split on runs of whitespace so tabs and repeated spaces
            // do not yield empty "words"
            String[] arr = value.toString().split("\\s+");
            for (String s : arr) {
                if (s.isEmpty()) continue;
                keyOut.set(s);
                context.write(keyOut, valueOut);
            }
            System.out.println("come into mapper...");
    
        }
    }
    
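    Since neither mapper actually uses its input key, the duplication could be avoided with one mapper whose input key type is Object, the same trick the stock Hadoop WordCount example uses; keys from both TextInputFormat and SequenceFileInputFormat satisfy it, and the class would simply be registered twice with MultipleInputs. A sketch (the AnyKeyWordMap name is an assumption, not from the original post):

    package com.cr.hdfs;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    // Hypothetical alternative (not in the original post): a single mapper
    // usable for both inputs because it ignores the input key entirely.
    public class AnyKeyWordMap extends Mapper<Object, Text, Text, IntWritable> {
        private final Text keyOut = new Text();
        private static final IntWritable ONE = new IntWritable(1);
    
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            for (String s : value.toString().split("\\s+")) {
                if (!s.isEmpty()) {
                    keyOut.set(s);
                    context.write(keyOut, ONE);
                }
            }
        }
    }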

    3. Reducer: aggregate the counts

    package com.cr.hdfs;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println("come into reduce...");
            // sum all partial counts for this word
            int count = 0;
            for (IntWritable iw : values) {
                count += iw.get();
            }
    
            // log which worker thread handled this key (useful for watching
            // how the three reduce tasks divide the work)
            String tno = Thread.currentThread().getName();
            System.out.println("thread ==> " + tno + " ===>  reducer ===>  " + key.toString() + " ===> " + count);
            context.write(key, new IntWritable(count));
    
        }
    }
    
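    Because this reducer just sums its values, the aggregation is associative and commutative, so the same class can also serve as a combiner to cut shuffle traffic. An optional one-line addition to the driver in step 4 below (an assumption, not part of the original post):

    // optional: run the reducer map-side as a combiner; safe here because
    // summing partial counts equals summing everything at once
    job.setCombinerClass(reduce.class);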

    4. wordcountApp (the driver, wiring up multiple inputs)

    package com.cr.hdfs;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    /**
     * WordCount with multiple inputs: a plain-text file and a sequence file,
     * each bound to its own mapper through MultipleInputs.
     */
    public class wordcount1 {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // configure the job to run against the local file system
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            Job job = Job.getInstance(conf);
    
            // set the job's basic properties
            job.setJobName("wordcountAPP");                  // job name
            job.setJarByClass(wordcount1.class);             // class used to locate the job jar
    
            // multiple inputs: bind each path to its own InputFormat and Mapper
            MultipleInputs.addInputPath(job, new Path("file:///D:/wordcout/text/1.txt"), TextInputFormat.class, Textmap.class);
            MultipleInputs.addInputPath(job, new Path("file:///D:/wordcout/seq/1.seq"), SequenceFileInputFormat.class, Seqmap.class);
    
            // single output directory (must not exist before the run)
            FileOutputFormat.setOutputPath(job, new Path("file:///D:/wordcout/out"));
    
            job.setReducerClass(reduce.class);               // reducer class
            job.setNumReduceTasks(3);                        // number of reduce tasks
    
            job.setMapOutputKeyClass(Text.class);            // map output key type
            job.setMapOutputValueClass(IntWritable.class);   // map output value type
            job.setOutputKeyClass(Text.class);               // final output key type
            job.setOutputValueClass(IntWritable.class);      // final output value type
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
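    With setNumReduceTasks(3), a successful run leaves three part files (part-r-00000 through part-r-00002) under D:/wordcout/out, and FileOutputFormat fails fast if that directory already exists. A minimal pre-flight sketch for the start of main, before the output path is set (an assumption, not part of the original post):

    // delete a stale output directory so repeated local runs do not fail
    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
    Path out = new Path("file:///D:/wordcout/out");
    if (fs.exists(out)) {
        fs.delete(out, true);   // recursive delete
    }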