zoukankan      html  css  js  c++  java
  • 在map端使用关联数组实现wordcount

      今天看Data-Intensive Text Processing with MapReduce 这本书的第三章的时候,里面有写到在map端优化wordcount。

      对数据密集型数据进行分布式处理的时候,影响数据处理速度的非常重要的一个方面就是map的输出中间结果,在传送到reduce的过程中,很多的中间数据需要进行交换以及包括一些相应的处理,然后再交给相应的reduce。其中中间数据需要在网络中传输,另外中间数据在发送到网络上之前还要写到本地磁盘上,因为网络带宽和磁盘I/O是非常耗时的相比与其他的操作,所以减少中间数据的传输将会增加算法的执行效率,通过使用combiner函数或者其他的方式减少key-value对的个数。下面是一个改进的wordcount算法。

      基本的思想是:

      在map处理的时候定义一个关联数组,然后对文档进行处理,将<word,次数>加入到关联数组中,word存在,则将相应的次数加1,不存在则直接加入到关联数组中。所有的map任务结束后,然后再在run函数中输出处理结果。

    伪代码:

    class Mapper

      method Map(docid a,doc d)

                H =new AssociativeArray

         for all term t 属于doc  d  do

                         H{t}=H{t}+1;

                     for all term t 属于 H do

                    EMIT(term t,count H{t})

    class REDUCER

         method REDUCE(term t,counts[c1,c2,...])

                    sum=0

                   for  all count c 属于 counts[c1,c2,...]  do

                       sum+=c

                 EMIT(term t,count sum)

    代码如下:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.StringTokenizer;
    import java.util.Map.Entry;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;


    public class Mapper extends
    org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

    int c;
    HashMap<String,IntWritable> map=new HashMap<String,IntWritable>();
    @Override
    protected void map(LongWritable key, Text value,
    Context context)
    throws IOException, InterruptedException {
    String str=value.toString();
    StringTokenizer token=new StringTokenizer(str);
    while(token.hasMoreTokens()){
    String value1=token.nextToken();
    if(map.containsKey(value1)){
    //System.out.println("ni");
    int p=map.get(value1).get()+1;
    map.remove(value1);
    map.put(value1, new IntWritable(p));
    }
    else{
    //System.out.println("ni");
    map.put(value1, new IntWritable(1));
    }
    }
    // TODO Auto-generated method stub

    c++;
    System.out.println(c);



    }
    @Override
    protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
    throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    System.out.println("cleanup");
    super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    super.run(context);
    System.out.println("run");
    Iterator it=map.entrySet().iterator();
    while(it.hasNext()){
    //System.out.println("nihe");
    Map.Entry<String, IntWritable> entry=(Map.Entry<String, IntWritable>) it.next();
    //System.out.println("nihe");
    context.write(new Text(entry.getKey()), entry.getValue());

    }

    }

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
    throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    // System.out.println(context.getInputSplit().toString());
            // System.out.println(context.getJobID());
      // FileSplit input=(FileSplit)context.getInputSplit();
    // String path=input.getPath().toString();
    // Configuration conf=new Configuration();
      // System.out.println(input.getPath().toString());
       // FileSystem fs=FileSystem.get(URI.create(path), conf);
    // FSDataInputStream filein=fs.open(input.getPath());
       //  LineReader in=new LineReader(filein,conf);
    // Text line=new Text();
    //  int cd=in.readLine(line);
    //   System.out.println(line);
         }
     }



    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;


    public class Reducer extends
    org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
    Context context)
    throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    int sum=0;
    for(IntWritable it:values){
    sum+=it.get();
    }
    context.write(key, new IntWritable(sum));
    }




    }



    import java.io.IOException;
    import java.net.URI;



    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


    public class Word {

    /**
    *
    @param args
    *
    @throws IOException
    *
    @throws ClassNotFoundException
    *
    @throws InterruptedException
    */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    // TODO Auto-generated method stub
    Job job=new Job();
    Configuration conf=new Configuration();

    Path in=new Path(args[0]);
    Path out=new Path(args[1]);

    FileSystem fs=FileSystem.get(URI.create(args[1]), conf);
    fs.delete(out);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setMapperClass(Mapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);



    job.waitForCompletion(false);



    }

    }




  • 相关阅读:
    ProviderManager
    C#.NET常见问题(FAQ)-如何把定义存放类实例的数组
    C#.NET常见问题(FAQ)-命名空间namespace如何理解
    C#.NET常见问题(FAQ)-索引器indexer有什么用
    C#.NET常见问题(FAQ)-构造器constructor有什么用
    C#.NET常见问题(FAQ)-public private protectd internal有什么区别
    C#.NET常见问题(FAQ)-override覆盖和virtual虚类如何理解
    C#.NET常见问题(FAQ)-如何使用右下角托盘图标notifyIcon
    C#.NET常见问题(FAQ)-如何使用变量访问控件属性
    C#.NET常见问题(FAQ)-如何使用变量动态添加控件
  • 原文地址:https://www.cnblogs.com/dlutxm/p/2223055.html
Copyright © 2011-2022 走看看