zoukankan      html  css  js  c++  java
  • Mapreduce实例--去重

    数据去重”主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。

    数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后交给reduce。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求(可以设置为空)。当reduce接收到一个<key,value-list>时就直接将输入的key复制到输出的key中,并将value设置成空值,然后输出<key,value>。

    MaprReduce去重流程如下图所示:

    操作环境

    Centos 7  #搭建有Hadoop集群

    jdk1.8

    hadoop 3.2.0

    IDEA 2019

    操作任务:

    现有含有三个元素的数据集,它们通过" "分割,下面截取部分数据仅供参考

    用户id   商品id    收藏日期
    10181   1000481   2010-04-04 16:54:31
    20001   1001597   2010-04-07 15:07:52
    20001   1001560   2010-04-07 15:08:27
    20042   1001368   2010-04-08 08:20:30
    20067   1002061   2010-04-08 16:45:33
    20056   1003289   2010-04-12 10:50:55
    20056   1003290   2010-04-12 11:57:35
    20056   1003292   2010-04-12 12:05:29
    20054   1002420   2010-04-14 15:24:12
    20055   1001679   2010-04-14 19:46:04
    20054   1010675   2010-04-14 15:23:53
    20054   1002429   2010-04-14 17:52:45
    20076   1002427   2010-04-14 19:35:39
    20054   1003326   2010-04-20 12:54:44
    20056   1002420   2010-04-15 11:24:49
    20064   1002422   2010-04-15 11:35:54
    20056   1003066   2010-04-15 11:43:01
    20056   1003055   2010-04-15 11:43:06
    20056   1010183   2010-04-15 11:45:24
    20056   1002422   2010-04-15 11:45:49
    20056   1003100   2010-04-15 11:45:54
    20056   1003094   2010-04-15 11:45:57
    20056   1003064   2010-04-15 11:46:04
    20056   1010178   2010-04-15 16:15:20
    20076   1003101   2010-04-15 16:37:27
    20076   1003103   2010-04-15 16:37:05
    20076   1003100   2010-04-15 16:37:18
    20076   1003066   2010-04-15 16:37:31
    20054   1003103   2010-04-15 16:40:14
    20054   1003100   2010-04-15 16:40:16

    操作要求用java编写Mapreduce程序,根据Id进行去重,统计用户收藏商品中都有哪些商品被收藏,统计数据如下:

    商品id
    1000481
    1001368
    1001560
    1001597
    1001679
    1002061
    1002420
    1002422
    1002427
    1002429
    1003055
    1003064
    1003066
    1003094
    1003100
    1003101
    1003103
    1003289
    1003290
    1003292
    1003326
    1010178
    1010183
    1010675

    操作步骤:

    首先启动Hadoop集群,将数据集上传到Hdfs

    ./start-all.sh

    hadoop fs -mkdir -p /mymapreduce2/in hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in

    在IDEA中建立Java工程,为了避免错误,我们使用hadoop安装文件中的Jar包。

    再编写代码,数据去重的目的是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。那么就将相同的key值的所有value记录到一台reduce机器,让其无论出现多少次,最终结果只输出一次,具体就是reduce的输出应该以数据作为key,而value-key没有要求,当reduce接收到一个时,就直接将Key复制到key中,将value设置为空。

    具体代码:

    package mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    public class Filter{
    
        public static class Map extends Mapper<Object , Text , Text , NullWritable>{
            //map将输入中个value复制到输出数据的Key上,并直接输出
            //从输入中得到的每行的数据的类型
            private static Text newKey=new Text();
            public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                //实现map函数
                //获取并输出每一次的处理过程
                String line=value.toString();
                System.out.println(line);
                String arr[]=line.split("	");
                newKey.set(arr[1]);
                context.write(newKey, NullWritable.get());
                System.out.println(newKey);
            }
        }
        public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{
            public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{
                //获得并输出每一次的处理过程
                context.write(key,NullWritable.get());
            }
        }
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
            Configuration conf=new Configuration();
            System.out.println("start");
            Job job =new Job(conf,"filter");
            job.setJarByClass(Filter.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");
            Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");
            FileInputFormat.addInputPath(job,in);
            FileOutputFormat.setOutputPath(job,out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    当执行完毕后查看结果:

     hadoop fs -ls /mymapreduce2/out
     hadoop fs -cat /mymapreduce2/out/part-r-00000

  • 相关阅读:
    【LOJ】#2071. 「JSOI2016」最佳团体
    【LOJ】#2070. 「SDOI2016」平凡的骰子
    【LOJ】#2069. 「SDOI2016」齿轮
    【LOJ】#2068. 「SDOI2016」探险路线
    【LOJ】#2067. 「SDOI2016」硬币游戏
    【LOJ】#2066. 「SDOI2016」墙上的句子
    【LOJ】#2065. 「SDOI2016」模式字符串
    【LOJ】#2064. 「HAOI2016」找相同字符
    【LOJ】#2063. 「HAOI2016」字符合并
    【LOJ】#2062. 「HAOI2016」地图
  • 原文地址:https://www.cnblogs.com/jake-jin/p/11884836.html
Copyright © 2011-2022 走看看