zoukankan      html  css  js  c++  java
  • MapReduce编程实例


    1. WordCount单词统计
    2. 数据去重
    3. 倒排索引

    1. WordCount单词统计

    (1) 输入输出


    hello world
    hello hadoop


    hadoop    1
    hello    2
    world    1

    (2) 代码实现及分析

    package com.hadoop.kwang;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCount {
         * Mapper类
         * Object和Text是输入数据的<key,value>类型
         * Text和IntWritable是输出数据的<key,value>类型
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    context.write(word, one);    
         * Reducer类
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0; 
                //values是某个key对应的value的集合,即<key,value-list>,比如<hello, <1,1>>,values是值的集合
                for (IntWritable val : values) {
                    sum += val.get();        
                context.write(key, result);
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String input = "hdfs://";
            String output = "hdfs://";
            Job job = new Job(conf, "word count");
            job.setMapperClass(TokenizerMapper.class);        //为job设置Mapper类
            job.setCombinerClass(IntSumReducer.class);        //为job设置Conbiner类
            job.setReducerClass(IntSumReducer.class);        //为job设置Reducer类
            job.setOutputKeyClass(Text.class);                //设置输出key类型
            job.setOutputValueClass(IntWritable.class);        //设置输出value类型
            FileInputFormat.addInputPath(job, new Path(input));        //设置数据输入路径
            FileOutputFormat.setOutputPath(job, new Path(output));    //设置数据输出路径
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    View Code

     2. 数据去重

    (1) 输入输出


    2017-12-09 a
    2017-12-10 a
    2017-12-11 a
    2017-12-12 b
    2017-12-13 b
    2017-12-09 b
    2017-12-10 b
    2017-12-11 b
    2017-12-12 b
    2017-12-13 b


    2017-12-09 a
    2017-12-09 b
    2017-12-10 a
    2017-12-10 b
    2017-12-11 a
    2017-12-11 b
    2017-12-12 b
    2017-12-13 b 

    (2) 代码实现及分析

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class DedupClean {
         * Mapper类
        public static class DedupCleanMapper extends Mapper<LongWritable, Text, Text, Text> {
            private static Text line = new Text();
            private static Text nullString = new Text("");
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                line = value;
                context.write(line, nullString);
         * Recuder类
        public static class DedupCleanReducer extends Reducer<Text, Text, Text, Text> {
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                context.write(key, new Text(""));
        public static void main(String[] args) throws Exception {
            final String FILE_IN_PATH = "hdfs://";
            final String FILE_OUT_PATH = "hdfs://";
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
            if (fs.exists(new Path(FILE_OUT_PATH))) {
                fs.delete(new Path(FILE_OUT_PATH), true);
            Job job = Job.getInstance(conf, "DedupClean");
            FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
            FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    View Code

    3. 倒排索引

    (1) 介绍



    正排索引(forward index):从文档角度看其中的单词,标识每个文档(用文档ID标识)都含有哪些单词,以及每个单词出现了多少次(词频)及出现的位置(相对于文档首部的偏移量)。

    倒排索引(inverted index):从单词角度看文档,标识每个单词分别在哪些文档中出现(文档ID),以及在各自的文档中每个单词分别出现了多少次(词频)及其出现的位置(相对于该文档首部的偏移量)。



    正排索引:文档 ——> 单词

    倒排索引:单词 ——> 文档



    (2) 输入输出及原理图


    a.txt:  hello you hello
    b.txt:  hello hans


    hans    b.txt:1
    hello    b.txt:1;a.txt:2
    you    a.txt:1


    (3) 代码实现及分析

    import java.io.IOException;
    import java.net.URI;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class InvertedIndex {
         * Mapper类
         *     输出<word:filename, value>格式,如<hello:a.txt, 1>
         *                                   <hello:a.txt, 1>
         *                                   <hello:b.txt, 1>
        public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                //文件路径:hdfs:// (split.getPath()方法)
                FileSplit split = (FileSplit)context.getInputSplit();
                String fileName = StringUtil.getShortPath(split.getPath().toString());
                //以<word:filename, value>形式存储 (便于Combiner中统计统一文件中相同单词数量)
                StringTokenizer st = new StringTokenizer(value.toString());
                while(st.hasMoreTokens()) {
                    String word = st.nextToken().toLowerCase();
                    word = word + ":" + fileName;
                    context.write(new Text(word), new Text("1"));
         * Conbiner类
         *     输入<word:filename, value>格式,如<hello:a.txt, 1>
         *                                   <hello:a.txt, 1>
         *                                   <hello:b.txt, 1>
         *     输出<word, filename:values>格式,如<hello, a.txt:2>
         *                                    <hello, b.txt:1>
        public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                long sum = 0;
                for(Text val : values) {
                    sum += Integer.valueOf(val.toString());
                //将key(hello:a.txt) 分割为newKey(hello)和fileKey(a.txt)
                String newKey = StringUtil.getSplitByIndex(key.toString(), ":", 0);
                String fileKey = StringUtil.getSplitByIndex(key.toString(), ":", 1);
                context.write(new Text(newKey), new Text(fileKey + ":" + String.valueOf(sum)));
         * Recuder类
         *     输入<word, filename:values>格式,如<hello, a.txt:2>
         *                                    <hello, b.txt:1>
         *     输出<word, filename1:values;filename2:values>格式,如<hello, a.txt:2;b.txt:1>
        public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
            protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                StringBuilder sb = new StringBuilder();
                for(Text val : values) {
                    sb.append(val.toString() + ";");
                context.write(key, new Text(sb.toString()));
        private static final String FILE_IN_PATH  = "hdfs://";
        private static final String FILE_OUT_PATH = "hdfs://"; 
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
            if (fs.exists(new Path(FILE_OUT_PATH))) {
                fs.delete(new Path(FILE_OUT_PATH), true);
            Job job = Job.getInstance(conf, "InvertedIndex");
            FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
            FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
     * 工具类
     *     获取文件路径
    /**
     * String helpers used by the inverted-index job.
     */
    class StringUtil {

        /** Not instantiable: static helpers only. */
        private StringUtil() {
        }

        /**
         * Returns the last component of a slash-separated path.
         *
         * @param filePath path such as {@code dir/sub/a.txt}
         * @return the text after the last {@code '/'}, or {@code filePath} itself when
         *         it contains no {@code '/'} (including the empty string)
         */
        public static String getShortPath(String filePath) {
            // lastIndexOf returns -1 when there is no '/', so substring(0) yields the
            // whole string; this also covers the empty input.
            return filePath.substring(filePath.lastIndexOf("/") + 1);
        }

        /**
         * Splits {@code str} by {@code regex} and returns the piece at {@code index}.
         *
         * @param str   text to split
         * @param regex delimiter, interpreted as a regular expression by {@link String#split(String)}
         * @param index zero-based position of the wanted piece
         * @return the piece at {@code index}, or {@code ""} when {@code index} is
         *         negative or not smaller than the number of pieces
         */
        public static String getSplitByIndex(String str, String regex, int index) {
            String[] splits = str.split(regex);
            // Valid indices are 0 .. length-1; index == length must be rejected as well.
            if (index < 0 || index >= splits.length) {
                return "";
            }
            return splits[index];
        }
    }
    View Code
  • 相关阅读:
    关于 Profile
    Vim Editor
    C++ Note
    Android NDK Sample
    Dealing with the ! when you import android project
    File attributes and Authority of Linux
    The source code list of Android Git project
    Enable Android progurad in the case of multiple JAR lib
  • 原文地址:https://www.cnblogs.com/lemonu/p/9669631.html
Copyright © 2011-2022 走看看