  • MapReduce Notes: A Summary of Key Technical Points

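    Contents

    · Overview

    · Principles

        · MapReduce programming model

        · MapReduce process

        · Fault tolerance

    · API

        · Overview

        · WordCount example

        · Writable interface

        · Mapper class

        · Reducer class

        · Partitioner abstract class

        · WritableComparator class

        · Example: join

        · Example: secondary sort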


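    Overview

    1. Origin: a paper published by Google.

    2. Characteristics

        a) Simple to develop: users need not deal with inter-process communication or socket programming and need no advanced techniques; they only have to follow the MapReduce programming model.

        b) Scalability: when cluster resources cannot meet the computing demand, the cluster can be scaled out close to linearly by adding nodes.

        c) Fault tolerance: when a node failure causes a task to fail, the framework automatically reschedules it on a healthy node until it completes.

    3. What "MapReduce" refers to: the MapReduce programming model, and the MapReduce runtime environment (YARN).

    4. Limitations

        a) Slow execution: an ordinary MapReduce job usually finishes in minutes; complex jobs or larger data volumes can take an hour or more. MapReduce jobs are typically data-intensive: large intermediate results are written to disk and transferred over the network, which costs a lot of time.

        b) Too low-level: compared with SQL, MapReduce is low-level. For users accustomed to relational databases, or for data analysts, writing map and reduce functions is a burden.

        c) Not every algorithm can be implemented with it.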

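    Principles

    MapReduce programming model

    1. Origin of the name: the long-established map and reduce operations of LISP and other functional programming languages.

    2. Smallest unit of data that MapReduce operates on: the key-value pair.

    3. Execution of the MapReduce model

    (Key1, Value1)  →  (Key2, List<Value2>)  →  (Key3, Value3)

        a) The data is abstracted as key-value pairs (Key1, Value1), which are the input of the map function;

        b) the map function produces a series of new key-value pairs (Key2, Value2), which are written locally as intermediate results;

        c) the framework automatically groups the intermediate results by key and sends all data with the same key to the reduce function, whose input is a key and the collection of its values, (Key2, List<Value2>);

        d) the reduce function produces a series of key-value pairs (Key3, Value3) as the final output.

    4. WordCount example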
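    The map, group-by-key, and reduce steps above can be traced on a tiny input without a cluster. The following is only an in-memory sketch in plain Java, not Hadoop code; the class name MiniWordCount and its methods are hypothetical, and a TreeMap stands in for the framework's sorted grouping.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of the model; not part of Hadoop.
class MiniWordCount {

    // map: (offset, line) -> list of (word, 1); the offset is not needed here.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // reduce: (word, [1, 1, ...]) -> total count for that word.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Stand-in for the shuffle: group intermediate values by key, keys kept sorted.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // One reduce call per distinct key.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {hello=2, mapreduce=1, world=1}
        System.out.println(run(Arrays.asList("hello world", "hello mapreduce")));
    }
}
```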

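    MapReduce process

    1. Description: there are five phases: input, map, combine, reduce, and output. The combine phase does not always occur. Distributing the intermediate results of the map function to the reduce function is called shuffle; the shuffle phase also involves copy and sort.

    2. MapReduce tasks: a job is divided into two computing phases, Map and Reduce, which consist of one or more Map tasks and Reduce tasks respectively.

    3. input phase

        a) The input process: when a file on HDFS is the MapReduce input, the framework uses "org.apache.hadoop.mapreduce.lib.input.FileInputFormat", a subclass of the abstract class "org.apache.hadoop.mapreduce.InputFormat", to divide the file into InputSplits. Each InputSplit is the input of one Map task and is then parsed into key-value pairs.

        b) Effect of an InputSplit on the data: an InputSplit only divides the data logically; it does not affect the file stored on disk.

        c) Information held by an InputSplit: the metadata of the split, including its start offset, its length, and the list of hosts that hold the data.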

    package org.apache.hadoop.mapreduce.lib.input;

    // ...

    public class FileSplit extends InputSplit implements Writable {

      // ...

      /** Constructs a split with host information
       *
       * @param file the file name
       * @param start the position of the first byte in the file to process
       * @param length the number of bytes in the file to process
       * @param hosts the list of hosts containing the block, possibly null
       */
      public FileSplit(Path file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
      }

      // ...
    }

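        d) InputSplit size: minSize comes from the mapred-site.xml parameter "mapreduce.input.fileinputformat.split.minsize" (default 1); maxSize comes from the mapred-site.xml parameter "mapreduce.input.fileinputformat.split.maxsize" (default Long.MAX_VALUE); blockSize comes from the hdfs-site.xml parameter "dfs.block.size" (named "dfs.blocksize" in Hadoop 2.x). With the default configuration, therefore, the InputSplit size equals the block size.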

    package org.apache.hadoop.mapreduce.lib.input;

    // ...

    public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

      // ...

      protected long computeSplitSize(long blockSize, long minSize,
                                      long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

      // ...
    }

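        e) Number of InputSplits: roughly file size ÷ InputSplit size, rounded up.

        f) Alignment (data locality): when tasks are scheduled, data on the local node is preferred. If the node has no data to process, or also needs data held elsewhere, the node running the Map task fetches that data over the network. When the InputSplit is larger than a block, the Map task has to read part of its data from other nodes and full data locality is lost, so the InputSplit size should equal the block size.

        g) Input of the map function: the RecordReader returned by InputFormat.createRecordReader() parses the InputSplit into key-value pairs.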

    package org.apache.hadoop.mapreduce.lib.input;

    // ...

    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

      // ...

      @Override
      public RecordReader<LongWritable, Text>
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
          recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
      }

      // ...
    }
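    The split-size rule in d) and the split count in e) can be checked with a little arithmetic. This is a sketch in plain Java: computeSplitSize repeats the formula quoted from FileInputFormat, while the class name SplitMath, the estimateSplits helper, and the 128 MB block size are assumptions made for illustration. The estimate is a plain ceiling division and ignores any slack the real implementation allows for the last split.

```java
// Hypothetical helper for reasoning about split sizes; not part of Hadoop.
class SplitMath {

    // Same formula as FileInputFormat.computeSplitSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Simplified estimate: file size divided by split size, rounded up.
    static long estimateSplits(long fileSize, long splitSize) {
        return fileSize / splitSize + (fileSize % splitSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb; // assumed block size

        // Defaults (minSize = 1, maxSize = Long.MAX_VALUE): split size == block size.
        long defaultSplit = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);
        System.out.println(defaultSplit / mb);                       // prints 128
        System.out.println(estimateSplits(300 * mb, defaultSplit));  // prints 3

        // Lowering maxSize below the block size produces smaller splits.
        long smallSplit = computeSplitSize(blockSize, 1L, 64 * mb);
        System.out.println(smallSplit / mb);                         // prints 64
    }
}
```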

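    4. map phase

        a) Writing to the buffer: the output of the map function is first written to an in-memory circular buffer, where it is pre-sorted. Each Map task has its own circular buffer, whose size comes from the mapred-site.xml parameter "mapreduce.task.io.sort.mb" (default 100 MB).

        b) Spilling the buffer to disk: when the circular buffer reaches its threshold (mapred-site.xml parameter "mapreduce.map.sort.spill.percent", default 0.80), a background thread spills the buffer contents to disk, into the directory given by the mapred-site.xml parameter "mapreduce.cluster.local.dir" (default ${hadoop.tmp.dir}/mapred/local).

        c) Sorting before the spill: before spilling, the thread partitions the buffered data according to the Reducer each record will be sent to (by key, by default) and then sorts each partition by key.

        d) Combiner: if a Combiner is specified, it runs on the sorted output before each spill is written; when the spill files are merged it runs again only if there are at least 3 spill files. It merges the intermediate results of the map function and acts like the reduce function. The goal is to shrink the intermediate results (which must be written to disk and sent over the network to the Reducers) and so improve efficiency.

        e) Compression: compressing the intermediate results serves the same goal as the Combiner. It is controlled by the mapred-site.xml parameters "mapreduce.map.output.compress" (default false) and "mapreduce.map.output.compress.codec".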

    Format    Algorithm  Extension  Splittable           Codec                                       Notes
    Deflate   Deflate    .deflate   No                   org.apache.hadoop.io.compress.DeflateCodec
    gzip      Deflate    .gz        No                   org.apache.hadoop.io.compress.GzipCodec
    bzip2     bzip2      .bz2       Yes                  org.apache.hadoop.io.compress.BZip2Codec
    LZO       LZOP       .lzo       No (yes if indexed)  com.hadoop.compression.lzo.LzopCodec
    Snappy    Snappy     .snappy    No                   org.apache.hadoop.io.compress.SnappyCodec   Very fast with a reasonable compression ratio; recommended

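        f) Transfer of intermediate results: intermediate results are sent to the Reducers over HTTP. The number of worker threads for the transfer is set by the mapred-site.xml parameter "mapreduce.tasktracker.http.threads" (default 40).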
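    The partition-then-sort step described in c) can be pictured as ordering the buffered records by (partition, key) before they are written out. The sketch below is plain Java and only illustrates that ordering; the class SpillSketch and its methods are hypothetical, and the partition function simply mirrors the default hash rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the layout of one spill; not Hadoop's implementation.
class SpillSketch {

    // Default-style partitioning: non-negative hash modulo the number of reducers.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Orders keys by partition first, then by key within each partition.
    static List<String> sortForSpill(List<String> keys, int numReducers) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> {
            int pa = partition(a, numReducers);
            int pb = partition(b, numReducers);
            return pa != pb ? Integer.compare(pa, pb) : a.compareTo(b);
        });
        return sorted;
    }

    public static void main(String[] args) {
        // With 2 reducers, "b" and "d" fall into partition 0 and "a" and "c" into partition 1.
        System.out.println(sortForSpill(Arrays.asList("d", "a", "c", "b"), 2)); // prints [b, d, a, c]
    }
}
```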

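    5. shuffle phase

        a) copy: a Reduce task may need the output of several Map tasks, and each Map task is likely to finish at a different time. As soon as any Map task completes, the Reduce task starts copying its output. The number of copy threads is set by the mapred-site.xml parameter "mapreduce.reduce.shuffle.parallelcopies" (default 5).

        b) Buffer used by copy: if a map output is fairly small, it is first copied into an in-memory buffer on the Reducer's node (sized by the mapred-site.xml parameter "mapreduce.reduce.shuffle.input.buffer.percent", default 0.70). When the buffer usage reaches its threshold (mapred-site.xml parameter "mapreduce.reduce.shuffle.merge.percent", default 0.66) or the number of map outputs held in the buffer reaches its threshold (mapred-site.xml parameter "mapreduce.reduce.merge.inmem.threshold", default 1000), the contents are merged and spilled to disk. Otherwise, the map output is copied straight to disk.

        c) Merging during copy: as more files are copied to disk, a background thread merges them into larger, sorted files to save time in later merges. Compressed intermediate results are decompressed in memory during the merge.

        d) sort: once all map outputs have been copied, the map output files are merged with a merge sort.

        e) Merging during sort: the map output files are merged in rounds, each round merging up to the merge factor (mapred-site.xml parameter "mapreduce.task.io.sort.factor", default 10) files. For example, with 50 map output files, 5 merges are performed, each merging 10 files into one, which leaves 5 files.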
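    Merging several sorted files into one sorted stream is a classic k-way merge. The sketch below shows the idea in plain Java with a priority queue; it works on in-memory lists rather than files, and the class KWayMerge and both method names are hypothetical. filesAfterOneRound reproduces the arithmetic of the 50-file example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory k-way merge; Hadoop merges on-disk segments instead.
class KWayMerge {

    // Merges runs that are each already sorted into one sorted list.
    static List<Integer> merge(List<List<Integer>> runs) {
        // Heap entries are {value, runIndex, offsetInRun}, ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {runs.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            List<Integer> run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.size()) {
                heap.add(new int[] {run.get(next), top[1], next});
            }
        }
        return out;
    }

    // Files remaining after one round that merges up to `factor` files at a time.
    static int filesAfterOneRound(int files, int factor) {
        return (files + factor - 1) / factor;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8));
        System.out.println(merge(runs));                // prints [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(filesAfterOneRound(50, 10)); // prints 5
    }
}
```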

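    6. reduce phase

        a) Input of the reduce function: after the shuffle, the data of each partition is sorted by key. The reduce function is called once for each key, with all values that belong to that key.

        b) Output of the reduce function: usually written to HDFS.

    7. Sorting

        a) Sorting algorithms used in MapReduce: quicksort and merge sort.

        b) The 3 sorts in MapReduce: the buffer sort in the map phase (quicksort); the sort performed when the spill files are merged at the end of the map phase (merge sort); and the sort of the file merge in the shuffle phase (merge sort).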

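    Fault tolerance

    1. Task failure: a failed task is retried until the total number of attempts exceeds N, at which point it is considered to have failed for good. The total number of attempts for Map tasks and Reduce tasks is set by the mapred-site.xml parameters "mapreduce.map.maxattempts" and "mapreduce.reduce.maxattempts" respectively; both default to 4.

    2. ApplicationMaster failure: handled by the YARN fault-tolerance mechanism.

    3. NodeManager failure: handled by the YARN fault-tolerance mechanism.

    4. ResourceManager failure: handled by the YARN fault-tolerance mechanism.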
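    The retry rule for failed tasks in item 1 amounts to a bounded loop. The following plain-Java sketch only illustrates that rule; TaskRetrySketch and runWithRetries are hypothetical names, and in a real cluster the framework decides where each new attempt runs.

```java
import java.util.function.IntPredicate;

// Hypothetical model of bounded task retries; not Hadoop's scheduler.
class TaskRetrySketch {

    // Runs attempts 1..maxAttempts; returns the attempt that succeeded, or -1 if all failed.
    static int runWithRetries(int maxAttempts, IntPredicate attemptSucceeds) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1; // the task has failed for good
    }

    public static void main(String[] args) {
        int maxAttempts = 4; // the default quoted above
        System.out.println(runWithRetries(maxAttempts, attempt -> attempt == 3)); // prints 3
        System.out.println(runWithRetries(maxAttempts, attempt -> false));        // prints -1
    }
}
```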

    API

    Overview

    1. Old and new APIs

        a) Old API package: org.apache.hadoop.mapred

        b) New API package: org.apache.hadoop.mapreduce

    2. Maven dependency

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.5</version>
    </dependency>

    WordCount example

    1. Mapper class

    package mr.wordcount;

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused across calls to avoid allocating a new object per record.
        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        // Input: (byte offset of the line, line text); output: (word, 1) per token.
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
            while (stringTokenizer.hasMoreTokens()) {
                word.set(stringTokenizer.nextToken());
                context.write(word, one);
            }
        }

    }

    2. Reducer class

    package mr.wordcount;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        // Input: (word, counts for that word); output: (word, sum of the counts).
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }

    }

    3. main method

    package mr.wordcount;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
            }

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            // Summation is associative and commutative, so the reducer doubles as the combiner.
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    4. Run commands

    hadoop jar wordcount.jar mr.wordcount.WordCount /test/wordcount/in /test/wordcount/out
    hadoop fs -ls /test/wordcount/out
    hadoop fs -cat /test/wordcount/out/part-r-00000

    Writable interface

    1. Role: the Hadoop serialization format.

    2. Where Hadoop uses serialization: IPC (inter-process communication) and data persistence.

    3. Source code (2 important methods)

    package org.apache.hadoop.io;

    // ...

    public interface Writable {
      /**
       * Serialize the fields of this object to <code>out</code>.
       *
       * @param out <code>DataOutput</code> to serialize this object into.
       * @throws IOException
       */
      void write(DataOutput out) throws IOException;

      /**
       * Deserialize the fields of this object from <code>in</code>.
       *
       * <p>For efficiency, implementations should attempt to re-use storage in the
       * existing object where possible.</p>
       *
       * @param in <code>DataInput</code> to deserialize this object from.
       * @throws IOException
       */
      void readFields(DataInput in) throws IOException;
    }

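    4. Built-in implementations

        a) Package: org.apache.hadoop.io

        b) Class diagram

        c) Mapping to Java types

    Java type  Writable implementation       Notes
    null       NullWritable                  Serialized length is 0; serves as a placeholder
    String     Text
    boolean    BooleanWritable
    byte       ByteWritable
    int        IntWritable, VIntWritable     A leading V means variable-length; otherwise fixed-length
    long       LongWritable, VLongWritable   A leading V means variable-length; otherwise fixed-length
    float      FloatWritable
    double     DoubleWritable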

    5. Custom implementation

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class TextPair implements WritableComparable<TextPair> {

        private Text first;

        private Text second;

        // A no-argument constructor is needed so the framework can create instances.
        public TextPair() {
            this(new Text(), new Text());
        }

        public TextPair(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        // Fields are written and read back in the same order.
        @Override
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        // Used for sorting during the MapReduce process.
        @Override
        public int compareTo(TextPair o) {
            int result = first.compareTo(o.first);
            if (result == 0) {
                result = second.compareTo(o.second);
            }
            return result;
        }

        public Text getFirst() {
            return first;
        }

        public void setFirst(Text first) {
            this.first = first;
        }

        public Text getSecond() {
            return second;
        }

        public void setSecond(Text second) {
            this.second = second;
        }

    }
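    The write/readFields contract can be exercised without Hadoop on the classpath. The sketch below is plain Java under stated assumptions: SimpleWritable is a hypothetical stand-in that copies the two method signatures of Writable, and StringPair uses writeUTF/readUTF from the JDK rather than Hadoop's Text encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical round-trip demo; SimpleWritable only imitates the shape of Writable.
class WritableRoundTrip {

    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    static class StringPair implements SimpleWritable {
        String first = "";
        String second = "";

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        // Reads fields in the order they were written, reusing this object.
        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }
    }

    static byte[] serialize(SimpleWritable w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static void deserialize(SimpleWritable w, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StringPair original = new StringPair();
        original.first = "ab";
        original.second = "c";
        byte[] data = serialize(original);

        StringPair copy = new StringPair();
        deserialize(copy, data);
        System.out.println(copy.first + "," + copy.second); // prints ab,c
    }
}
```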

    Mapper class

    1. Source code (4 important methods)

    package org.apache.hadoop.mapreduce;

    // ...

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      /**
       * The <code>Context</code> passed on to the {@link Mapper} implementations.
       */
      public abstract class Context
        implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }

      /**
       * Called once at the beginning of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * Called once for each key/value pair in the input split. Most applications
       * should override this, but the default is the identity function.
       */
      @SuppressWarnings("unchecked")
      protected void map(KEYIN key, VALUEIN value,
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }

      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * Expert users can override this method for more complete control over the
       * execution of the Mapper.
       * @param context
       * @throws IOException
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }
    }

    2. The run method: the execution template setup → map → cleanup.
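    The setup → map → cleanup order is a template-method pattern: run fixes the sequence and subclasses fill in the steps. A minimal plain-Java sketch of that shape follows; MiniMapper is a hypothetical miniature that records its calls, not the Hadoop class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of the Mapper lifecycle; it only records the call order.
class RunTemplateSketch {

    static class MiniMapper {
        final List<String> log = new ArrayList<>();

        void setup() {
            log.add("setup");
        }

        void map(String record) {
            log.add("map:" + record);
        }

        void cleanup() {
            log.add("cleanup");
        }

        // Same shape as Mapper.run: setup once, map per record, cleanup in a finally block.
        final void run(Iterable<String> records) {
            setup();
            try {
                for (String record : records) {
                    map(record);
                }
            } finally {
                cleanup();
            }
        }
    }

    public static void main(String[] args) {
        MiniMapper mapper = new MiniMapper();
        mapper.run(Arrays.asList("a", "b"));
        System.out.println(mapper.log); // prints [setup, map:a, map:b, cleanup]
    }
}
```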

    Reducer class

    1. Source code

    package org.apache.hadoop.mapreduce;

    // ...

    public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

      /**
       * The <code>Context</code> passed on to the {@link Reducer} implementations.
       */
      public abstract class Context
        implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }

      /**
       * Called once at the start of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * This method is called once for each key. Most applications will define
       * their reduce class by overriding this method. The default implementation
       * is an identity function.
       */
      @SuppressWarnings("unchecked")
      protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                            ) throws IOException, InterruptedException {
        for(VALUEIN value: values) {
          context.write((KEYOUT) key, (VALUEOUT) value);
        }
      }

      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }

      /**
       * Advanced application writers can use the
       * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
       * control how the reduce task works.
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
            }
          }
        } finally {
          cleanup(context);
        }
      }
    }

    2. The run method: the execution template setup → reduce → cleanup.
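    Because the reduce input is sorted by key, "one reduce call per key" boils down to walking the sorted records and cutting them into runs of equal keys. The sketch below shows that walk in plain Java; GroupedReduceSketch, kv, and reduceAll are hypothetical names, and the inner loop plays the role of a single reduce call that sums the values.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of per-key grouping over sorted input; not Hadoop code.
class GroupedReduceSketch {

    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // The input must already be sorted by key, as it is after the shuffle.
    static List<String> reduceAll(List<Map.Entry<String, Integer>> sorted) {
        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {           // corresponds to context.nextKey()
            String key = sorted.get(i).getKey();
            int sum = 0;
            // Consume the run of records sharing this key: one reduce call.
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                sum += sorted.get(i).getValue();
                i++;
            }
            out.add(key + "=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reduceAll(Arrays.asList(kv("a", 1), kv("a", 1), kv("b", 1)))); // prints [a=2, b=1]
    }
}
```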

    Partitioner abstract class

    1. Use case: controls the shuffle, that is, which Reducer each intermediate result is sent to.

    2. Source code

    package org.apache.hadoop.mapreduce;

    // ...

    public abstract class Partitioner<KEY, VALUE> {

      /**
       * Get the partition number for a given key (hence record) given the total
       * number of partitions i.e. number of reduce-tasks for the job.
       *
       * <p>Typically a hash function on all or a subset of the key.</p>
       *
       * @param key the key to be partitioned.
       * @param value the entry value.
       * @param numPartitions the total number of partitions.
       * @return the partition number for the <code>key</code>.
       */
      public abstract int getPartition(KEY key, VALUE value, int numPartitions);

    }

    3. Built-in subclasses

        a) org.apache.hadoop.mapreduce.lib.partition.HashPartitioner: partitions by the hash value of the key; this is the default.

    package org.apache.hadoop.mapreduce.lib.partition;

    // ...

    public class HashPartitioner<K, V> extends Partitioner<K, V> {

      /** Use {@link Object#hashCode()} to partition. */
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }

    }

        b) org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner: partitions by key so that each Reduce task handles one contiguous key range (for example 1~1000, 1000~2000, ...), which makes the final output globally sorted. If the data is unevenly distributed over the keys, some Reduce tasks take longer to finish.

    4. Custom subclass

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class MyPartitioner extends Partitioner<Text, IntWritable> {

        // Sends values above 10000 and all other values to (up to) two different partitions.
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            return (Boolean.valueOf(value.get() > 10000).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }

    }

    // Set the Partitioner in the main method
    job.setPartitionerClass(MyPartitioner.class);
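    Two details of the partitioners above are easy to miss: the mask with Integer.MAX_VALUE keeps the result non-negative even for negative hash codes, and range partitioning is a lookup of the key among sorted split points. The plain-Java sketch below illustrates both; PartitionSketch and rangePartition are hypothetical, and the split points 1000 and 2000 are taken from the example ranges in the text rather than from any sampling step.

```java
import java.util.Arrays;

// Hypothetical helpers illustrating hash and range partitioning; not Hadoop classes.
class PartitionSketch {

    // Same expression as HashPartitioner: clear the sign bit, then take the remainder.
    static int hashPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // splitPoints must be sorted; keys below splitPoints[0] go to partition 0,
    // keys in [splitPoints[i-1], splitPoints[i]) go to partition i, and so on.
    static int rangePartition(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        Integer awkwardKey = Integer.MIN_VALUE; // its hashCode is Integer.MIN_VALUE
        System.out.println(Math.abs(awkwardKey.hashCode()) % 3); // prints -2, not a valid partition
        System.out.println(hashPartition(awkwardKey, 3));        // prints 0

        int[] splitPoints = {1000, 2000}; // three partitions
        System.out.println(rangePartition(500, splitPoints));  // prints 0
        System.out.println(rangePartition(1500, splitPoints)); // prints 1
        System.out.println(rangePartition(2500, splitPoints)); // prints 2
    }
}
```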

    WritableComparator class

    1. Use case: controls the ordering rules of the 2nd and 3rd sorts in the MapReduce process.

    2. Built-in class: org.apache.hadoop.io.WritableComparator (an implementation of the RawComparator interface)

    3. Custom subclass

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class MyWritableComparator extends WritableComparator {

        protected MyWritableComparator() {
            // true: create key instances so that compare() receives deserialized objects.
            super(IntWritable.class, true);
        }

        // Orders keys by their remainder modulo 5; keys with the same remainder
        // compare as equal (0), which the grouping comparator relies on.
        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable key1, WritableComparable key2) {
            IntWritable x = (IntWritable) key1;
            IntWritable y = (IntWritable) key2;
            return Integer.compare(x.get() % 5, y.get() % 5);
        }

    }

    // Set the WritableComparator in the main method
    job.setSortComparatorClass(MyWritableComparator.class);
    job.setGroupingComparatorClass(MyWritableComparator.class);
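    A comparator used for grouping has to return 0 for keys that belong together; one that only ever returns 1 or -1 can never form a group. The plain-Java sketch below shows a modulo-5 comparator on ordinary Integer values; Mod5ComparatorSketch and its members are hypothetical and do not use Hadoop types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java counterpart of a modulo-5 key comparator.
class Mod5ComparatorSketch {

    // Returns a negative number, 0, or a positive number; 0 means "same group".
    static final Comparator<Integer> BY_MOD_5 = (x, y) -> Integer.compare(x % 5, y % 5);

    static List<Integer> sortByMod5(List<Integer> input) {
        List<Integer> sorted = new ArrayList<>(input);
        sorted.sort(BY_MOD_5); // List.sort is stable, so ties keep their input order
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(BY_MOD_5.compare(7, 12));                     // prints 0 (both have remainder 2)
        System.out.println(sortByMod5(Arrays.asList(7, 10, 3, 12, 5))); // prints [10, 5, 7, 12, 3]
    }
}
```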

    Example: join

    1. Data preparation (one-to-many relationship)

        a) Student information

    Jenny,00001
    Hardy,00002
    Bradley,00003

        b) Course enrollment information

    00001,Chinese
    00001,Math
    00002,Music
    00002,Math
    00003,Physic

    2. Code

        a) Mapper class

    package mr.join;

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        public static final String LEFT_PATH = "student_info";

        public static final String RIGHT_PATH = "student_class_info";

        // Prefixes that mark which table a value came from.
        public static final String LEFT_TABLE_FLAG = "lu0001";

        public static final String RIGHT_TABLE_FLAG = "ru0001";

        private Text outKey = new Text();

        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // The path of the input file tells which table the record belongs to.
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            String tableFlag = null;
            String joinKey = null;
            String joinValue = null;

            if (filePath.contains(LEFT_PATH)) {
                // Student record: name,id
                tableFlag = LEFT_TABLE_FLAG;
                String[] values = value.toString().split(",");
                joinKey = values[1];
                joinValue = values[0];

            } else if (filePath.contains(RIGHT_PATH)) {
                // Enrollment record: id,course
                tableFlag = RIGHT_TABLE_FLAG;
                String[] values = value.toString().split(",");
                joinKey = values[0];
                joinValue = values[1];
            } else {
                return;
            }

            // Emit (student id, flag + value) so both tables meet in the same reduce call.
            outKey.set(joinKey);
            outValue.set(tableFlag + joinValue);
            context.write(outKey, outValue);
        }

    }

        b) Reducer class

    package mr.join;

    import static mr.join.JoinMapper.LEFT_TABLE_FLAG;
    import static mr.join.JoinMapper.RIGHT_TABLE_FLAG;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class JoinReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            List<String> studentClassNames = new ArrayList<>();
            String studentName = null;

            // Separate the values of this student id by source table.
            while (iterator.hasNext()) {
                String value = iterator.next().toString();
                if (value.startsWith(LEFT_TABLE_FLAG)) {
                    studentName = value.substring(LEFT_TABLE_FLAG.length());
                } else if (value.startsWith(RIGHT_TABLE_FLAG)) {
                    studentClassNames.add(value.substring(RIGHT_TABLE_FLAG.length()));
                }
            }
            // Cartesian product
            for (String studentClassName : studentClassNames) {
                result.set(studentName + "," + studentClassName);
                context.write(result, NullWritable.get());
            }
        }

    }

        c) main method

    package mr.join;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Join {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Join");
            job.setJarByClass(Join.class);
            job.setMapperClass(JoinMapper.class);
            // The map output types (Text, Text) differ from the final output types,
            // so they are declared explicitly.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(JoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path("/test/join/student_info"));
            FileInputFormat.addInputPath(job, new Path("/test/join/student_class_info"));
            FileOutputFormat.setOutputPath(job, new Path("/test/join/result"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

        d) Run command

    hadoop jar join.jar mr.join.Join

        e) Result

    Jenny,Math
    Jenny,Chinese
    Hardy,Math
    Hardy,Music
    Bradley,Physic
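    The reduce-side join can be replayed in memory on the sample data to see why tagging the values by source table is enough. This is a plain-Java sketch, not the job above: ReduceSideJoinSketch and the "L:" / "R:" tags are hypothetical, a TreeMap stands in for the shuffle, and records without a matching student are dropped (an inner join). The order of courses within one student follows the input order here, whereas in a real job the order of values within a key is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory replay of a reduce-side join; not the Hadoop job itself.
class ReduceSideJoinSketch {

    static List<String> join(List<String> students, List<String> enrollments) {
        // "Map + shuffle": tag every value with its source and group by student id.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : students) {            // name,id
            String[] fields = line.split(",");
            grouped.computeIfAbsent(fields[1], k -> new ArrayList<>()).add("L:" + fields[0]);
        }
        for (String line : enrollments) {         // id,course
            String[] fields = line.split(",");
            grouped.computeIfAbsent(fields[0], k -> new ArrayList<>()).add("R:" + fields[1]);
        }
        // "Reduce": for each id, pair the student name with every course.
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            String name = null;
            List<String> courses = new ArrayList<>();
            for (String value : values) {
                if (value.startsWith("L:")) {
                    name = value.substring(2);
                } else {
                    courses.add(value.substring(2));
                }
            }
            if (name == null) {
                continue; // no student record for this id
            }
            for (String course : courses) {
                out.add(name + "," + course);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> students = Arrays.asList("Jenny,00001", "Hardy,00002", "Bradley,00003");
        List<String> enrollments = Arrays.asList(
                "00001,Chinese", "00001,Math", "00002,Music", "00002,Math", "00003,Physic");
        for (String row : join(students, enrollments)) {
            System.out.println(row);
        }
    }
}
```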

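    Example: secondary sort

    1. Secondary sort: the key consists of two fields; records are sorted by the 1st field first and, within that order, by the 2nd field. This example sorts by the 1st field ascending and then by the 2nd field descending, like SQL "order by column1 asc, column2 desc".

    2. Principle

        a) Late in the map phase: the class set with job.setPartitionerClass() first partitions the output of the map function; within each partition, the class set with job.setSortComparatorClass() then sorts the keys. If job.setSortComparatorClass() was not called, the key's compareTo() method is used for sorting.

        b) reduce phase: the class set with job.setSortComparatorClass() first sorts the keys; the value iterator for each key is then built, and the class set with job.setGroupingComparatorClass() puts the values of keys that compare as equal into the same value iterator.

    3. Data preparation (column1, column2, column3)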

    4,3,h
    4,2,g
    4,1,e
    3,4,b
    2,7,c
    2,3,a
    3,1,f
    3,3,j
    3,2,i
    3,3,d

    4. Code

        a) Map key

    package mr.secondarysort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class CompositeKey implements WritableComparable<CompositeKey> {

        private Text column1 = new Text();

        private Text column2 = new Text();

        @Override
        public void write(DataOutput out) throws IOException {
            column1.write(out);
            column2.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            column1.readFields(in);
            column2.readFields(in);
        }

        // Used for sorting the circular buffer
        @Override
        public int compareTo(CompositeKey o) {
            // Sorts by the 1st field only
            return column1.compareTo(o.column1);
        }

        public Text getColumn1() {
            return column1;
        }

        public Text getColumn2() {
            return column2;
        }

        public void set(String column1, String column2) {
            this.column1.set(column1);
            this.column2.set(column2);
        }

    }

        b) Mapper class

    package mr.secondarysort;

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

        private CompositeKey outKey = new CompositeKey();

        private Text outValue = new Text();

        // Emits ((column1, column2), whole line) so that the framework sorts on both fields.
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, CompositeKey, Text>.Context context)
                throws IOException, InterruptedException {
            String valueString = value.toString();
            String[] columns = valueString.split(",");
            outKey.set(columns[0], columns[1]);
            outValue.set(valueString);
            context.write(outKey, outValue);
        }

    }

        c) Partitioner class

    package mr.secondarysort;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class SecondarySortPartitioner extends Partitioner<CompositeKey, Text> {

        // Partitions by the 1st field only, so all records with the same column1
        // reach the same Reducer.
        @Override
        public int getPartition(CompositeKey key, Text value, int numReduceTasks) {
            return (key.getColumn1().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }

    }

        d) SortComparator class

    package mr.secondarysort;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Used for the secondary sort
    public class SortComparator extends WritableComparator {

        protected SortComparator() {
            super(CompositeKey.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable key1, WritableComparable key2) {
            // Sort by both fields
            CompositeKey compositeKey1 = (CompositeKey) key1;
            CompositeKey compositeKey2 = (CompositeKey) key2;
            int result = compositeKey1.getColumn1().compareTo(compositeKey2.getColumn1());
            if (result == 0) {
                // 2nd field in descending order
                result = -compositeKey1.getColumn2().compareTo(compositeKey2.getColumn2());
            }
            return result;
        }

    }

        e) GroupingComparator class

    package mr.secondarysort;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Used for grouping the values
    public class GroupingComparator extends WritableComparator {

        protected GroupingComparator() {
            super(CompositeKey.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable key1, WritableComparable key2) {
            // Group by the 1st field
            CompositeKey compositeKey1 = (CompositeKey) key1;
            CompositeKey compositeKey2 = (CompositeKey) key2;
            return compositeKey1.getColumn1().compareTo(compositeKey2.getColumn1());
        }

    }

        f) Reducer class

    package mr.secondarysort;

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SecondarySortReducer extends Reducer<CompositeKey, Text, Text, NullWritable> {

        private Text result = new Text();

        // The values arrive already ordered by the sort comparator; write them out as they come.
        @Override
        protected void reduce(CompositeKey key, Iterable<Text> values,
                Reducer<CompositeKey, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                result.set(iterator.next());
                context.write(result, NullWritable.get());
            }
        }

    }

        g) main method

    package mr.secondarysort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SecondarySort {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "SecondarySort");
            job.setJarByClass(SecondarySort.class);
            job.setMapperClass(SecondarySortMapper.class);
            job.setMapOutputKeyClass(CompositeKey.class);
            job.setMapOutputValueClass(Text.class);
            job.setPartitionerClass(SecondarySortPartitioner.class);
            job.setSortComparatorClass(SortComparator.class);
            job.setGroupingComparatorClass(GroupingComparator.class);
            job.setReducerClass(SecondarySortReducer.class);
            job.setOutputKeyClass(Text.class);
            // The reducer writes NullWritable values.
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path("/test/secondarysort/in"));
            FileOutputFormat.setOutputPath(job, new Path("/test/secondarysort/out"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }

    5. Run command

    hadoop jar secondarysort.jar mr.secondarysort.SecondarySort

    6. Result

    2,7,c
    2,3,a
    3,4,b
    3,3,d
    3,3,j
    3,2,i
    3,1,f
    4,3,h
    4,2,g
    4,1,e
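    The interplay of the sort comparator and the grouping comparator can be replayed in memory on the sample data. The plain-Java sketch below is only an illustration: SecondarySortSketch and its methods are hypothetical, fields are compared as strings (as with Text), and records that tie on both fields, such as 3,3,d and 3,3,j, may appear in either order in a real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory replay of the secondary sort; not the Hadoop job itself.
class SecondarySortSketch {

    // Sort comparator: column1 ascending, then column2 descending.
    static List<String> sort(List<String> lines) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort((a, b) -> {
            String[] x = a.split(",");
            String[] y = b.split(",");
            int byColumn1 = x[0].compareTo(y[0]);
            return byColumn1 != 0 ? byColumn1 : y[1].compareTo(x[1]);
        });
        return sorted;
    }

    // Grouping comparator: consecutive records with the same column1 form one group,
    // that is, one reduce call.
    static int countGroups(List<String> sorted) {
        int groups = 0;
        String previous = null;
        for (String line : sorted) {
            String column1 = line.split(",")[0];
            if (!column1.equals(previous)) {
                groups++;
                previous = column1;
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("4,3,h", "4,2,g", "4,1,e", "3,4,b", "2,7,c",
                "2,3,a", "3,1,f", "3,3,j", "3,2,i", "3,3,d");
        List<String> sorted = sort(input);
        for (String line : sorted) {
            System.out.println(line);
        }
        System.out.println(countGroups(sorted)); // prints 3
    }
}
```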

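    Author: netoxi
    Source: http://www.cnblogs.com/netoxi
    Copyright of this article is shared by the author and cnblogs (博客园). Reposting is welcome; unless agreed otherwise, this notice must be kept and a link to the original article must be placed prominently on the page. Corrections and discussion are welcome.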

  • Original article: https://www.cnblogs.com/netoxi/p/7223090.html