zoukankan      html  css  js  c++  java
  • MapReduce实例-倒排索引

    环境:
      Hadoop1.x,CentOS6.5,三台虚拟机搭建的模拟分布式环境

      数据:任意数量、格式的文本文件(我用的四个.java代码文件)

    方案目标:

      根据提供的文本文件,统计每个单词分别在哪些文件中出现、各出现了几次,组成倒排索引,格式如下:

      Ant FaultyWordCount.java : 1 , WordCount.java : 1 

    思路:

      因为这个程序需要用到三个变量:单词、文件名、出现的频率,因此需要自定义Writable类,以单词为key,将文件名和出现的频率打包。

      1.先将每行文本切分成单词,并以 K/V = Word/(Filename:1) 的格式输出。

      2.利用Combiner类,在Map端先对本Map任务所处理文件中的同一单词进行一次本地计数,以减少传输到Reduce端的数据量。

      3.在Reduce中,将Combiner传来的同一个单词在不同文件中的出现频率进行汇总组合。注意:Hadoop并不保证Combiner一定会执行(也可能执行多次),因此Reduce端也必须能够正确累加未经Combiner合并的原始数据。

    难点:这个程序主要用到了Combiner以及自定义的Writable类。实现时需要注意,自定义的Writable类必须提供默认的无参构造函数,因为Hadoop在反序列化时会先通过反射创建对象,再调用readFields()填充字段。

    主调用Main类:

    package ren.snail;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Main extends Configured implements Tool {

        /**
         * Command-line entry point. Runs the job through {@link ToolRunner} so that generic
         * Hadoop options (-D, -conf, ...) are parsed before {@link #run(String[])} is called.
         */
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new Main(), args);
            System.exit(result);
        }

        /**
         * Configures and runs the inverted-index job.
         *
         * @param arg0 remaining (non-generic) arguments: {@code arg0[0]} is the input path,
         *             {@code arg0[1]} the output path
         * @return 0 if the job succeeded, 1 if it failed, 2 on incorrect usage
         */
        @Override
        public int run(String[] arg0) throws Exception {
            if (arg0 == null || arg0.length < 2) {
                System.err.println("Usage: Main <input path> <output path>");
                return 2;
            }

            Configuration configuration = getConf();
            Job job = new Job(configuration, "InvertIndex");
            job.setJarByClass(Main.class);
            FileInputFormat.addInputPath(job, new Path(arg0[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

            job.setMapperClass(InvertMapper.class);
            job.setCombinerClass(Combinner.class);  // optional local pre-aggregation
            job.setReducerClass(InvertReducer.class);

            // Map (and combiner) output types: word -> (file name, frequency).
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FileFreqWritable.class);
            // Final reducer output types: InvertReducer is declared to emit <Text, Text>.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Return the status rather than exiting here; main() owns the process exit code.
            return job.waitForCompletion(true) ? 0 : 1;
        }

    }
    View Code

    自定义Writable类

    package ren.snail;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    
    /**
     * Value type pairing a document (file) name with a word frequency, used as the map and
     * combiner output value of the inverted-index job.
     */
    public class FileFreqWritable implements Writable {
        // Name of the document (file) the word occurred in.
        Text documentID;
        // Occurrence count; the misspelled name is kept because the field is package-visible.
        IntWritable fequence;

        /**
         * Creates an empty instance with both fields allocated. A no-arg constructor is required:
         * Hadoop instantiates Writables reflectively and then fills them via readFields.
         */
        public FileFreqWritable() {
            this.documentID = new Text();
            this.fequence = new IntWritable();
        }

        /**
         * Creates an instance wrapping the given objects (they are stored, not copied).
         *
         * @param id  document name; must not be null
         * @param feq frequency; must not be null
         * @throws NullPointerException if either argument is null (it would otherwise fail
         *                              later, during serialization)
         */
        public FileFreqWritable(Text id, IntWritable feq) {
            if (id == null) {
                throw new NullPointerException("id");
            }
            if (feq == null) {
                throw new NullPointerException("feq");
            }
            this.documentID = id;
            this.fequence = feq;
        }

        /** Overwrites both fields in place, allowing one instance to be reused. */
        public void set(String id, int feq) {
            this.documentID.set(id);
            this.fequence.set(feq);
        }

        /** Deserializes the fields in the same order {@link #write(DataOutput)} writes them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            documentID.readFields(in);
            fequence.readFields(in);
        }

        /** Serializes the document name followed by the frequency. */
        @Override
        public void write(DataOutput out) throws IOException {
            documentID.write(out);
            fequence.write(out);
        }

        public Text getDocumentID() {
            return documentID;
        }

        /** Renders as {@code "<document> : <frequency>"}. */
        @Override
        public String toString() {
            return documentID.toString() + " : " + fequence.get();
        }

        /** Returns the frequency (original, misspelled accessor kept for compatibility). */
        public IntWritable getFequence() {
            return fequence;
        }

        /** Returns the frequency; correctly spelled alias of {@link #getFequence()}. */
        public IntWritable getFrequency() {
            return fequence;
        }

    }
    View Code

    Map

    package ren.snail;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class InvertMapper extends Mapper<LongWritable, Text, Text, FileFreqWritable>{
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
        {
            String data = value.toString().replaceAll("[^a-zA-Z0-9]+", " ");   //将不需要的其他字符都设为空
            String[] values = data.split(" ");
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String filename = fileSplit.getPath().getName();
            for (String temp : values) {
                FileFreqWritable obj = new FileFreqWritable(new Text(filename),new IntWritable(1));
                context.write(new Text(temp), obj);
            }
            
        }
    }
    View Code

    Combiner

    package ren.snail;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class Combinner extends Reducer<Text, FileFreqWritable, Text, FileFreqWritable>{
        public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException
        {
            int count = 0 ;
            String id = "";
            for (FileFreqWritable temp : values) {
                count++;
                if(count == 1)
                {
                    id=temp.getDocumentID().toString();
                }
            }
            context.write(key,new FileFreqWritable(new Text(id), new IntWritable(count)));
        }
    }
    View Code

    Reduce

    package ren.snail;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class InvertReducer extends Reducer<Text, FileFreqWritable, Text, Text> {
    
        public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException {
            StringBuilder value = new StringBuilder();
            for (FileFreqWritable fileFreqWritable : values) {
                String temp = fileFreqWritable.toString();
                value.append(temp+" , ");
            }
            context.write(key,new Text(value.toString()));
        }
    }
    View Code

    其实我的Reduce实现思路可能有点问题,不过大致是这样

  • 相关阅读:
    如何优雅地结束线程的生命周期
    线程的interrupt()
    线程的join()方法
    守护线程Daemon
    使用多线程模拟一个银行叫号窗口
    多线程介绍
    十八、MySQL 数据排名查询某条数据是总数据的第几条
    spring cloud stream整合
    036线程进程(重要)
    035server端并发聊天
  • 原文地址:https://www.cnblogs.com/ren-jie/p/5399447.html
Copyright © 2011-2022 走看看