  • Word Co-occurrence Algorithm

    If a word u appears within the window of a word w, the pair (u, w) is counted as one co-occurrence. The window can be a fixed-size sliding window, or it can be defined by other relations, such as immediate adjacency, occurrence in the same sentence, or occurrence in the same paragraph. If the window currently holds the words [w1, w2, w3], the Mapper emits ((w1,w2),1) and ((w1,w3),1), and the window then slides forward by one word. In the Reduce phase, the values arriving under the same key are simply summed.

    Consider the English sentence: If you do not learn to think when you are young, you may never learn

    In the Map phase, suppose the window size is 6 words. The window first covers "If you do not learn to", yielding the key-value pairs ((If,you),1), ((If,do),1), ((If,not),1), ((If,learn),1), ((If,to),1), which are emitted. The window then slides forward one word to cover "you do not learn to think", yielding ((you,do),1), ((you,not),1), ((you,learn),1), ((you,to),1), ((you,think),1), which are emitted in turn. This repeats until the window reaches the end of the document, after which the window shrinks from the front one word at a time until only two words remain, producing the final pair ((never,learn),1), as illustrated by the sketch below.
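
    To make the windowing concrete, here is a minimal standalone Java sketch of the pairing logic (an illustration only, independent of the MapReduce classes below; the comma is stripped from the sample sentence so that split(" ") suffices):

    import java.util.LinkedList;
    import java.util.Queue;
    
    public class WindowDemo {
    
    	public static void main(String[] args) {
    		String sentence = "If you do not learn to think when you are young you may never learn";
    		int windowSize = 6;
    		Queue<String> window = new LinkedList<String>();
    		for (String word : sentence.split(" ")) {
    			window.add(word);
    			if (window.size() >= windowSize) {
    				emitPairs(window); // window is full: pair the head with the rest, then slide
    			}
    		}
    		// End of text: shrink the window from the front until one word remains
    		while (window.size() > 1) {
    			emitPairs(window);
    		}
    	}
    
    	// Pair the head of the window with every other word, print each pair, then drop the head
    	private static void emitPairs(Queue<String> window) {
    		String head = window.peek();
    		boolean first = true;
    		for (String word : window) {
    			if (first) {
    				first = false;
    				continue;
    			}
    			System.out.println("((" + head + "," + word + "),1)");
    		}
    		window.remove();
    	}
    }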

    Define a custom RecordReader that delivers an entire file as a single record, with the file name as the key and the file contents as the value, as shown in Listing 1-1.

    Listing 1-1

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeFileInputRecord extends RecordReader<Text, Text> {
    
    	private boolean progress = false; // true once the single record has been read
    	private Text key = new Text();
    	private Text value = new Text();
    	private Configuration conf;
    	private FileSplit fileSplit;
    	private FSDataInputStream fis;
    
    	public WholeFileInputRecord(FileSplit fileSplit, Configuration conf) {
    		this.fileSplit = fileSplit;
    		this.conf = conf;
    	}
    
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		Path file = fileSplit.getPath();
    		FileSystem fs = file.getFileSystem(conf);
    		fis = fs.open(file);
    	}
    
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		if (!progress) {
    			// Read the entire split (i.e., the whole file) into memory as one record
    			byte[] content = new byte[(int) fileSplit.getLength()];
    			Path file = fileSplit.getPath();
    			key.set(file.getName());
    			try {
    				IOUtils.readFully(fis, content, 0, content.length);
    				value.set(content);
    			} catch (Exception e) {
    				e.printStackTrace();
    			} finally {
    				IOUtils.closeStream(fis);
    			}
    			progress = true;
    			return true;
    		}
    		return false;
    	}
    
    	@Override
    	public Text getCurrentKey() throws IOException, InterruptedException {
    		return key;
    	}
    
    	@Override
    	public Text getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	@Override
    	public float getProgress() throws IOException, InterruptedException {
    		return progress ? 1.0f : 0.0f;
    	}
    
    	@Override
    	public void close() throws IOException {
    		// The input stream is already closed in nextKeyValue()
    	}
    
    }
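
    Note that nextKeyValue() in Listing 1-1 reads the whole file into a single byte array, so this input format is only suitable for files small enough to fit comfortably in a map task's memory.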
    

    Define a custom FileInputFormat that uses this RecordReader; its isSplitable() returns false so that each file is kept whole in a single split (and therefore processed by a single map task), as shown in Listing 1-2.

    Listing 1-2

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeFileInputFormat extends FileInputFormat<Text, Text> {
    	
    	@Override
    	protected boolean isSplitable(JobContext context, Path filename) {
    		return false;
    	}
    
    	@Override
    	public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		return new WholeFileInputRecord((FileSplit) split, context.getConfiguration());
    	}
    
    }
    

    Define a WordPair class to hold a pair of words that appear in the same window, as shown in Listing 1-3.

    Listing 1-3

    package com.hadoop.mapreduce;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class WordPair implements WritableComparable<WordPair> {
    
    	private String wordA;
    	private String wordB;
    
    	public WordPair() {
    	}
    
    	public WordPair(String wordA, String wordB) {
    		this.wordA = wordA;
    		this.wordB = wordB;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(wordA);
    		out.writeUTF(wordB);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		wordA = in.readUTF();
    		wordB = in.readUTF();
    	}
    
    	@Override
    	public String toString() {
    		return wordA + "," + wordB;
    	}
    
    	@Override
    	public int hashCode() {
    		// Symmetric in wordA and wordB so that (a,b) and (b,a) hash to the same value,
    		// consistent with the order-insensitive equals() below
    		return (wordA.hashCode() + wordB.hashCode()) * 9;
    	}
    
    	@Override
    	public int compareTo(WordPair o) {
    		// (a,b) and (b,a) compare as equal, consistent with equals()
    		if (equals(o)) {
    			return 0;
    		}
    		return (wordA + wordB).compareTo(o.wordA + o.wordB);
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (!(obj instanceof WordPair)) {
    			return false;
    		}
    		WordPair w = (WordPair) obj;
    		if (wordA.equals(w.wordA) && wordB.equals(w.wordB)) {
    			return true;
    		}
    		if (wordA.equals(w.wordB) && wordB.equals(w.wordA)) {
    			return true;
    		}
    		return false;
    	}
    
    }
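
    As a quick sanity check of the pair semantics (a hypothetical snippet, not part of the original post): equals(), hashCode(), and compareTo() all treat (a,b) and (b,a) as the same pair, so mirrored pairs sort and group together in the shuffle:

    package com.hadoop.mapreduce;
    
    public class WordPairDemo {
    
    	public static void main(String[] args) {
    		WordPair ab = new WordPair("think", "you");
    		WordPair ba = new WordPair("you", "think");
    		System.out.println(ab.equals(ba)); // true: word order does not matter
    		System.out.println(ab.hashCode() == ba.hashCode()); // true: hashCode is symmetric
    		System.out.println(ab.compareTo(ba)); // 0: both orders land in the same reduce group
    	}
    }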
    

    The Mapper slides the window over the words of the file and emits each pair once, as shown in Listing 1-4.

    Listing 1-4

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordConcurrnceMap extends Mapper<Text, Text, WordPair, IntWritable> {
    
    	private int windowSize = 0;
    	private static final String WORD_REGEX = "([a-zA-Z]{1,})"; // matches simple English words made up only of letters (no hyphens)
    	private static final Pattern WORD_PATTERN = Pattern.compile(WORD_REGEX);
    	private Queue<String> queue = new LinkedList<String>();
    	private static final IntWritable ONE = new IntWritable(1);
    
    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		windowSize = conf.getInt("windowSize", 20); // window size comes from the job configuration; default 20
    	}
    
    	@Override
    	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    		System.out.println("value:" + value);
    		System.out.println("windowSize:" + windowSize);
    		Matcher matcher = WORD_PATTERN.matcher(value.toString());
    		while (matcher.find()) {
    			String word = matcher.group();
    			queue.add(word);
    			if (queue.size() >= windowSize) {
    				wordPair(context); // window is full: emit pairs for the head word and slide forward
    			}
    		}
    		// End of document: shrink the window from the front until only one word remains
    		while (queue.size() > 1) {
    			wordPair(context);
    		}
    	}
    
    	// Pair the head of the window with every other word in the window, emit each
    	// pair with a count of 1, then remove the head so the window slides forward
    	private void wordPair(Context context) throws IOException, InterruptedException {
    		Iterator<String> it = queue.iterator();
    		String wordA = it.next();
    		while (it.hasNext()) {
    			String wordB = it.next();
    			context.write(new WordPair(wordA, wordB), ONE);
    		}
    		queue.remove();
    	}
    
    }
    

    The Reducer sums the counts for each word pair, as shown in Listing 1-5.

    Listing 1-5

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordConcurrnceReduce extends Reducer<WordPair, IntWritable, WordPair, IntWritable> {
    
    	private IntWritable wordSum = new IntWritable();
    
    	@Override
    	protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
    		int sum = 0;
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    		wordSum.set(sum);
    		context.write(key, wordSum);
    	}
    
    }
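
    Because the reduction is a plain sum, which is commutative and associative, and the Reducer's input and output types match the Mapper's output types, the same class could also serve as a combiner to cut shuffle traffic (an optional tweak, not in the original post) by adding one line to the driver in Listing 1-6:

    job.setCombinerClass(WordConcurrnceReduce.class);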
    

    The driver configures and submits the job, as shown in Listing 1-6.

    Listing 1-6

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class WordConcurrnce {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 3) {
    			throw new RuntimeException("Usage: <input path> <output path> <window size>");
    		}
    		Configuration conf = new Configuration();
    		conf.setInt("windowSize", Integer.parseInt(args[2]));
    		Job job = Job.getInstance(conf);
    		job.setJobName("WordConcurrnce");
    		job.setJarByClass(WordConcurrnce.class);
    
    		job.setMapperClass(WordConcurrnceMap.class);
    		job.setMapOutputKeyClass(WordPair.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		job.setReducerClass(WordConcurrnceReduce.class);
    		job.setOutputKeyClass(WordPair.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		job.setInputFormatClass(WholeFileInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Upload the local file text to the /data directory on HDFS, as shown in Listing 1-7.

    Listing 1-7

    root@lejian:/data# cat text 
    If you do not learn to think when you are young, you may never learn
    root@lejian:/data# hadoop fs -put text /data
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup         69 2017-01-12 20:41 /data/text
    

    Run the program in Listing 1-6; the results are shown in Listing 1-8.

    Listing 1-8

    root@lejian:/data# hadoop jar wordConcurrnce.jar com.hadoop.mapreduce.WordConcurrnce /data /output 10
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-12 20:55 /output/_SUCCESS
    -rw-r--r--   1 root supergroup        848 2017-01-12 20:55 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    If,are  1
    If,do   1
    If,learn        1
    …………
    you,when        1
    you,you 2
    you,young       2
    
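    For example, you,you has a count of 2 because "you" occurs three times in the sentence (as the 2nd, 9th, and 12th word): within a 10-word window, the first occurrence pairs with the second, and the second pairs with the third.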