zoukankan      html  css  js  c++  java
  • MapReduce实例(数据去重)

    数据去重:

          原理(理解):Mapreduce程序首先应该确认<k3,v3>,根据<k3,v3>确定<k2,v2>,原始数据中出现次数超过一次的数据在输出文件中只出现一次。Reduce的输出是不重复的数据,也就是每一行数据作为key,即k3。而v3为空或不需要设值。根据<k3,v3>得到k2为每一行的数据,v2为空。根据MapReduce框架设值可知,k1为每行的起始位置,v1为每行的内容。因此,v1需要赋值给k2,使得原来的v1作为新的k2,从而两个或更多文件通过在Reduce端聚合,得到去重后的数据。

     数据:

    file1.txt

    2016-6-1 b
    2016-6-2 a
    2016-6-3 b
    2016-6-4 d
    2016-6-5 a
    2016-6-6 c
    2016-6-7 d
    2016-6-3 c

    file2.txt

    2016-6-1 a
    2016-6-2 b
    2016-6-3 c
    2016-6-4 d
    2016-6-5 a
    2016-6-6 b
    2016-6-7 c
    2016-6-3 c

    *创建文件夹dedup_in并创建上述两文件,将该文件夹上传到HDFS中

    [root@neusoft-master filecontent]# hadoop dfs -put dedup_in/ /neusoft/

    [root@neusoft-master filecontent]# hadoop dfs -ls /neusoft

    (1)自定义Mapper任务   

     1     private static class MyMapper extends Mapper<Object, Text, Text, Text>{
     2         private static Text line=new Text();
     3         @Override
     4         protected void map(Object k1, Text v1,
     5                 Mapper<Object, Text, Text, Text>.Context context)
     6                 throws IOException, InterruptedException {
     7             line=v1;//v1为每行数据,赋值给line
     8             context.write(line, new Text(""));
     9         }
    10     }

    (2)自定义Reduce任务

    1 private static class MyReducer extends Reducer<Text, Text, Text, Text>
    2     {
    3         @Override
    4         protected void reduce(Text k2, Iterable<Text> v2s,
    5                 Reducer<Text, Text, Text, Text>.Context context)
    6                 throws IOException, InterruptedException {
    7             context.write(k2, new Text(""));
    8         }
    9     }

    (3)主函数(组织map和reduce)

     1 public static void main(String[] args) throws Exception {
     2         //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
     3                 //2将自定义的MyMapper和MyReducer组装在一起
     4                 Configuration conf=new Configuration();
     5                 String jobName=DataDeduplication.class.getSimpleName();
     6                 //1首先寫job,知道需要conf和jobname在去創建即可
     7                 Job job = Job.getInstance(conf, jobName);
     8                 
     9                 //*13最后,如果要打包运行改程序,则需要调用如下行
    10                 job.setJarByClass(DataDeduplication.class);
    11                 
    12                 //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
    13                 FileInputFormat.setInputPaths(job, new Path(args[0]));
    14                 //4指定解析<k1,v1>的类(谁来解析键值对)
    15                 //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
    16                 job.setInputFormatClass(TextInputFormat.class);
    17                 //5指定自定义mapper类
    18                 job.setMapperClass(MyMapper.class);
    19                 //6指定map输出的key2的类型和value2的类型  <k2,v2>
    20                 //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
    21                 job.setMapOutputKeyClass(Text.class);
    22                 job.setMapOutputValueClass(Text.class);
    23                 //7分区(默认1个),排序,分组,规约 采用 默认
    24                 job.setCombinerClass(MyReducer.class);
    25                 //接下来采用reduce步骤
    26                 //8指定自定义的reduce类
    27                 job.setReducerClass(MyReducer.class);
    28                 //9指定输出的<k3,v3>类型
    29                 job.setOutputKeyClass(Text.class);
    30                 job.setOutputValueClass(Text.class);
    31                 //10指定输出<K3,V3>的类
    32                 //*下面这一步可以省
    33                 job.setOutputFormatClass(TextOutputFormat.class);
    34                 //11指定输出路径
    35                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
    36                 
    37                 //12写的mapreduce程序要交给resource manager运行
    38                 job.waitForCompletion(true);
    39     }

    数据去重源代码:

     1 package Mapreduce;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.fs.Path;
     7 import org.apache.hadoop.io.Text;
     8 import org.apache.hadoop.mapreduce.Job;
     9 import org.apache.hadoop.mapreduce.Mapper;
    10 import org.apache.hadoop.mapreduce.Reducer;
    11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    15 
    16 public class DataDeduplication {
    17     public static void main(String[] args) throws Exception {
    18         //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
    19                 //2将自定义的MyMapper和MyReducer组装在一起
    20                 Configuration conf=new Configuration();
    21                 String jobName=DataDeduplication.class.getSimpleName();
    22                 //1首先寫job,知道需要conf和jobname在去創建即可
    23                 Job job = Job.getInstance(conf, jobName);
    24                 
    25                 //*13最后,如果要打包运行改程序,则需要调用如下行
    26                 job.setJarByClass(DataDeduplication.class);
    27                 
    28                 //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
    29                 FileInputFormat.setInputPaths(job, new Path(args[0]));
    30                 //4指定解析<k1,v1>的类(谁来解析键值对)
    31                 //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
    32                 job.setInputFormatClass(TextInputFormat.class);
    33                 //5指定自定义mapper类
    34                 job.setMapperClass(MyMapper.class);
    35                 //6指定map输出的key2的类型和value2的类型  <k2,v2>
    36                 //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
    37                 job.setMapOutputKeyClass(Text.class);
    38                 job.setMapOutputValueClass(Text.class);
    39                 //7分区(默认1个),排序,分组,规约 采用 默认
    40                 job.setCombinerClass(MyReducer.class);
    41                 //接下来采用reduce步骤
    42                 //8指定自定义的reduce类
    43                 job.setReducerClass(MyReducer.class);
    44                 //9指定输出的<k3,v3>类型
    45                 job.setOutputKeyClass(Text.class);
    46                 job.setOutputValueClass(Text.class);
    47                 //10指定输出<K3,V3>的类
    48                 //*下面这一步可以省
    49                 job.setOutputFormatClass(TextOutputFormat.class);
    50                 //11指定输出路径
    51                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
    52                 
    53                 //12写的mapreduce程序要交给resource manager运行
    54                 job.waitForCompletion(true);
    55     }
    56     private static class MyMapper extends Mapper<Object, Text, Text, Text>{
    57         private static Text line=new Text();
    58         @Override
    59         protected void map(Object k1, Text v1,
    60                 Mapper<Object, Text, Text, Text>.Context context)
    61                 throws IOException, InterruptedException {
    62             line=v1;//v1为每行数据,赋值给line
    63             context.write(line, new Text(""));
    64         }
    65     }
    66     private static class MyReducer extends Reducer<Text, Text, Text, Text>
    67     {
    68         @Override
    69         protected void reduce(Text k2, Iterable<Text> v2s,
    70                 Reducer<Text, Text, Text, Text>.Context context)
    71                 throws IOException, InterruptedException {
    72             context.write(k2, new Text(""));
    73         }
    74     }
    75 }
    数据去重

     运行结果:

    [root@neusoft-master filecontent]# hadoop jar DataDeduplication.jar /neusoft/dedup_in /out12

    [root@neusoft-master filecontent]# hadoop dfs -text /out12/part-r-00000

    结果验证正确~

     

    注意:HDFS的显示形式

    [root@neusoft-master filecontent]# hadoop dfs -ls hdfs://neusoft-master:9000/out12

    [root@neusoft-master filecontent]# hadoop dfs -ls  /out12 

    等价表示形式

           /out12的完整表达形式hdfs://neusoft-master:9000/out12

  • 相关阅读:
    C++ 资源大全中文版
    C++标准库和标准模板库
    非常实用全面的 C++框架,库类等资源
    Parse陨落,开发者服务今后路在何方?
    MySQL 创始人:写代码比打游戏爽,程序员应多泡开源社区
    用callgraph生成的函数调用关系图
    Qemu对x86静态内存布局的模拟
    几篇QEMU/KVM代码分析文章
    任务执行引擎的工程
    初涉核心域
  • 原文地址:https://www.cnblogs.com/jackchen-Net/p/6428465.html
Copyright © 2011-2022 走看看