  • Word Co-occurrence Algorithm

    If a word u appears within the window of a word w, the pair (u, w) is counted once. The window can be a fixed-size window, or it can be defined as words that occur next to each other, in the same sentence, or in the same paragraph. If the window contains the words [w1, w2, w3], the Mapper emits ((w1,w2),1) and ((w1,w3),1), and the window then slides forward by one word. In the Reduce phase, the values arriving for each key are simply summed.

    Consider the English sentence: If you do not learn to think when you are young, you may never learn

    In the Map phase, assume a window size of 6 words. The window first covers If you do not learn to, yielding the key-value pairs ((If,you),1), ((If,do),1), ((If,not),1), ((If,learn),1), ((If,to),1), which are emitted. The window then slides forward one word to cover you do not learn to think, yielding ((you,do),1), ((you,not),1), ((you,learn),1), ((you,to),1), ((you,think),1), which are emitted in turn. This repeats until the end of the document, where the window is shrunk by advancing its head, until only two words remain and the final pair is ((never,learn),1).
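    Before looking at the Hadoop code, here is a minimal standalone sketch of this sliding-window pairing on the sample sentence (punctuation omitted; the class and method names are illustrative, not part of the original program):

    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Queue;
    
    public class WindowDemo {
    
    	public static void main(String[] args) {
    		String[] words = "If you do not learn to think when you are young you may never learn".split(" ");
    		int windowSize = 6;
    		Queue<String> window = new LinkedList<String>();
    		for (String word : words) {
    			window.add(word);
    			// once the window is full, pair its head word with the rest and slide forward
    			if (window.size() >= windowSize) {
    				emitPairs(window);
    			}
    		}
    		// at the end of input, shrink the window from the front until one word remains
    		while (window.size() > 1) {
    			emitPairs(window);
    		}
    	}
    
    	// pairs the head of the window with every other word in it, then removes the head
    	private static void emitPairs(Queue<String> window) {
    		Iterator<String> it = window.iterator();
    		String head = it.next();
    		while (it.hasNext()) {
    			System.out.println("((" + head + "," + it.next() + "),1)");
    		}
    		window.remove();
    	}
    }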

    Define a custom RecordReader, as shown in Code 1-1

    Code 1-1

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeFileInputRecord extends RecordReader<Text, Text> {
    
    	private boolean progress = false;
    	private Text key = new Text();
    	private Text value = new Text();
    	private Configuration conf;
    	private FileSplit fileSplit;
    	private FSDataInputStream fis;
    
    	public WholeFileInputRecord(FileSplit fileSplit, Configuration conf) {
    		this.fileSplit = fileSplit;
    		this.conf = conf;
    	}
    
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		// open the file that backs this split
    		Path file = fileSplit.getPath();
    		FileSystem fs = file.getFileSystem(conf);
    		fis = fs.open(file);
    	}
    
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		if (!progress) {
    			byte[] content = new byte[(int) fileSplit.getLength()];
    			Path file = fileSplit.getPath();
    			key.set(file.getName());
    			try {
    				// read the entire file into memory as the value of a single record
    				IOUtils.readFully(fis, content, 0, content.length);
    				value.set(content);
    			} finally {
    				IOUtils.closeStream(fis);
    			}
    			progress = true;
    			return true;
    		}
    		return false;
    	}
    
    	@Override
    	public Text getCurrentKey() throws IOException, InterruptedException {
    		return key;
    	}
    
    	@Override
    	public Text getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	@Override
    	public float getProgress() throws IOException, InterruptedException {
    		return progress ? 1.0f : 0.0f;
    	}
    
    	@Override
    	public void close() throws IOException {
    	}
    
    }
    

    Define a custom FileInputFormat, as shown in Code 1-2

    Code 1-2

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeFileInputFormat extends FileInputFormat<Text, Text> {
    	
    	@Override
    	protected boolean isSplitable(JobContext context, Path filename) {
    		return false;
    	}
    
    	@Override
    	public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    		// hand each whole-file split to the custom RecordReader
    		return new WholeFileInputRecord((FileSplit) split, context.getConfiguration());
    	}
    
    }
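    Because isSplitable returns false and the RecordReader reads the entire file as one record, each input file reaches the Mapper as a single (file name, file contents) pair. This matters for the algorithm: the sliding window can cross line boundaries, which the default line-oriented TextInputFormat would not allow.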
    

    Define a WordPair class to hold the words that appear in the same window, as shown in Code 1-3

    Code 1-3

    package com.hadoop.mapreduce;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class WordPair implements WritableComparable<WordPair> {
    
    	private String wordA;
    	private String wordB;
    
    	public WordPair() {
    	}
    
    	public WordPair(String wordA, String wordB) {
    		this.wordA = wordA;
    		this.wordB = wordB;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(wordA);
    		out.writeUTF(wordB);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		wordA = in.readUTF();
    		wordB = in.readUTF();
    	}
    
    	@Override
    	public String toString() {
    		return wordA + "," + wordB;
    	}
    
    	@Override
    	public int hashCode() {
    		// order-insensitive hash, consistent with the order-insensitive equals below
    		return (wordA.hashCode() + wordB.hashCode()) * 9;
    	}
    
    	@Override
    	public int compareTo(WordPair o) {
    		if (equals(o)) {
    			return 0;
    		}
    		return (wordA + wordB).compareTo(o.wordA + o.wordB);
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (!(obj instanceof WordPair)) {
    			return false;
    		}
    		WordPair w = (WordPair) obj;
    		if (wordA.equals(w.wordA) && wordB.equals(w.wordB)) {
    			return true;
    		}
    		if (wordA.equals(w.wordB) && wordB.equals(w.wordA)) {
    			return true;
    		}
    		return false;
    	}
    
    }
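    Because equals (and the matching hashCode) treats the pair as unordered, ((you,learn),1) and ((learn,you),1) are accumulated under the same key. A quick illustrative check (the demo class is not part of the original code):

    package com.hadoop.mapreduce;
    
    public class WordPairDemo {
    
    	public static void main(String[] args) {
    		WordPair p1 = new WordPair("you", "learn");
    		WordPair p2 = new WordPair("learn", "you");
    		System.out.println(p1.equals(p2));                  // true: pair order does not matter
    		System.out.println(p1.hashCode() == p2.hashCode()); // true: symmetric hash
    		System.out.println(p1.compareTo(p2));               // 0: grouped as one key in the shuffle
    	}
    }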
    

    The Mapper slides the window across the text and emits the word pairs, as shown in Code 1-4

    Code 1-4

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordConcurrnceMap extends Mapper<Text, Text, WordPair, IntWritable> {
    
    	private int windowSize = 0;
    	private static final String WORD_REGEX = "([a-zA-Z]{1,})";// matches only simple English words made up of letters (no hyphens)
    	private static final Pattern WORD_PATTERN = Pattern.compile(WORD_REGEX);// compiled pattern used to extract words from the text
    	private Queue<String> queue = new LinkedList<String>();
    	private static final IntWritable ONE = new IntWritable(1);
    
    	@Override
    	protected void setup(Context context) throws java.io.IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		windowSize = conf.getInt("windowSize", 20); // window size, defaulting to 20 words
    	}
    
    	@Override
    	protected void map(Text key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		Matcher matcher = WORD_PATTERN.matcher(value.toString());
    		while (matcher.find()) {
    			String word = matcher.group();
    			queue.add(word);
    			// once the window is full, emit pairs for its head word and slide forward
    			if (queue.size() >= windowSize) {
    				wordPair(context);
    			}
    		}
    		// at the end of the file, shrink the window from the front until one word remains
    		while (queue.size() > 1) {
    			wordPair(context);
    		}
    	}
    
    	// pairs the head word of the window with every other word in it, then removes the head
    	private void wordPair(Context context) throws IOException, InterruptedException {
    		Iterator<String> it = queue.iterator();
    		String wordA = it.next();
    		while (it.hasNext()) {
    			String wordB = it.next();
    			context.write(new WordPair(wordA, wordB), ONE);
    		}
    		queue.remove();
    	}
    
    }
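    Note that map is called only once per file here, since WholeFileInputFormat delivers the whole file as a single record; the final while loop then drains the window, producing the shrinking tail windows described earlier (down to ((never,learn),1) for the sample sentence).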
    

    The Reducer sums the counts for each word pair, as shown in Code 1-5

    Code 1-5

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordConcurrnceReduce extends Reducer<WordPair, IntWritable, WordPair, IntWritable> {
    
    	private IntWritable wordSum = new IntWritable();
    
    	@Override
    	protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
    		int sum = 0;
    		// sum the 1s emitted for this (unordered) word pair
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    		wordSum.set(sum);
    		context.write(key, wordSum);
    	}
    
    }
    

    The driver class wires the job together, as shown in Code 1-6

    Code 1-6

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class WordConcurrnce {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 3) {
    			throw new RuntimeException("Usage: <input path> <output path> <window size>");
    		}
    		Configuration conf = new Configuration();
    		conf.setInt("windowSize", Integer.parseInt(args[2]));
    		Job job = Job.getInstance(conf);
    		job.setJobName("WordConcurrnce");
    		job.setJarByClass(WordConcurrnce.class);
    
    		job.setMapperClass(WordConcurrnceMap.class);
    		job.setMapOutputKeyClass(WordPair.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		job.setReducerClass(WordConcurrnceReduce.class);
    		job.setOutputKeyClass(WordPair.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		job.setInputFormatClass(WholeFileInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
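    Since the job uses TextOutputFormat with WordPair as the output key, each result line is rendered via WordPair.toString(), i.e. wordA,wordB followed by the count, which is exactly the format visible in Code 1-8.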
    

    Put the text file into the /data directory on HDFS, as shown in Code 1-7

    Code 1-7

    root@lejian:/data# cat text 
    If you do not learn to think when you are young, you may never learn
    root@lejian:/data# hadoop fs -put text /data
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup         69 2017-01-12 20:41 /data/text
    

    Run Code 1-6; the results are shown in Code 1-8

    Code 1-8

    root@lejian:/data# hadoop jar wordConcurrnce.jar com.hadoop.mapreduce.WordConcurrnce /data /output 10
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-12 20:55 /output/_SUCCESS
    -rw-r--r--   1 root supergroup        848 2017-01-12 20:55 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    If,are  1
    If,do   1
    If,learn        1
    …………
    you,when        1
    you,you 2
    you,young       2
    
  • Original post: https://www.cnblogs.com/baoliyan/p/6279385.html