Implementing Some Classic Cases with MapReduce

In practice, Hive or Pig is often used to automate MapReduce statistics jobs, but we should not forget how to write raw MapReduce. This article records some classic cases implemented directly with MapReduce, such as the inverted index and TopK queries, which are worth mastering.

1. Implementing an Inverted Index with MapReduce

An inverted index, also called a reverse index or postings file, is an indexing method that stores, for full-text search, a mapping from a word to its locations in a document or a set of documents. It is the most commonly used data structure in document retrieval systems: given a word, the inverted index quickly yields the list of documents that contain it.

It is called an inverted index because the lookup runs in reverse: from a word inside a document back to the identifiers of the documents containing it, which enables fast search over very large collections. Search engines rely on inverted indexes for their searches, and the inverted index is also the core of how Lucene works.

Suppose there are two files: a.txt contains "hello you hello" and b.txt contains "hello hans". After building the inverted index, the expected output is:

    "hello" "a.txt:2;b.txt:1"
    "you" "a.txt:1"
    "hans" "b.txt:1"

Working backward from the desired result: to output "hello"  "a.txt:2;b.txt:1", the reducer must emit <hello, a.txt:2;b.txt:1>, so its input must be <hello, a.txt:2> and <hello, b.txt:1>. The reducer's input is the mapper's output, but a little analysis shows that the map side cannot directly emit aggregated records like <hello, a.txt:2>, because a map call processes one record at a time and does not see the total count. Instead, we can use a combiner as an intermediate step: the combiner receives <hello:a.txt, 1>, <hello:a.txt, 1>, <hello:b.txt, 1> and converts them into records that satisfy the reducer's input requirement, and emitting <hello:a.txt, 1> from the map side is straightforward. The whole process is shown in Figure 1.

Figure 1: Schematic of the MapReduce inverted index implementation
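
To make the role of each stage concrete, here is a sketch of how the key/value records for a.txt and b.txt could look as they pass through map, combine, and reduce (grouping actually happens per map task and per key, and ordering is not guaranteed):

    map output:     <hello:a.txt, 1>  <you:a.txt, 1>  <hello:a.txt, 1>  <hello:b.txt, 1>  <hans:b.txt, 1>
    combine output: <hello, a.txt:2>  <you, a.txt:1>  <hello, b.txt:1>  <hans, b.txt:1>
    reduce input:   <hello, [a.txt:2, b.txt:1]>  <you, [a.txt:1]>  <hans, [b.txt:1]>
    reduce output:  <hello, a.txt:2;b.txt:1>  <you, a.txt:1>  <hans, b.txt:1>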

The implementation code is as follows:

    package com.hicoor.hadoop.mapreduce.reverse;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    // Utility class for extracting the file name and splitting keys
    class StringUtil {
        public static String getShortPath(String filePath) {
            if (filePath.length() == 0)
                return filePath;
            return filePath.substring(filePath.lastIndexOf("/") + 1);
        }
    
        public static String getSplitByIndex(String str, String regex, int index) {
            String[] splits = str.split(regex);
            if (splits.length <= index) // avoid ArrayIndexOutOfBoundsException when index is out of range
                return "";
            return splits[index];
        }
    }
    
    public class InverseIndex {
    
        public static class ReverseWordMapper extends
                Mapper<LongWritable, Text, Text, Text> {
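        // For every token in the line, emits <word:fileName, 1>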
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                FileSplit split = (FileSplit) context.getInputSplit();
                String fileName = StringUtil.getShortPath(split.getPath()
                        .toString());
                StringTokenizer st = new StringTokenizer(value.toString());
                while (st.hasMoreTokens()) {
                    String word = st.nextToken().toLowerCase();
                    word = word + ":" + fileName;
                    context.write(new Text(word), new Text("1"));
                }
            }
        }
    
        public static class ReverseWordCombiner extends
                Reducer<Text, Text, Text, Text> {
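        // Sums the 1s for each word:fileName key and re-keys the result as <word, fileName:count>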
            @Override
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
    
                long sum = 0;
                for (Text value : values) {
                    sum += Integer.valueOf(value.toString());
                }
                String newKey = StringUtil.getSplitByIndex(key.toString(), ":", 0);
                String fileKey = StringUtil
                        .getSplitByIndex(key.toString(), ":", 1);
                context.write(new Text(newKey),
                        new Text(fileKey + ":" + String.valueOf(sum)));
            }
        }
        
        public static class ReverseWordReducer extends Reducer<Text, Text, Text, Text> {
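        // Joins the fileName:count values for each word into the final postings string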
            @Override
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                
                StringBuilder sb = new StringBuilder("");
                for (Text v : values) {
                    sb.append(v.toString()+" ");
                }
                context.write(key, new Text(sb.toString()));
            }
        }
    
        private static final String FILE_IN_PATH = "hdfs://hadoop0:9000/reverse/in/";
        private static final String FILE_OUT_PATH = "hdfs://hadoop0:9000/reverse/out/";
    
        public static void main(String[] args) throws IOException,
                URISyntaxException, ClassNotFoundException, InterruptedException {
            System.setProperty("hadoop.home.dir", "D:\desktop\hadoop-2.6.0");
            Configuration conf = new Configuration();
    
        // Delete the output directory if it already exists
            FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
            if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
                fileSystem.delete(new Path(FILE_OUT_PATH), true);
            }
    
            Job job = Job.getInstance(conf, "InverseIndex");
            job.setJarByClass(InverseIndex.class);
            job.setMapperClass(ReverseWordMapper.class);
            job.setCombinerClass(ReverseWordCombiner.class);
            job.setReducerClass(ReverseWordReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
            FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
            job.waitForCompletion(true);
        }
    }
2. Implementing TopK Queries with MapReduce

The TopK problem is to find the records ranked in the top K by some criterion in a huge data set, for example the 3 users with the largest deposit balances among all user deposit records. When the data set is small, it can simply be loaded into memory on a single machine; when it is very large, we need MapReduce to process it in a distributed way. The problem can be solved with HiveQL, or by writing a MapReduce program ourselves, as done here.

Implementation idea: each map task finds and emits the top K records of the data it processes; all of the map outputs are then handed to a single reduce task, which finds and returns the final top K records. The process is shown in Figure 2.

Figure 2: Schematic of the TopK process with MapReduce

Note that the number of reduce tasks must be 1 here, and no Combiner should be set.
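
The core trick, used by both the mapper and the reducer below, is to keep a descending TreeMap bounded to K entries and evict the smallest key whenever the map grows past K. A minimal standalone sketch of that idea follows (not from the original article; the class name and hard-coded data are illustrative only, and note that, exactly as in the job below, two records with the same amount overwrite each other because the amount is used as the map key):

    import java.util.Comparator;
    import java.util.Map;
    import java.util.TreeMap;
    
    // Keeps only the K largest (amount, user) pairs while streaming over the input.
    public class TopKSketch {
        public static void main(String[] args) {
            final int k = 3;
            // Descending order, so the smallest key is always lastKey().
            TreeMap<Long, String> topK = new TreeMap<>(Comparator.reverseOrder());
    
            String[] users  = {"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10"};
            long[] amounts  = {125, 23, 365, 15, 188, 236, 115, 18, 785, 214};
    
            for (int i = 0; i < users.length; i++) {
                topK.put(amounts[i], users[i]);
                if (topK.size() > k) {
                    topK.remove(topK.lastKey()); // evict the current smallest amount
                }
            }
    
            // Prints: p9 785, p3 365, p6 236
            for (Map.Entry<Long, String> e : topK.entrySet()) {
                System.out.println(e.getValue() + "\t" + e.getKey());
            }
        }
    }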

Suppose there are two files, deposit1.txt and deposit2.txt, with the following contents (the columns are the user name and the deposit amount):

    deposit1.txt
    p1    125
    p2    23
    p3    365
    p4    15
    p5    188
    
    deposit2.txt
    p6    236
    p7    115
    p8    18
    p9    785
    p10    214

The task is to find the 3 users with the largest deposits. Reference implementation:

    package com.hicoor.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.Comparator;
    import java.util.TreeMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MapReduceTopKDemo {
    
        public static final int K = 3;
        
    // By default a TreeMap sorts keys in ascending order; this helper returns one sorted in descending order
        private static TreeMap<Long, String> getDescSortTreeMap() {
            return new TreeMap<Long, String>(new Comparator<Long>() {
                @Override
                public int compare(Long o1, Long o2) {
                    return o2.compareTo(o1);
                }
            });
        } 
        
        static class TopKMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
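        // Each map task keeps its own local top K (keyed by amount) and emits it from cleanup()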
            private TreeMap<Long, String> map = getDescSortTreeMap();
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                
            String line = value.toString();
            // Compare string contents, not references; skip blank lines
            if (line == null || line.trim().isEmpty()) return;
            String[] splits = line.split("\t");
            if (splits.length < 2) return;
                
                
                map.put(Long.parseLong(splits[1]), splits[0]);
            // Keep only the K largest entries
                if(map.size() > K) {
                // Entries are sorted in descending key order, so the last entry is the smallest
                    map.remove(map.lastKey());
                }
            }
            
            @Override
            protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                
                for (Long num : map.keySet()) {
                    context.write(new LongWritable(num), new Text(map.get(num)));
                }
            }
        }
        
        static class TopKReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
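        // The single reduce task merges the per-mapper top-K lists and keeps the global top K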
            private TreeMap<Long, String> map = getDescSortTreeMap();
            
            @Override
            protected void reduce(LongWritable key, Iterable<Text> value, Reducer<LongWritable, Text, Text, LongWritable>.Context context)
                    throws IOException, InterruptedException {
                
            // Different users may share the same amount; join their names with ',' instead of concatenating them
            StringBuilder ps = new StringBuilder();
            for (Text val : value) {
                if (ps.length() > 0) ps.append(",");
                ps.append(val.toString());
            }
                
                map.put(key.get(), ps.toString());
                if(map.size() > K) {
                    map.remove(map.lastKey());
                }
            }
            
            @Override
            protected void cleanup(Reducer<LongWritable, Text, Text, LongWritable>.Context context)
                    throws IOException, InterruptedException {
                
                for (Long num : map.keySet()) {
                    context.write(new Text(map.get(num)), new LongWritable(num));
                }
            }
        }
        
        private final static String FILE_IN_PATH = "hdfs://cluster1/topk/in";
        private final static String FILE_OUT_PATH = "hdfs://cluster1/topk/out";
    
    /* TopK problem: find the top-K records by some criterion in a huge data set,
     * e.g. the 3 users with the largest deposit balances among all deposit records.
     * 1) Test input data (columns: user account and deposit balance):
         *         p1    125
         *         p2    23
         *         p3    365
         *         p4    15
         *         p5    188
         *         p6    236
         *         p7    115
         *         p8    18
         *         p9    785
         *         p10    214
     * 2) Expected output:
         *         p9      785
         *         p3      365
         *         p6      236
         */
        public static void main(String[] args) throws Exception {
            System.setProperty("hadoop.home.dir", "D:\desktop\hadoop-2.6.0");
        Configuration conf = getHAConfiguration();
    
        // Delete the output directory if it already exists
            FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
            if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
                fileSystem.delete(new Path(FILE_OUT_PATH), true);
            }
    
            Job job = Job.getInstance(conf, "MapReduce TopK Demo");
            job.setMapperClass(TopKMapper.class);
            job.setJarByClass(MapReduceTopKDemo.class);
            job.setReducerClass(TopKReducer.class);
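        // As noted above, TopK needs exactly one reduce task; 1 is the default, but setting it explicitly documents the requirement
        job.setNumReduceTasks(1);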
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
            FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
            job.waitForCompletion(true);
        }
    
    private static Configuration getHAConfiguration() {
            Configuration conf = new Configuration();
            conf.setStrings("dfs.nameservices", "cluster1");
            conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop1,hadoop2");
            conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "172.19.7.31:9000");
            conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop2", "172.19.7.32:9000");
            conf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            return conf;
        }
    
    }

The execution result is:

    p9      785
    p3      365
    p6      236
Original article: https://www.cnblogs.com/hanganglin/p/4470968.html