zoukankan      html  css  js  c++  java
  • mapreduce (四) MapReduce实现Grep+sort

    1.txt
    dong xi cheng
    xi dong cheng
    wo ai beijing
    tian an men
    qiche
    dong
    dong
    dong
    2.txt
    dong xi cheng
    xi dong cheng
    wo ai beijing
    tian an men
    qiche
    dong
    dong
    dong
    
    import java.io.IOException;
    import java.util.Random;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
    import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
    
    public class IGrep {
    
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
    
            String dir_in = "hdfs://localhost:9000/input_grep";
            String dir_out = "hdfs://localhost:9000/output_grep";
            String reg = ".ng";//匹配三个字符的字符串,且以ng结尾。
    
            conf.set(RegexMapper.PATTERN, reg);
            conf.setInt(RegexMapper.GROUP, 0);
    
            Path in = new Path(dir_in);
            Path tmp = new Path("grep-temp-"
                    + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
            Path out = new Path(dir_out);
    
            try {
                Job grepJob = new Job(conf, "grep-search");
    
                grepJob.setJarByClass(IGrep.class);
    
                grepJob.setInputFormatClass(TextInputFormat.class);
                grepJob.setMapperClass(RegexMapper.class);
                grepJob.setCombinerClass(LongSumReducer.class);
                grepJob.setPartitionerClass(HashPartitioner.class);
    
                grepJob.setMapOutputKeyClass(Text.class);
                grepJob.setMapOutputValueClass(LongWritable.class);
                FileInputFormat.addInputPath(grepJob, in);
    
                grepJob.setReducerClass(LongSumReducer.class);
                // job.setNumReduceTasks(1);
                grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    
                grepJob.setOutputKeyClass(Text.class);
                grepJob.setOutputValueClass(LongWritable.class);
                FileOutputFormat.setOutputPath(grepJob, tmp);
    
                grepJob.waitForCompletion(true);
    
                Job sortJob = new Job(conf, "grep-sort");
    
                sortJob.setJarByClass(IGrep.class);
    
                sortJob.setInputFormatClass(SequenceFileInputFormat.class);
                sortJob.setMapperClass(InverseMapper.class);
                FileInputFormat.addInputPath(sortJob, tmp);
    
                sortJob.setNumReduceTasks(1);【全局排序】
                sortJob.setSortComparatorClass(LongWritable.DecreasingComparator.class);//逆序
    
                FileOutputFormat.setOutputPath(sortJob, out);
    
                sortJob.waitForCompletion(true);
                
            } finally {
                FileSystem.get(conf).delete(tmp, true);
            }
        }
    }


    输出结果:
    10    ong
    4    eng
    2    ing


  • 相关阅读:
    序列终结者
    CF696C PLEASE
    [清华集训]Rmq Problem / mex
    CF786B Legacy
    链表结构详解
    synchronized同步方法和同步代码块的区别
    关于守护线程定义
    线程的优先级
    mysql查询当天的数据
    java以正确的方式停止线程
  • 原文地址:https://www.cnblogs.com/i80386/p/3598864.html
Copyright © 2011-2022 走看看