zoukankan      html  css  js  c++  java
  • Hadoop优化&新特性

    1 Hadoop数据压缩

    1.1 概述

      压缩技术能够有效减少底层存储系统(HDFS)读写字节数,压缩提高了网络带宽和磁盘空间效率。在运行MR程序时,I/O操作、网络传输、Shuffle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。

      鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常重要,可以在任意MapReduce阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。

      压缩是提升Hadoop运行效率的一种优化策略,通过对Mapper、Reduce运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。

      注意:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担,所以,压缩特征运用得当可以提升性能,反之亦然。

      压缩原则:运算密集型的job少用压缩,IO密集型的job多用压缩

    1.2 MR支持的压缩编码

    压缩格式

    hadoop自带?

    算法

    文件扩展名

    是否可切分

    换成压缩格式后,原来的程序是否需要修改

    DEFLATE

    是,直接使用

    DEFLATE

    .deflate

    和文本处理一样,不需要修改

    Gzip

    是,直接使用

    DEFLATE

    .gz

    和文本处理一样,不需要修改

    bzip2

    是,直接使用

    bzip2

    .bz2

    和文本处理一样,不需要修改

    LZO

    否,需要安装

    LZO

    .lzo

    需要建索引,还需要指定输入格式

    Snappy

    否,需要安装

    Snappy

    .snappy

    和文本处理一样,不需要修改

      为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。

    压缩格式

    对应的编码/解码器

    DEFLATE

    org.apache.hadoop.io.compress.DefaultCodec

    gzip

    org.apache.hadoop.io.compress.GzipCodec

    bzip2

    org.apache.hadoop.io.compress.BZip2Codec

    LZO

    com.hadoop.compression.lzo.LzopCodec

    Snappy

    org.apache.hadoop.io.compress.SnappyCodec

      压缩性能的比较

    压缩算法

    原始文件大小

    压缩文件大小

    压缩速度

    解压速度

    gzip

    8.3GB

    1.8GB

    17.5MB/s

    58MB/s

    bzip2

    8.3GB

    1.1GB

    2.4MB/s

    9.5MB/s

    LZO

    8.3GB

    2.9GB

    49.3MB/s

    74.6MB/s

      Snappy官网:http://google.github.io/snappy/

      On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

    1.3 压缩方式选择

    1.3.1 Gzip压缩

      优点:压缩率高,而且压缩/解压缩速度也比较快;Hadoop本身支持,在应用中处理gzip格式的文件就跟直接处理文本一样;大部分Linux系统都自带gzip命令,使用方便

      缺点:不支持Split

      应用场景:当每个文件压缩之后在130M以内的(一个块大小内),都可以考虑gzip压缩格式。比如将一天或者一个小时的日志压缩成一个gzip文件

    1.3.2 Bzip2压缩

      优点:支持Split,具有很高的压缩率,比gzip压缩率高,Hadoop本身自带,使用方便

      缺点:压缩/解压速度慢

      应用场景:适合对速度要求不高,但需要较高压缩率时;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况

    1.3.3 Lzo压缩

      优点:压缩/解压速度也比较快,合理的压缩率;支持Split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装 lzop 命令,使用方便

      缺点:压缩率比gzip要低一些;Hadoop本身不支持,需要安装;在应用中对 lzo 格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为 lzo 格式)

      应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo 优点越明显

    1.3.4 Snappy压缩

      优点:高速压缩速度和合理的的压缩速度

      缺点:不支持Split;压缩率比gzip要低;Hadoop本身不支持,需要安装

      应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入

    1.4 压缩位置选择

      压缩可以在MapReduce作用的任意阶段启用。

    1.5 压缩参数配置

      要在Hadoop中启用压缩,可以配置如下参数:

    参数

    默认值

    阶段

    建议

    io.compression.codecs   

    (在core-site.xml中配置)

    org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec

     

    输入压缩

    Hadoop使用文件扩展名判断是否支持某种编解码器

    mapreduce.map.output.compress(在mapred-site.xml中配置)

    false

    mapper输出

    这个参数设为true启用压缩

    mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

    org.apache.hadoop.io.compress.DefaultCodec

    mapper输出

    企业多使用LZO或Snappy编解码器在此阶段压缩数据

    mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

    false

    reducer输出

    这个参数设为true启用压缩

    mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

    org.apache.hadoop.io.compress.DefaultCodec

    reducer输出

    使用标准工具或者编解码器,如gzip和bzip2

    mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)

    RECORD

    reducer输出

    SequenceFile输出使用的压缩类型:NONE和BLOCK

    1.6 压缩实操案例

    1.6.1 数据流的压缩和解压缩

      测试一下如下压缩方式

    DEFLATE

    org.apache.hadoop.io.compress.DefaultCodec

    gzip

    org.apache.hadoop.io.compress.GzipCodec

    bzip2

    org.apache.hadoop.io.compress.BZip2Codec

      1)新建TestCompress.java

    package com.yuange.mapreduce.compress;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.junit.Test;
    
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    
    public class TestCompress {
    
        /*
        * 压缩
        * */
        @Test
        public void test() throws IOException {
            //创建输入流
            FileInputStream fis = new FileInputStream("D:\\io\\input2\\hello.txt");
            /*
            * newInstance(Class<T> theClass, Configuration conf)
            * 获取压缩类型
            * */
            GzipCodec gzipCodec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());
            //创建输出流
            CompressionOutputStream outputStream = gzipCodec.createOutputStream(new FileOutputStream("D:\\io\\input2\\hello.txt" +
                    gzipCodec.getDefaultExtension()));
            //拷贝
            /*
            * copyBytes(InputStream in, OutputStream out,
                                   int buffSize, boolean close)
            * */
            IOUtils.copyBytes(fis,outputStream,1024,true);
        }
    
        /*
            解压缩
        */
        @Test
        public void test2() throws IOException {
            //创建解压缩对象
            GzipCodec gzipCodec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());
            /*
            * createInputStream(InputStream in)
            * 创建输入流
             * */
            CompressionInputStream inputStream = gzipCodec.createInputStream(
                    new FileInputStream("D:\\io\\input2\\hello.txt.gz"));
            //创建输出流
            FileOutputStream fos = new FileOutputStream("D:\\io\\input3\\aaa.txt");
            //拷贝
            IOUtils.copyBytes(inputStream,fos,1024,true);
        }
    }

      2)新建TestCompress.java

    package com.yuange.mapreduce.compress;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    public class TestCompress2 {
    
        public static void main(String[] args) throws Exception {
            compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
    //        decompress("e:/hello.txt.bz2");
        }
    
        // 1、压缩
        private static void compress(String filename, String method) throws Exception {
            // (1)获取输入流
            FileInputStream fis = new FileInputStream(new File(filename));
            Class codecClass = Class.forName(method);
            CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
            // (2)获取输出流
            FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
            CompressionOutputStream cos = codec.createOutputStream(fos);
            // (3)流的对拷
            IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
            // (4)关闭资源
            cos.close();
            fos.close();
            fis.close();
        }
    
        // 2、解压缩
        private static void decompress(String filename) throws FileNotFoundException, IOException {
            // (0)校验是否能解压缩
            CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
            CompressionCodec codec = factory.getCodec(new Path(filename));
            if (codec == null) {
                System.out.println("cannot find codec for file " + filename);
                return;
            }
            // (1)获取输入流
            CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
            // (2)获取输出流
            FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
            // (3)流的对拷
            IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
            // (4)关闭资源
            cis.close();
            fos.close();
        }
    }

    1.6.2 Map输出端采用压缩

      即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置

      1)给大家提供的Hadoop源码支持的压缩格式有:BZip2Codec DefaultCodec

      2)新建WordCountDriver.java

    package com.yuange.mapreduce.compress;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
    
            // 开启map端输出压缩
            configuration.setBoolean("mapreduce.map.output.compress", true);
            // 设置map端输出压缩方式
            configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(WordCountDriver.class);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 1 : 0);
        }
    }

      3Mapper保持不变

    package com.yuange.mapreduce.compress;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
        Text k = new Text();
        IntWritable v = new IntWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
            // 1 获取一行
            String line = value.toString();
            // 2 切割
            String[] words = line.split(" ");
            // 3 循环写出
            for(String word:words){
                k.set(word);
                context.write(k, v);
            }
        }
    }

      4Reducer保持不变

    package com.yuange.mapreduce.compress;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        IntWritable v = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                              Context context) throws IOException, InterruptedException {
            int sum = 0;
            // 1 汇总
            for(IntWritable value:values){
                sum += value.get();
            }
            v.set(sum);
            // 2 输出
            context.write(key, v);
        }
    }

    1.6.3 Reduce输出端采用压缩

      基于WordCount案例处理

      1)新建WordCountDriver2.java

    package com.yuange.mapreduce.compress;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver2 {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
    
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(WordCountDriver.class);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 设置reduce端输出压缩开启
            FileOutputFormat.setCompressOutput(job, true);
    
            // 设置压缩的方式
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    //        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    //        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
    
            boolean result = job.waitForCompletion(true);
            System.exit(result?1:0);
        }
    }

      2MapperReducer保持不变

    2Hadoop企业优化

    2.1 MapReduce 跑的慢的原因

       MapReduce程序效率的瓶颈在于两点:

      1)计算机性能

        CPU、内存、磁盘健康、网络

      2)I/O

      (1)数据倾斜

      (2)Map和Reduce数设置不合理

      (3)Map运行时间太长,导致Reduce等待过久

      (4)小文件过多

      (5)大量的不可分块的超大文件

      (6)Split次数过多

      (7)Merge次数过多

    2.2 MapReduce优化方法

      MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

    2.2.1 数据输入

      (1)合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢

      (2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景

    2.2.2 Map阶段

      (1)减少溢写(Spill)次数:通过调整mapreduce.io.sort.mb以及mapreduce.map.sort.spill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO

      (2)减少合并(merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间

      (3)在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少IO

    2.2.3 Reduce阶段

      (1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少会导致Task等待,延长处理时间;太多会导致Map、Reduce任务竞争资源,造成处理超时等错误

      (2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间

      (3)规避使用Reduce:因为Reduce在用于连接数据集时会产生大量的网络消耗

      (4)合理设置Reduce端的Buffer:默认情况下,数据达到一个阈值时,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获取所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间有多次写磁盘-->读磁盘的过程。因此,可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销。mapreduce.reduce.input.buffer.percent,默认值为0.0,当值大于0时,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用,这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整

    2.2.4 I/O传输

      (1)采用数据压缩的方式,减少网络IO传输时间,安装Snappy和LZO压缩编码器

      (2)使用SequenceFile二进制文件

    2.2.5 数据倾斜问题

      1)数据倾斜现象

        数据频率倾斜--------某一个区域的数据量要远远大于其他区域

        数据大小倾斜--------部分记录的大小远远大于平均值

      2)减少数据倾斜的方法

        (1)抽样和范围分区

          可以通过对原始数据进行抽样得到的结果集来预设分区边界值

        (2)自定义分区

          基于输出键的背景知识进行自定义分区。如Map输出键的单词来源于一本书,且其中某几个专业词汇较多,那么就可以自定义分区将这些专业词汇发送给固定的一部分Reduce实例,而将其他的都发送给剩余的Reduce实例

        (3)Combine

          使用Combine可以大量地减少数据倾斜,在可能得情况下,Combine的目的就是聚合并精简数据

        (4)采用Map Join,尽量避免Reduce Join

    2.2.6 常用的调优参数

      1资源相关参数

      (1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)

    配置参数

    参数说明

    mapreduce.map.memory.mb

    一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。

    mapreduce.reduce.memory.mb

    一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。

    mapreduce.map.cpu.vcores

    每个MapTask可使用的最多cpu core数目,默认值: 1

    mapreduce.reduce.cpu.vcores

    每个ReduceTask可使用的最多cpu core数目,默认值: 1

    mapreduce.reduce.shuffle.parallelcopies

    每个Reduce去Map中取数据的并行数。默认值是5

    mapreduce.reduce.shuffle.merge.percent

    Buffer中的数据达到多少比例开始写入磁盘。默认值0.66

    mapreduce.reduce.shuffle.input.buffer.percent

    Buffer大小占Reduce可用内存的比例。默认值0.7

    mapreduce.reduce.input.buffer.percent

    指定多少比例的内存用来存放Buffer中的数据,默认值是0.0

      (2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

    配置参数

    参数说明

    yarn.scheduler.minimum-allocation-mb

    给应用程序Container分配的最小内存,默认值:1024

    yarn.scheduler.maximum-allocation-mb

    给应用程序Container分配的最大内存,默认值:8192

    yarn.scheduler.minimum-allocation-vcores

    每个Container申请的最小CPU核数,默认值:1

    yarn.scheduler.maximum-allocation-vcores

    每个Container申请的最大CPU核数,默认值:32

    yarn.nodemanager.resource.memory-mb

    给Containers分配的最大物理内存,默认值:8192

      (3Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

    配置参数

    参数说明

    mapreduce.task.io.sort.mb   

    Shuffle的环形缓冲区大小,默认100m

    mapreduce.map.sort.spill.percent   

    环形缓冲区溢出的阈值,默认80%

      2)容错相关参数(MapReduce性能优化)

    配置参数

    参数说明

    mapreduce.map.maxattempts

    每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    mapreduce.reduce.maxattempts

    每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    mapreduce.task.timeout

    Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

    2.3 HDFS文件优化方法

    2.3.1 HDFS小文件弊端

      HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

    2.3.2 HDFS小文件解决方案

      小文件的优化无非以下几种方式:

    • 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
    • 业务处理之前HDFS上使用MapReduce程序对小文件进行合并。
    • 在MapReduce处理时,可采用CombineTextInputFormat提高效率。

       解决方案:

        1)Hadoop Archive

          是一个高效地将小文件放入HDFS块中的文件存档工具,能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用

        2)Sequence File

          Sequence File由一系列的二进制key/Value组成,若key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件

        3)CombineFileInputFormat

          CombineFileInputFormat是一种新的InputFormat,用于将多个文件合并成一个单独的Split,另外,它会考虑数据的存储位置

        4)开启JVM重用

          对于大量小文件Job,开启JVM重用会减少45%的运行时间

          JVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map

          具体设置:mapreduce.job.jvm.numtasks值在10-20之间

    3 Hadoop新特性

    3.1 2.x新特性

    3.1.1 集群间数据拷贝

      1scp实现两个远程主机之间的文件复制

    scp -r hello.txt root@hadoop103:/user/atguigu/hello.txt // 推 push
    
    scp -r root@hadoop103:/user/atguigu/hello.txt  hello.txt // 拉 pull
    
    scp -r root@hadoop103:/user/atguigu/hello.txt root@hadoop104:/user/atguigu   //是通过本地主机中转实现两个远程主机的文件复制;如果在两个远程主机之间ssh没有配置的情况下可以使用该方式。

      2采用distcp命令实现两个Hadoop集群之间的递归数据复制

    hadoop distcp hdfs://hadoop102:8020/user/atguigu/hello.txt hdfs://hadoop105:8020/user/atguigu/hello.txt

    3.1.2 小文件存档

       1、HDFS存储小文件弊端

        每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效,大量小文件会消耗NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量和数据块的大小无关。如一个1MB的文件设置为128M的块存储,实际使用的是1MB的磁盘空间,而不是128MB

      2、解决存储小文件办法之一

        HDFS存档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的访问。具体来说,HDFS存档文件对内还是一个一个独立文件,对NameNode而言却是一个整体,减少了NameNode的内存

      3、实操

        1需要启动YARN进程

    start-yarn.sh

        2归档文件,把/input目录里面的所有文件归档成一个叫input.har的归档文件,并把归档后文件存储到/output路径下。

    hadoop archive -archiveName input.har -p  /input   /output

        3)查看归档

    hadoop fs -ls -r /output/input.har
    hadoop fs -ls -r har:///output/input.har

        4)解归档文件

    hadoop fs -cp har:///output/input.har/* /output2

    3.1.3 回收站

      开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份作用。

      1回收站参数设置及工作机制

    回收站

      2)启用回收站,修改core-site.xml,配置垃圾回收时间1分钟。

    <property>
      <name>fs.trash.interval</name>
      <value>1</value>
    </property>

      3)查看回收站,回收集群中的路径:/user/atguigu/.Trash/….

      4)修改访问垃圾回收站用户名称,进入垃圾回收站用户名称,默认是dr.who,修改为atguigu用户

    [core-site.xml]
    
    <property>
      <name>hadoop.http.staticuser.user</name>
      <value>atguigu</value>
    </property>

      5通过程序删除的文件不会经过回收站,需要调用moveToTrash()进入回收站

    Trash trash = New Trash(conf);
    trash.moveToTrash(path);

      6)恢复回收站数据

    hadoop fs -mv /user/atguigu/.Trash/Current/input /input

      7清空回收站

    hadoop fs -expunge

    3.2 3.x新特性

    3.2.1 NNHA架构

      HDFS NameNode高可用性的初始实现为单个活动NameNode和单个备用NameNodeedits复制到三个JournalNode该体系结构能够容忍系统中一个NN或一个JN的故障。但是,某些部署需要更高程度的容错能力。Hadoop3.x允许用户运行多个备用NameNode。例如,通过配置三个NameNode和五个JournalNode,群集能够容忍两个节点而不是一个节点的故障。

    3.2.2 纠删码

      HDFS中的默认3副本方案在存储空间和其他资源(例如,网络带宽)中具有200%的开销。但是,对于I / O活动相对较低暖和冷数据集,在正常操作期间很少访问其他块副本,但仍会消耗与第一个副本相同的资源量。

      纠删码(Erasure Coding)能够在不到50% 的数据冗余情况下提供和3副本相同的容错能力,因此,使用纠删码作为副本机制的改进是自然而然的。

    4 Hadoop HA高可用

    4.1 HA概述

      (1)所谓HA(High Availablity),即高可用(7*24小时不中断服务)

      (2)实现高可用最关键策略是消除单点故障。HA严格来说应该分成各个组件的HA机制HDFSHAYARNHA

      (3Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF

      (4NameNode主要在以下两个方面影响HDFS集群

        a)NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启

        b)NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用

      HDFS HA功能通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器。

    4.2 HDFS-HA工作机制

      通过双NameNode消除单点故障

    4.2.1 HDFS-HA工作要点

      1)元数据管理方式需要改变

      内存中各自保存一份元数据,Edits日志只有Active状态的NameNode节点可以做写操作,两个NameNode都可以读取Edits,共享的Edits放在一个共享存储中管理(qjournalNFS两个主流实现);

      2)需要一个状态管理功能模块

      实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。

      3必须保证两个NameNode之间能够ssh无密码登录

      4)隔离Fence同一时刻仅仅有一个NameNode对外提供服务

    4.2.2 HDFS-HA自动故障转移工作机制

      在该模式下,即使现役NameNode已经失效,系统也不会自动从现役NameNode转移到待机NameNode,下面学习如何配置部署HA自动进行故障转移。自动故障转移为HDFS部署增加了两个新组件:ZooKeeperZKFailoverControllerZKFC)进程,如图3-20所示ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:

      1故障检测

        集群中的每个NameNodeZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。

      2现役NameNode选择

        ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNodeZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:

      3健康监测

        ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。

      4ZooKeeper会话管理

        当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。

      5基于ZooKeeper的选择

        如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNodeActive。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为Active状态。

    4.3 HDFS-HA集群配置

    4.3.1 环境准备

      (1)修改IP

      (2)修改主机名及主机名和IP地址的映射

      (3)关闭防火墙

      (4ssh免密登录

      (5)安装JDK,配置环境变量等

    4.3.2 规划集群

    hadoop102  

    hadoop103  

    hadoop104

    NameNode

    NameNode

    NameNode

    ZKFC

    ZKFC

    ZKFC

    JournalNode

    JournalNode

    JournalNode

    DataNode

    DataNode

    DataNode

    ZK

    ZK

    ZK

     

    ResourceManager

     

    NodeManager

    NodeManager

    NodeManager

    4.3.3 配置Zookeeper集群

      若之前已经安装Zookeeper集群,则不需要再安装,若之前没有安装配置,则参考博客:https://www.cnblogs.com/LzMingYueShanPao/p/14679581.html

    4.3.4 配置HDFS-HA集群(注意:再搭HA集群时必须先关闭之前的集群并在各节点做一个快照)

      1)官方地址:http://hadoop.apache.org/

      2)在/opt目录下创建一个ha文件夹,然后将/tmp/*内容删除(三台节点都要做)

    sudo mkdir ha
    sudo chown atguigu:atguigu /opt/ha
    sudo rm -rf /tmp/*

      3)在拷贝之前先将hadoop中的data,log删除,再将/opt/module/ hadoop-3.1.3拷贝/opt/ha目录

    cp -r /opt/module/hadoop-3.1.3 /opt/ha/

      4)分发至各个节点

      5)配置环境变量(三台都配)

    vim /etc/profile.d/my_env.sh
    ##HADOOP_HOME
    export HADOOP_HOME=/opt/ha/hadoop-3.1.3
    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
    source /etc/profile.d/my_env.sh

      6)配置core-site.xml

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
      </property>
      <property>
        <name>hadoop.data.dir</name>
        <value>/opt/ha/hadoop-3.1.3/data</value>
      </property>
    </configuration>

      7)配置hdfs-site.xml

    <configuration>
      <property>
        <name>dfs.namenode.name.dir</name>
        <value>file://${hadoop.data.dir}/name</value>
      </property>
      <property>
        <name>dfs.datanode.data.dir</name>
        <value>file://${hadoop.data.dir}/data</value>
      </property>
      <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
      </property>
      <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2,nn3</value>
      </property>
      <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>hadoop102:9820</value>
      </property>
      <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>hadoop103:9820</value>
      </property>
      <property>
        <name>dfs.namenode.rpc-address.mycluster.nn3</name>
        <value>hadoop104:9820</value>
      </property>
      <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>hadoop102:9870</value>
      </property>
      <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>hadoop103:9870</value>
      </property>
      <property>
        <name>dfs.namenode.http-address.mycluster.nn3</name>
        <value>hadoop104:9870</value>
      </property>
      <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster</value>
      </property>
    <!--  访问代理类,client用于确定哪个NN为Active -->
      <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
      </property>
      <!-- 配制隔离机制,即同一时刻只能有一台服务器对外响应 -->
      <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
      </property>
      <!-- 使用隔离机制时需要ssh无密登录 -->
      <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
      </property>
    <!--  指定NN的元数据在JournalNode的哪个位置存放 -->
      <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>${hadoop.data.dir}/jn</value>
      </property>
    </configuration>

        若之前在该文件中配置有如下内容,则将其删除!!!

    <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop104:9868</value>
    </property>

      8)分发配置至其他节点

    4.3.5 启动HDFS-HA集群

      1在各个JournalNode节点上,输入以下命令启动journalnode服务(每个节点都要做)

    hdfs --daemon start journalnode

      2)在[nn1],对其进行格式化,并启动(hadoop102上执行)

    hdfs namenode -format
    hdfs --daemon start namenode

      3)在[nn2][nn3],同步nn1元数据信息(hdaoop103节点和hadoop104节点各执行一次)

    hdfs namenode -bootstrapStandby

      4)启动[nn2][nn3](hdaoop103节点和hadoop104节点各执行一次)

    hdfs --daemon start namenode

      5)将[nn1]切换为Active 

    hdfs haadmin -transitionToActive nn1

      6)查看是否是Active

    hdfs haadmin -getServiceState nn1
    hdfs haadmin -getServiceState nn2
    hdfs haadmin -getServiceState nn3

      7所有节点上上,启动datanode

    hdfs --daemon start datanode

    4.3.6 配置HDFS-HA自动故障转移

      1具体配置

      (1)在hdfs-site.xml中增加

    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>

      (2)在core-site.xml文件中增加

    <property>
        <name>ha.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>

      2)分发配置

      3)启动

      (1)关闭所有HDFS服务:

    stop-dfs.sh

      (2)启动Zookeeper集群(之前编写了一个一键启动Zookeeper集群的脚本,详情请看博客:https://www.cnblogs.com/LzMingYueShanPao/p/14679581.html

    zkCluster.sh start

      (3)初始化HAZookeeper状态(在任意一台节点执行一次即可!!!)

    hdfs zkfc -formatZK

      (4)启动HDFS服务:

    start-dfs.sh

      4验证,Active NameNode进程kill

    hdfs --daemon stop namenode

    4.4 YARN-HA配置

    4.4.1 YARN-HA工作机制

      1官方文档:http://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

      2YARN-HA工作机制

    4.4.2 配置YARN-HA集群

      1)环境准备(若前面已经配置了HDFS-HA,则直接跳过此步骤)

        (1)修改IP

        (2)修改主机名及主机名和IP地址的映射

        (3)关闭防火墙

        (4ssh免密登录

        (5)安装JDK,配置环境变量等

        (6)配置Zookeeper集群

      2规划集群

    hadoop102

    hadoop103  

    hadoop104

    NameNode

    NameNode

     

    JournalNode

    JournalNode

    JournalNode

    DataNode

    DataNode

    DataNode

    ZK

    ZK

    ZK

    ResourceManager

    ResourceManager

    NodeManager

    NodeManager

    NodeManager

      3具体配置

      (1yarn-site.xml(添加如下内容至该文件中即可)

    <!-- 配置Yarn-HA -->
      <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
    
        <!--启用resourcemanager ha-->
        <property>
            <name>yarn.resourcemanager.ha.enabled</name>
            <value>true</value>
        </property>
    
        <!--声明两台resourcemanager的地址-->
        <property>
            <name>yarn.resourcemanager.cluster-id</name>
            <value>cluster-yarn1</value>
        </property>
    
        <property>
            <name>yarn.resourcemanager.ha.rm-ids</name>
            <value>rm1,rm2,rm3</value>
        </property>
    
        <property>
            <name>yarn.resourcemanager.hostname.rm1</name>
            <value>hadoop102</value>
        </property>
    
        <property>
            <name>yarn.resourcemanager.hostname.rm2</name>
            <value>hadoop103</value>
        </property>
    
        <property>
            <name>yarn.resourcemanager.hostname.rm3</name>
            <value>hadoop104</value>
        </property>
    
        <!--指定zookeeper集群的地址-->
        <property>
            <name>yarn.resourcemanager.zk-address</name>
            <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
        </property>
    
        <!--启用自动恢复-->
        <property>
            <name>yarn.resourcemanager.recovery.enabled</name>
            <value>true</value>
        </property>
    
        <!--指定resourcemanager的状态信息存储在zookeeper集群-->
        <property>
            <name>yarn.resourcemanager.store.class</name>
            <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
      </property>

        若该配置文件中存在如下代码,则必须先将其删除!!!

    <!--  指定YARN的ResourceManager的地址-->
    <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>hadoop103</value>
    </property>

      (2)分发配置至各个节点

      4启动YARN

      (1)在任意一台节点中启动yarn

    start-yarn.sh

      (2)查看服务状态

    yarn rmadmin -getServiceState rm1
    yarn rmadmin -getServiceState rm2
    yarn rmadmin -getServiceState rm3

    4.5 HDFS Federation架构设计

    4.5.1 NameNode架构的局限性

      1Namespace(命名空间)限制

        由于NameNode在内存中存储所有的元数据(metadata因此单个NameNode所能存储的对象(文件+块)数目受到NameNode所在JVMheap size的限制。50Gheap能够存储20亿200million对象,这20亿个对象支持4000DataNode12PB的存储(假设文件平均大小为40MB随着数据的飞速增长,存储的需求也随之增长。单个DataNode4T增长到36T,集群的尺寸增长到8000DataNode。存储的需求从12PB增长到大于100PB

      2)隔离问题

        由于HDFS仅有一个NameNode,无法隔离各个程序因此HDFS上的一个实验程序就很有可能影响整个HDFS运行的程序。

      3)性能的瓶颈

        由于是单个NameNodeHDFS架构,因此整个HDFS文件系统的吞吐量受限于单个NameNode的吞吐量。

    4.5.2 HDFS Federation架构设计

    NameNode

    NameNode

    NameNode

    元数据

    元数据

    元数据

    Log

    machine

    电商数据/话单数据

    HDFS Federation架构设计

    4.5.3 HDFS Federation应用思考

      不同应用可以使用不同NameNode进行数据管理图片业务、爬虫业务、日志审计业务。Hadoop生态系统中,不的框架使用不同的NameNode进行管理NameSpace。隔离性)

  • 相关阅读:
    WPF程序设计 :第四章 按钮与其他控件(Buttons and Other Controls)
    C#参考 : 枚举类型
    C#3.0 新特性学习笔记(3):匿名类型
    F#语言2008年9月CTP版已经更新
    C#3.0 新特性学习笔记(1): 对象集合初始化器
    WPF程序设计基础:属性系统
    C#3.0 新特性学习笔记(2):var 隐式声明变量
    MSSql行列转换的Sql语法 详解与实例
    WPF程序设计 :第一章 应用程序和窗口(The Application and the Window)
    WPF程序设计 :第二章 基本画刷(Basic Brushes)
  • 原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/14682954.html
Copyright © 2011-2022 走看看