zoukankan      html  css  js  c++  java
  • MapReduceTopK TreeMap

    版权声明: https://blog.csdn.net/zhangxiango/article/details/33319281

    MapReduce TopK统计加排序中介绍的TopK在mapreduce的实现。

    本案例省略了上面案例中的Sort步骤,改用TreeMap来获取前K个词。

    package TopK1;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 统计词频
     * @author zx
     * zhangxian1991@qq.com
     */
    public class WordCount {
    	
    	/**
    	 * 读取单词
    	 * @author zx
    	 *
    	 */
    	public static class Map extends Mapper<Object,Text,Text,IntWritable>{
    
    		IntWritable count = new IntWritable(1);
    		
    		@Override
    		protected void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			StringTokenizer st = new StringTokenizer(value.toString());
    			while(st.hasMoreTokens()){	
    				String word = st.nextToken().replaceAll(""", "").replace("'", "").replace(".", "");
    				context.write(new Text(word), count);
    			}
    		}
    		
    	}
    	
    	/**
    	 * 统计词频
    	 * @author zx
    	 *
    	 */
    	public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
    
    		@SuppressWarnings("unused")
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
    				throws IOException, InterruptedException {
    			int count = 0;
    			for (IntWritable intWritable : values) {
    				count ++;
    			}
    			context.write(key,new IntWritable(count));
    		}
    		
    	}
    	
    	@SuppressWarnings("deprecation")
    	public static boolean run(String in,String out) throws IOException, ClassNotFoundException, InterruptedException{
    		
    		FileUtil.deleteFile(out);
    
    		Configuration conf = new Configuration();
    		
    		Job job = new Job(conf,"WordCount1");
    		job.setJarByClass(WordCount.class);
    		job.setMapperClass(Map.class);
    		job.setReducerClass(Reduce.class);
    		
    		// 设置Map输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 设置Reduce输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 设置输入和输出文件夹
            FileInputFormat.addInputPath(job, new Path(in));
            FileOutputFormat.setOutputPath(job, new Path(out));
            
            return job.waitForCompletion(true);
    	}
    	
    }
    

    package TopK1;
    
    import java.io.IOException;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.TreeSet;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
    
    /**
     * 
     * @author zx
     *zhangxian1991@qq.com
     */
    public class TopK {
    
    	public static class TopKMap extends Mapper<Object, Text, IntWritable, IntWritable>{
    
    		TreeMap<Integer,String> tm = new TreeMap<Integer,String>(new Comparator<Integer>() {
    
    			/**
    			 * treeMap中的元素逆序排列
    			 * @param o1
    			 * @param o2
    			 * @return
    			 */
    			@Override
    			public int compare(Integer o1, Integer o2) {
    				return o2.compareTo(o1);
    			}
    			
    		});
    		int k = 0;
    		
    		@Override
    		protected void cleanup(Context context) throws IOException,
    				InterruptedException {
    			Configuration conf = context.getConfiguration();
    			Path topKPath = new Path(conf.get("topKOut"));
    			FileSystem fs = topKPath.getFileSystem(conf);
    			FSDataOutputStream fsDOS = fs.create(topKPath);
    			Iterator<Integer> it = tm.keySet().iterator();
    			while(it.hasNext()){
    				Integer key = it.next();
    				String value = tm.get(key).toString();
    				String line = value + "	" + key + "
    ";
    				fsDOS.write(line.getBytes(), 0, line.length());
    			}
    			fsDOS.flush();
    			fsDOS.close();
    		}
    
    		@Override
    		protected void setup(Context context) throws IOException,
    				InterruptedException {
    			k = Integer.parseInt(context.getConfiguration().get("K"));
    		}
    
    		@Override
    		protected void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String[] parts = value.toString().split("	");
    			tm.put(Integer.parseInt(parts[1]),parts[0]);
    			if(tm.size() > k){
    				tm.remove(tm.lastKey());
    			}
    		}
    		
    	}
    	
    	@SuppressWarnings("deprecation")
    	public static void main(String args[]) throws ClassNotFoundException, IOException, InterruptedException{
    		
    		if(args.length < 4){
    			throw new IllegalArgumentException("要有4个參数:1,要统计的文本文件名称。2。统计后的结果路径。3,topK的结果文件夹,4,K");
    		}
    		
    		FileUtil.deleteFile(args[2]);
    		
    		//要统计字数的文本文件名称
    		String in = args[0];
    		
    		//统计字数后的结果
    		String wordCount = args[1];
    
    		in = FileUtil.loadFile(wordCount, "TopK1", in);
    		
    		//假设统计字数的job完毕后就開始求topK
    		if(WordCount.run(in, wordCount)){
    			
    			int k = Integer.parseInt(args[3]);
    			Configuration conf = new Configuration();
    			
    			FileUtil.deleteFile(args[2]);
    			conf.set("topKOut", args[2]);
    			conf.set("K", k+"");
    			
    			Job job = new Job(conf,"TopK1");
    			
    			job.setJarByClass(TopK.class);
    			job.setMapperClass(TopKMap.class);
    			
    			job.setOutputKeyClass(IntWritable.class);
    			job.setOutputValueClass(IntWritable.class);
    			
    			FileInputFormat.addInputPath(job, new Path(wordCount));
    			job.setOutputFormatClass(NullOutputFormat.class);
    			
    			System.exit(job.waitForCompletion(true)?

    0:1); } } }


    package TopK1;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /**
     * File helpers used by the job drivers: uploading a bundled data file and
     * deleting paths before a job runs.
     *
     * @author zx
     *
     */
    public class FileUtil {

        /**
         * Copies a data file that sits next to the compiled classes into the
         * parent directory of {@code inputPath}, replacing any existing file of
         * the same name.
         *
         * @param inputPath path whose parent directory receives the file
         * @param folder    sub-folder (relative to the code location) containing
         *                  the file; {@code null} or empty means no sub-folder
         * @param fileName  name of the file to copy
         * @return the destination path of the copied file
         * @throws IOException if the delete or copy fails
         */
        public static String loadFile(String inputPath,String folder,String fileName) throws IOException{

            // A null folder must not end up as the text "null" in the source path.
            String prefix = "";
            if(null != folder && !"".equals(folder)){
                prefix = folder + "/";
            }

            // Full local path of the data file, resolved from where this class was loaded
            String srcPathDir = FileUtil.class.getProtectionDomain().getCodeSource().getLocation()
                    .getFile() + prefix + fileName;

            Path srcpath = new Path("file:///" + srcPathDir);

            String destination = getJobRootPath(inputPath) + fileName;
            Path dstPath = new Path(destination);

            Configuration conf = new Configuration();

            FileSystem fs = dstPath.getFileSystem(conf);

            fs.delete(dstPath, true);

            fs.copyFromLocalFile(srcpath, dstPath);

            // The FileSystem is deliberately left open: Hadoop may hand out a
            // cached instance shared with other code in the same JVM, and closing
            // it here could break those users.

            return destination;
        }

        /**
         * Returns the parent directory of {@code path}, including its trailing
         * "/". One trailing "/" on {@code path} is ignored first. A path that
         * contains no "/" (or is empty) yields the empty string.
         *
         * @param path path to take the parent of
         * @return the parent directory ending in "/", or "" if there is none
         */
        public static String getJobRootPath(String path){
            if(path.isEmpty()){
                // Without this guard the substring calls below would be given index -1.
                return "";
            }
            if(path.lastIndexOf("/") == path.length()-1){
                path = path.substring(0, path.lastIndexOf("/"));
            }
            return path.substring(0, path.lastIndexOf("/")+1);
        }

        /**
         * Recursively deletes each of the given paths.
         *
         * @param filePath paths to delete
         * @throws IOException if a delete fails
         */
        public static void deleteFile(String ...filePath) throws IOException{
            Configuration conf = new Configuration();
            for (String target : filePath) {
                Path path = new Path(target);
                FileSystem fs = path.getFileSystem(conf);
                fs.delete(path,true);
            }
        }

    }
    


查看全文
  • 相关阅读:
    有道翻译js解密(1)
    Python面试题之Python正则表达式re模块
    go语言从例子开始之Example4.常量
    go语言从例子开始之Example3.变量
    go语言从例子开始之Example2.类型
    go语言从例子开始之Example1.helloworld
    python模块打补丁
    gevent协程之猴子补丁带来的坑
    charles抓包小程序
    httptesting HTTP(s)接口自动化测试框架
  • 原文地址:https://www.cnblogs.com/ldxsuanfa/p/10666707.html
  • Copyright © 2011-2022 走看看