zoukankan      html  css  js  c++  java
  • Hadoop:The Definitive Guid 总结 Chapter 7 MapReduce的类型与格式

    MapReduce数据处理模型非常简单:map和reduce函数的输入和输出是键/值对(key/value pair)

    1.MapReduce的类型

    Hadoop的MapReduce一般遵循如下常规格式:

    map(K1, V1) –> list (K2, V2)              

    combine(K2, list(V2)) –> list(K2, V2)

    partition(K2, V2) –> integer              

    reduce(K2, list(V2)) –> list(K3, V3) 

    map:对数据进行抽去过滤数据,组织key/value对等操作.

    combine:为了减少reduce的输入和Hadoop内部网络数据传输负载,需要在map端对输出进行预处理,类似reduce。combine不一定适用任何情况,选用

    partition:将中间键值对划分到一个reduce分区,返回分区索引号。实际上,分区单独由键决定(值是被忽略的),分区内的键会排序,相同的键的所有值会合成一个组(list(V2))

    reduce:每个reduce会处理具有某些特性的键,每个键上都有值的序列,是通过对所有map输出的值进行统计得来的,reduce根据所有map传来的结果,最后进行统计合并操作,并输出结果。

    注:combine与reduce一样时,K3与K2相同,V3与V2相同。

    MapReduce的Java API代码:一般Combine函数与reduce的一样的

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
            // ...
        }
    
        protected void map(KEYIN key, VALUEIN value, Context context)
                throws IOException, InterruptedException {
            // ...
        }
    }
    
    public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public class Context extends
                ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
            // ...
        }
    
        protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
                throws IOException, InterruptedException {
            // ...
        }
    }

    用于处理中间数据的partition函数 API代码:

    public abstract class Partitioner<KEY, VALUE> {
        public abstract int getPartition(KEY key, VALUE value, int numPartitions);
    }

    关于默认的MapReduce作业

    默认的map是Mapper,是一个泛型类型,简单的将所有输入的值和键原封不动的写到输出中,即输入输出类型相同。

    Mapper的实现

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        protected void map(KEYIN key, VALUEIN value, Context context)
                throws IOException, InterruptedException {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }

    默认的 partitioner是HashPartitioner,对每天记录的键进行哈希操作以决定该记录属于那个分区让reduce处理,每个分区对应一个reducer任务,所以分区数等于Job的reduce的个数

    HashPartitioner的实现

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    默认的reduce函数Reducer,也是泛型类型,简单的将所有输入写到输出中。记录在发给reduce之前,会被排序,一般是按照键值的大小排序。reduce的默认输出格式是TextOutputFormat----它将键和值转换成字符串并用Tab进行分割,然后一条记录一行地进行输出。

    Reducer 的实现

    public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) 
                throws IOException, InterruptedException {
            for (VALUEIN value: values) {
                context.write((KEYOUT) key, (VALUEOUT) value);
            }
        }
    }

    选择reduce的个数:一般集群的总共的slot个数等于node的数目乘以每个node上的slot数目,而reduce的数目一般设置为比总slot数目少一些

    默认MapReduce函数实例程序

    public class MinimalMapReduceWithDefaults extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
                }
            
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(Mapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);
            job.setReducerClass(Reducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            return job.waitForCompletion(true) ? 0 : 1;
            }
        
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
            System.exit(exitCode);
            }
    }


     

    关于默认的stream作业(Stream概念见第二章)

    stream最简单的形式:

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
         -input input/ncdc/sample.txt \
         -output output \
         -mapper /bin/cat

    注意,必须提供一个mappe:默认的identity mapp不能在stream工作

    这里再给出更多设置的stream形式,其他详见权威指南:

    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
        -input input/ncdc/sample.txt \
        -output output \
        -inputformat org.apache.hadoop.mapred.TextInputFormat \
        -mapper /bin/cat \
        -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
        -numReduceTasks 1 \
        -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
        -outputformat org.apache.hadoop.mapred.TextOutputFormat

    关于Streaming中的键和值

    Streaming用分隔符用于通过标准输入把key/value对转换为一串比特发送到map或reduce函数

    默认时,用Tab分隔符,也可以根据需要,用配置的分隔符来进行分割,例如:来自输出的键可以由一条记录的前n个字段组成(stream.num.map.output.key.fields或stream.num.reduce.output.key.fields定义),剩下的就是值,eg,输出的是"a,b,c",n=2,则键为"a、b",而值是"c"。Map和Reduce的分隔符是相互独立进行配置的,参见下图

     

    2.输入格式

    1).输入分片与记录

    一个输入分片(input split)是由单个map处理的输入块,即每一个map只处理一个输入分片每个分片被划分为若干个记录(records),每条记录就是一个key/value对,map一个接一个的处理每条记录,输入分片和记录都是逻辑的,不必将他们对应到文件上。注意,一个分片不包含数据本身,而是指向数据的引用和。

    输入分片在Java中被表示为InputSplit借口

    public abstract class InputSplit {
        public abstract long getLength() throws IOException, InterruptedException;
        public abstract String[] getLocations() throws IOException,InterruptedException;
    }

    InputFormat负责创建输入分片并将它们分割成记录,下面就是原型用法:

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException;
    
        public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    }

    客户端通过调用getSpilts()方法获得分片数目,在TaskTracker或NM上,MapTask会将分片信息传给InputFormat的createRecordReader()方法,进而这个方法来获得这个分片的RecordReader,RecordReader基本就是记录上的迭代器,MapTask用一个RecordReader来生成记录的key/value对,然后再传递给map函数,如下代码

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        cleanup(context);
    }

    此处的Context实现接口MapContextImpl,并且封装调用RecordReader下面的经过实现的方法,包括nextKeyValue,getCurrentKey,getCurrentValue。nextKeyValue()方法反复被调用用来为mapper生成key/value对,然后把这些key/value传递给map()方法,直到独到stream的末尾,此时nextKeyValue返回false

    A.FileInputFormat类

    FileInputFormat是所有使用文件为数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现,把分片割成记录的作业由其子类来完成。

    下图为InputFormat类的层次结构:

    B.FileInputFormat类输入路径

    FileInputFormat提供四种静态方法来设定Job的输入路径,其中下面的addInputPath()方法addInputPaths()方法可以将一个或多个路径加入路径列表,setInputPaths()方法一次设定完整的路径列表(可以替换前面所设路径)

    public static void addInputPath(Job job, Path path);
    public static void addInputPaths(Job job, String commaSeparatedPaths);
    public static void setInputPaths(Job job, Path... inputPaths);
    public static void setInputPaths(Job job, String commaSeparatedPaths);

    如果需要排除特定文件,可以使用FileInputFormat的setInputPathFilter()设置一个过滤器:

    public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);


     

    C.FileInputFormat类的输入分片

    FileInputFormat类一般之分割超过HDFS块大小的文件,通常分片与HDFS块大小一样,然后分片大小也可以改变的,下面展示了控制分片大小的属性:

    这里数据存储在HDFS上的话,输入分片大小不宜设置比HDFS块更大,原因这样会增加对MapTask来说不是本地文件的块数。

    最大的分片的大小默认是Java long类型的表示的最大值,这样设置的效果:当它的值被设置成小于块大小时,将强制分片比快小(?)

    分片大小公式:

    默认情况

     max(minimumSize, min(maximumSize, blockSize))

    下图距离说明如何控制分片的大小

     minimumSize < blockSize < maximumSize

    D.小文件与CombineFileInputFormat

    CombineFileInputFormat是针对小文件设计的,CombineFileInputFormat会把多个文件打包到一个分片中一边每个mapper可以处理更多的数据;减少大量小文件的另一种方法可以使用SequenceFile将这些小文件合并成一个或者多个大文件。

    CombineFileInputFormat不仅对于处理小文件实际上对于处理大文件也有好处,本质上,CombineFileInputFormat使map操作中处理的数据量与HDFS中文件的块大小之间的耦合度降低了

    CombineFileInputFormat是一个抽象类,没有提供实体类,所以需要实现一个CombineFileInputFormat具体类和getRecordReader()方法(旧的接口是这个方法,新的接口InputFormat中则是createRecordReader(),似乎3rd权威指南在这个地方有些错误)

    E.避免切分

    有些应用程序可能不希望文件被切分,而是用一个mapper完整处理每一个输入文件,有两种方法可以保证输入文件不被切分。第一种方法就是增加最小分片大小,将它设置成大于要处理的最大文件大小,eg:把这个值设置为long.MAXVALUE即可。第二种方法就是使用FileInputFormat具体子类,并且重载isSplitable()方法,把其返回值设置为false,如下所示

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    public class NonSplittableTextInputFormat extends TextInputFormat {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    }

    F.mapper中的文件信息

    处理文件输入分片的mapper可以从文件配置对象的某些特定属性中读入输入分片的有关信息,这可以通过在mapper实现中实现configure()方法来获取作业配置对象JobConf,下图显示了文件输入分片的属性

    G.把整个文件作为一条记录处理

    有时,mapper需要访问问一个文件中的全部内容。即使不分割文件,仍然需要一个RecordReader来读取文件内容为record的值,下面给出实现这个功能的完整程序,详细解释见权威指南

    InputFormat的实现类WholeFileInputFormat

    public class WholeFileInputFormat extends
            FileInputFormat<NullWritable, BytesWritable> {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException,
                InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }

    WholeFileRecordReader的实现:WholeFileInputFormat使用RecordReader将整个文件读为一条记录

    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException,
                InterruptedException {
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException,
                InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }
    
        @Override
        public void close() throws IOException {
            // do nothing
        }
    }

    将若干个小文件打包成顺序文件的MapReduce程序

    public class SmallFilesToSequenceFileConverter extends Configured implements
            Tool {
        static class SequenceFileMapper extends
                Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
            private Text filenameKey;
    
            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                InputSplit split = context.getInputSplit();
                Path path = ((FileSplit) split).getPath();
                filenameKey = new Text(path.toString());
            }
    
            @Override
            protected void map(NullWritable key, BytesWritable value,
                    Context context) throws IOException, InterruptedException {
                context.write(filenameKey, value);
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
            }
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            job.setMapperClass(SequenceFileMapper.class);
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),
                    args);
            System.exit(exitCode);
        }
    }

    由于输入格式为WholeFileInputFormat,所以mapper只需要找到文件输入分片的文件名。

    2).文本输入

    A.TextInputFormat

    TextInputFormat是默认的InputFormat。每条记录是一行输入。key是LongWritable类型,存储该行在整个文件中的字节偏移量,value是这行的内容,不包括任何终止符(换行符和回车符),它是Text类型

    如下例

    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

    每条记录表示以下key/value对

    (0, On the top of the Crumpetty Tree)
    (33, The Quangle Wangle sat,)
    (57, But his face you could not see,)
    (89, On account of his Beaver Hat.)

    上面的key明显不是行号,因为每个分片需要单独处理的原因,行号只是一个片内的顺序标记,所以在分片内在的行号是可以的,而在文件中是很难办到的。然而为了使key是唯一的,我们可以利用已知的上一个分片的大小,计算出当前位置在整个文件中的偏移量(不是行号),这样加上文件名,就能确定唯一key,如果行固定长,就可以算出行号

    PS:因为FileInputFormat定义的是逻辑结构,不能匹配HDFS块大小,所以TextFileInputFormat的以行为单位的逻辑记录中,很有可能某一行是跨文件块存储的,如下所示

    B.KeyValueTextInputFormat

    对下面的文本,KeyValueTextInputFormat比较适合处理,其中可以通过mapreduce.input.keyvaluelinerecordreader.key.value.separator属性设置指定分隔符,默认值为制表符,以下指定"→"为分隔符

    line1→On the top of the Crumpetty Tree
    line2→The Quangle Wangle sat,
    line3→But his face you could not see,
    line4→On account of his Beaver Hat.

    C.NLineInputFormat

    如果希望mapper收到固定行数的输入,需要使用NLineInputFormat作为InputFormat。与TextInputFormat一样,key是文件中行的字节偏移量,值是行本身。

    N是每个mapper收到的输入行数,默认时N=1,每个mapper会正好收到一行输入,mapreduce.input.lineinputformat.linespermap属性控制N的值。以刚才的文本为例:

    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.

    例如,如果N=2,则每个输入分片包括两行。第一个mapper会收到前两行key/value对:

    (0, On the top of the Crumpetty Tree)
    (33, The Quangle Wangle sat,)

    另一个mapper则收到:

    (57, But his face you could not see,)
    (89, On account of his Beaver Hat.)

    3).二进制输入

    A.SequenceFileInputFormat

    如果要用顺序文件数据作为MapReduce的输入,应用SequenceFileInputFormat。key和value顺序文件,所以要保证map输入的类型匹配

    虽然从名称看不出来,但是SequenceFileInputFormat可以读MapFile和SequenceFile,如果在处理顺序文件时遇到目录,SequenceFileInputFormat类会认为值正在读MapFile,使用的是其数据文件,因此没有MapFileInputFormat类是自然的

    B.SequenceFileAsTextInputFormat和SequenceFileAsBinaryInputFormat

    两者均是SequenceFileInputFormat的变体,前者将顺序文件(其实就是SequenceFile)的key和value转成Text对象,后者获取顺序文件的key和value作为二进制对象

    4).多种输入

    对于不同格式,不同表示的文本文件输出的处理,可以用MultipleInputs类里处理,它允许为每条输入路径指定InputFormat和Mapper,例如,下满对Met Office和NCDC两种不同格式的气象数据放在一起进行处理:

    MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, MaxTemperatureMapper.class);
    MultipleInputs.addInputPath(job, metOfficeInputPath,TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);

    两个数据源的行格式不同,所以使用了两个不同的mapper

    MultipleInputs类有一个重载版本的addInputPath()方法:

    public static void addInputPath(Job job, Path path,Class<? extends InputFormat> inputFormatClass)

    在有多种输入格式只有一个mapper时候(调用Job的setMapperClass()方法),这个方法会很有用

    另外还有一个用于食用JDBC从关系数据库中读取数据的输入格式--DBInputFormat(参见权威指南)

    3.输出格式

    OutputFormat类的层次结构:

    1).文本输出

    默认输出格式是TextOutputFormat,它本每条记录写成文本行,key/value任意,这里key和value可以用制表符分割,用mapreduce.output.textoutputformat.separator书信可以改变制表符,与TextOutputFormat对应的输入格式是KeyValueTextInputFormat

    可以使用NullWritable来省略输出的key和value。

    2).二进制输出

    A.SequenceFileOutputFormat

    SequenceFileOutputFormat将它的输出写为一个顺序文件,因为它的格式紧凑,很容易被压缩,所以易于作为MapReduce的输入

    B.SequenceFileAsBinaryOutputFormat和MapFileOutputFormat

    前者把key/value对作为二进制格式写到一个SequenceFile容器中,后者把MapFile作为输出,MapFile中的key必需顺序添加,所以必须确保reducer输出的key已经排好序。

    3).多个输出--MultipleOutputs类

    有时可能需要对输出的把文件名进行控制,或让每个reducer输出多个文件。MapReduce为此提供了库:MultipleOutputs类

    MultipleOutputs允许我们依据输出的key和value或者二进制string命名输出的文件名,如果为map输出的文件,则文件名的格式为"name-m-nnnnn", 如果reduce输出的文件,则文件名的格式为"name-r-nnnnn",其中"name"由MapReduce程序决定,”nnnnn“为part从0开始的整数编号,part编号确保从不同分区的输出(mapper或reducer)生成的文件名字不会冲突

    下面的程序使用MultipleOutputs类将整个数据集切分为以气象站ID命名的文件

    public class PartitionByStationUsingMultipleOutputs extends Configured
            implements Tool {
        static class StationMapper extends Mapper<LongWritable, Text, Text, Text> {
            private NcdcRecordParser parser = new NcdcRecordParser();
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
    
                parser.parse(value);
                context.write(new Text(parser.getStationId()), value);
            }
        }
    
        static class MultipleOutputsReducer extends
                Reducer<Text, Text, NullWritable, Text> {
            private MultipleOutputs<NullWritable, Text> multipleOutputs;
    
            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
            }
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text value : values) {
                    multipleOutputs
                            .write(NullWritable.get(), value, key.toString());
                }
            }
    
            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                multipleOutputs.close();
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
            }
            job.setMapperClass(StationMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setReducerClass(MultipleOutputsReducer.class);
            job.setOutputKeyClass(NullWritable.class);
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(
                    new PartitionByStationUsingMultipleOutputs(), args);
            System.exit(exitCode);
        }
    }

    程序解释略,运行后有以下结果:

    output/010010-99999-r-00027
    output/010050-99999-r-00013
    output/010100-99999-r-00015
    output/010280-99999-r-00014
    output/010550-99999-r-00000
    output/010980-99999-r-00011
    output/011060-99999-r-00025
    output/012030-99999-r-00029
    output/012350-99999-r-00018
    output/012620-99999-r-00004

    我们还可以适当 改变MultipleOutputs类中的write方法中的路径名参数,来得到我们想要输出文件名(例如:029070-99999/1901/part-r-00000),有下面程序:

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            parser.parse(value);
            String basePath = String.format("%s/%s/part",
                    parser.getStationId(), parser.getYear());
            multipleOutputs.write(NullWritable.get(), value, basePath);
        }
    }

    4).延时输出

    有些文件应用倾向于不创建空文件,此时就可以利用LazyOutputFormat,它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件,要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法.

  • 相关阅读:
    第六次实训作业
    事件处理程序
    第四次实训作业
    I/O流
    课程总结
    求和计算器
    常用类的课后作业
    窗口实训1
    课后练习----实现窗口的切换
    第五次实训作业继承
  • 原文地址:https://www.cnblogs.com/biyeymyhjob/p/2633608.html
Copyright © 2011-2022 走看看