  • Computing TF-IDF with MapReduce

    Problem description: given a large file in which every line has the form "document name, document content", compute the TF-IDF value of each word in each document.

    input

    docName1,word1 word2 .......

    docName2,word1 word2 .......

    output

    word  docName  tf-idf value
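
    For reference, with N the total number of documents and df(w) the number of documents containing word w, the quantities computed by the code below are (the code uses the natural logarithm via Math.log and smooths the document frequency by adding 1):

        \mathrm{tf}(w,d) = \frac{\mathrm{count}(w,d)}{|d|}, \quad
        \mathrm{idf}(w) = \ln\!\left(\frac{N}{\mathrm{df}(w)+1}\right), \quad
        \mathrm{tfidf}(w,d) = \mathrm{tf}(w,d)\cdot\mathrm{idf}(w)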

    package com.elex.mapreduce;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import com.elex.utils.DataClean;
    
    public class TFIDF_5 {
    	public static String hdfsURL = "hdfs://namenode:8020";
    	public static String fileURL = "/tmp/usercount";
    
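    	// TF stage mapper: each input line has the form "docName,content". For every word it
    	// computes tf = termCount / totalTermsInDoc (using the DataClean.clean helper) and
    	// emits (docName, "word1 tf1,word2 tf2,...").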
    	public static class TFMap extends Mapper<Object, Text, Text, Text> {
    		public void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String userWordstmp = value.toString();
    			StringTokenizer userWords = new StringTokenizer(userWordstmp, "\n");
    			while (userWords.hasMoreTokens()) {
    				String userWordFragtmp = userWords.nextToken();
    				StringTokenizer userWordFrag = new StringTokenizer(
    						userWordFragtmp, ",");
    				String user = userWordFrag.nextToken();
    				Text outputKey = new Text();
    				Text outputValue = new Text();
    				while (userWordFrag.hasMoreTokens()) {
    					String words = userWordFrag.nextToken();
    					HashMap<String, Integer> wordMap = DataClean.clean(words,
    							"!total");
    					int wordTotal = wordMap.get("!total");
    					wordMap.remove("!total");
    					for (Map.Entry<String, Integer> wordEntry : wordMap
    							.entrySet()) {
    						String word = wordEntry.getKey();
    						int wordCount = wordEntry.getValue();
    						float tf = (float) wordCount / (float) wordTotal;
    						String outputStr = word + " " + Float.toString(tf)
    								+ ",";
    						byte[] bytes = outputStr.getBytes();
    						outputValue.append(bytes, 0, bytes.length);
    					}
    				}
    				outputKey.set(user);
    				context.write(outputKey, outputValue);
    			}
    		}
    	}
    
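    	// TF stage reducer: identity pass-through; writes each (docName, "word tf,word tf,...")
    	// record to the tf job's output.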
    	public static class TFReduce extends Reducer<Text, Text, Text, Text> {
    		public void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			// StringBuffer sb = new StringBuffer();
    			Iterator<Text> iter = values.iterator();
    			while (iter.hasNext()) {
    				// sb.append(iter.next().toString() + "	");
    				context.write(key, iter.next());
    			}
    			// Text outputValue = new Text();
    			// outputValue.set(sb.toString());
    			// context.write(key, outputValue);
    		}
    	}
    
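    	// IDF stage mapper: reads the tf job's output ("docName<TAB>word tf,word tf,...") and
    	// emits (word, "docName<TAB>tf") for every word occurrence.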
    	public static class IDFMap extends Mapper<Object, Text, Text, Text> {
    		public void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String valuesTmp = value.toString();
    			StringTokenizer userWordFrag = new StringTokenizer(valuesTmp, "\n");
    			while (userWordFrag.hasMoreTokens()) {
    				// String userWordtmp = userWordFrag.nextToken();
    				StringTokenizer userWords = new StringTokenizer(
    						userWordFrag.nextToken(), "\t");
    				String user = userWords.nextToken();
    				while (userWords.hasMoreTokens()) {
    					StringTokenizer wordTFs = new StringTokenizer(
    							userWords.nextToken(), ",");
    					while (wordTFs.hasMoreTokens()) {
    						StringTokenizer wordTF = new StringTokenizer(
    								wordTFs.nextToken());
    						String word = wordTF.nextToken();
    						String tf = wordTF.nextToken();
    						Text outputKey = new Text();
    						Text outputValue = new Text();
    						outputKey.set(word);
    						outputValue.set(user + "\t" + tf);
    						context.write(outputKey, outputValue);
    					}
    				}
    			}
    
    		}
    	}
    
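    	// IDF stage reducer: setup() loads the document count that the driver wrote to HDFS;
    	// reduce() treats the number of values as the document frequency, computes
    	// idf = ln(docCount / (df + 1)), and emits (word, "docName<TAB>tf*idf").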
    	public static class IDFReduce extends Reducer<Text, Text, Text, Text> {
    		long userCount = 0;
    
    		public void setup(Context context) throws IOException {
    			Configuration conf = context.getConfiguration();
    			Path path = new Path(fileURL);
    			FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
    			if (!fs.isFile(path)) {
    				FSDataOutputStream output = fs.create(path, true);
    				output.close();
    			}
    			FSDataInputStream input = fs.open(path);
    			StringBuffer sb = new StringBuffer();
    			byte[] bytes = new byte[1024];
    			int status = input.read(bytes);
    			while (status != -1) {
    				// append only the bytes actually read in this pass
    				sb.append(new String(bytes, 0, status));
    				status = input.read(bytes);
    			}
    			if (!"".equals(sb.toString())) {
    				userCount = Long.parseLong(sb.toString().trim());
    			}
    			input.close();
    		}
    
    		public void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			LinkedList<String> userList = new LinkedList<String>();
    			Iterator<Text> iter = values.iterator();
    			long wordCount = 0;
    			while (iter.hasNext()) {
    				wordCount++;
    				userList.add(iter.next().toString());
    			}
    			float idf = (float) Math.log((float) userCount
    					/ (float) (wordCount + 1));
    			Iterator<String> userIter = userList.iterator();
    			Text outputValue = new Text();
    			while (userIter.hasNext()) {
    				String usertftmp = userIter.next();
    				StringTokenizer usertf = new StringTokenizer(usertftmp, "\t");
    				String user = usertf.nextToken();
    				String tfStr = usertf.nextToken();
    				float tf = Float.parseFloat(tfStr.trim());
    				float tfidf = tf * idf;
    				String outputTmp = user + "\t" + tfidf;
    				outputValue.set(outputTmp);
    				context.write(key, outputValue);
    			}
    		}
    	}
    
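    	// The three classes below form the original stand-alone document-count job (now disabled
    	// in main()): the mapper emits ("usercount", "1") for every input line.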
    	public static class UserCountMap extends Mapper<Object, Text, Text, Text> {
    
    		public void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String userWordtmp = value.toString();
    			StringTokenizer userWord = new StringTokenizer(userWordtmp, "\n");
    			while (userWord.hasMoreTokens()) {
    				userWord.nextToken();
    				Text outputKey = new Text();
    				outputKey.set("usercount");
    				Text one = new Text();
    				one.set("1");
    				context.write(outputKey, one);
    			}
    		}
    	}
    
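    	// Combiner for the document-count job: sums the partial "1" counts emitted by each mapper.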
    	public static class UserCountCombine extends
    			Reducer<Text, Text, Text, Text> {
    		public void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			long user = 0;
    			for (Text value : values) {
    				String valueTmp = value.toString();
    				user += Long.parseLong(valueTmp);
    			}
    			Text outputValue = new Text();
    			outputValue.set(Long.toString(user));
    			context.write(key, outputValue);
    		}
    	}
    
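    	// Reducer for the document-count job: sums the counts and writes the total to the HDFS
    	// file at fileURL in cleanup().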
    	public static class UserCountReduce extends Reducer<Text, Text, Text, Text> {
    		long userCount = 0;
    
    		public void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			for (Text value : values) {
    				String valueTmp = value.toString();
    				userCount += Long.parseLong(valueTmp);
    			}
    		}
    
    		public void cleanup(Context context) throws IOException {
    			Configuration conf = context.getConfiguration();
    			FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
    			Path path = new Path(fileURL);
    			FSDataOutputStream output = fs.create(path, true);
    			String content = Long.toString(userCount);
    			output.write(content.getBytes());
    			output.flush();
    			output.close();
    		}
    	}
    
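    	// Driver: runs the tf job, stores its MAP_INPUT_RECORDS counter (the document count) on
    	// HDFS, then runs the idf job over the tf output. args: [0] input, [1] tf output,
    	// [3] final tf-idf output ([2] belonged to the disabled user-count job).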
    	public static void main(String[] args) throws IOException,
    			ClassNotFoundException, InterruptedException {
    		Configuration conf = new Configuration();
    		// conf.set("mapred.child.java.opts", "-Xmx4096m");
    		Job tfJob = Job.getInstance(conf, "tfjob");
    		tfJob.setJarByClass(TFIDF_5.class);
    		tfJob.setMapperClass(TFMap.class);
    		// tfJob.setCombinerClass(TFCombine.class);
    		tfJob.setReducerClass(TFReduce.class);
    		tfJob.setOutputKeyClass(Text.class);
    		tfJob.setOutputValueClass(Text.class);
    		FileInputFormat.setInputPaths(tfJob, new Path(args[0]));
    		FileOutputFormat.setOutputPath(tfJob, new Path(args[1]));
    		tfJob.waitForCompletion(true);
    
    		// Job userCountJob = Job.getInstance(conf, "usercountjob");
    		// userCountJob.setJarByClass(TFIDF_5.class);
    		// userCountJob.setMapperClass(UserCountMap.class);
    		// userCountJob.setCombinerClass(UserCountCombine.class);
    		// userCountJob.setReducerClass(UserCountReduce.class);
    		// userCountJob.setOutputKeyClass(Text.class);
    		// userCountJob.setOutputValueClass(Text.class);
    		// FileInputFormat.setInputPaths(userCountJob, new Path(args[1]));
    		// FileOutputFormat.setOutputPath(userCountJob, new Path(args[2]));
    		// userCountJob.waitForCompletion(true);
    		// Compute the document count (the tf job's input record count) and store it temporarily on HDFS
    		Counter ct = tfJob.getCounters().findCounter(
    				"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS");
    		System.out.println(ct.getValue());
    		Iterable<String> groupNames = tfJob.getCounters().getGroupNames();
    		for (String groupName : groupNames) {
    			System.out.println(groupName);
    		}
    		FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
    		Path path = new Path(fileURL);
    		FSDataOutputStream output = fs.create(path, true);
    		String content = Long.toString(ct.getValue());
    		output.write(content.getBytes());
    		output.flush();
    		output.close();
    
    		Job idfJob = Job.getInstance(conf, "idfjob");
    		idfJob.setJarByClass(TFIDF_5.class);
    		idfJob.setMapperClass(IDFMap.class);
    		idfJob.setReducerClass(IDFReduce.class);
    		idfJob.setOutputKeyClass(Text.class);
    		idfJob.setOutputValueClass(Text.class);
    		FileInputFormat.setInputPaths(idfJob, new Path(args[1]));
    		FileOutputFormat.setOutputPath(idfJob, new Path(args[3]));
    		System.exit(idfJob.waitForCompletion(true) ? 0 : 1);
    
    	}
    
    }
    

    Initially a separate job was used to count the documents; after a tip from a senior colleague at the company, the document count can instead be obtained neatly from the number of input records of the tf job, read via its MAP_INPUT_RECORDS counter.
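
    The post does not include com.elex.utils.DataClean, which TFMap depends on. Judging from how it is called, clean(text, totalKey) appears to tokenize the text, count each word, and store the total number of tokens under the given key ("!total" above). A minimal sketch under that assumption (the real helper may split and filter tokens differently):

    package com.elex.utils;

    import java.util.HashMap;
    import java.util.StringTokenizer;

    // Hypothetical stand-in for the DataClean helper used by TFMap.
    // Assumption: clean(text, totalKey) returns word -> count and additionally
    // stores the total token count under totalKey (e.g. "!total").
    public class DataClean {
    	public static HashMap<String, Integer> clean(String text, String totalKey) {
    		HashMap<String, Integer> counts = new HashMap<String, Integer>();
    		StringTokenizer tokens = new StringTokenizer(text);
    		int total = 0;
    		while (tokens.hasMoreTokens()) {
    			String word = tokens.nextToken();
    			total++;
    			Integer old = counts.get(word);
    			counts.put(word, old == null ? 1 : old + 1);
    		}
    		counts.put(totalKey, total);
    		return counts;
    	}
    }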

