  Big Data Primer, Day 9: MapReduce in Detail (6), Additional MR Topics

    I. Custom InputFormat/OutputFormat

      1. Requirements

      We have some raw logs that need enhancement processing. The flow is:

        1. Read the data from the raw log files

        2. For each record, take the URL field, look it up in an external knowledge base, and append the retrieved information to the raw log line

        3. If enhancement succeeds, write the record to the enhanced-results directory; if it fails, extract the URL field from the raw record and write it to the to-crawl list directory

    1374609560.11    1374609560.16    1374609560.16    1374609560.16    110    5    8615038208365    460023383869133    8696420056841778    2    460    0    14615            54941    10.188.77.252    61.145.116.27    35020    80    6    cmnet    1    221.177.218.34    221.177.217.161    221.177.218.34    221.177.217.167    ad.veegao.com    http://ad.veegao.com/veegao/iris.action        Apache-HttpClient/UNAVAILABLE (java 1.4)    POST    200    593    310    4    3    0    0    4    3    0    0    0    0    http://ad.veegao.com/veegao/iris.action    5903903079251243019    5903903103500771339    5980728
    1374609558.91    1374609558.97    1374609558.97    1374609559.31    112    461    8615038208365    460023383869133    8696420056841778    2    460    0    14615            54941    10.188.77.252    101.226.76.175    37293    80    6    cmnet    1    221.177.218.34    221.177.217.161    221.177.218.34    221.177.217.167    short.weixin.qq.com    http://short.weixin.qq.com/cgi-bin/micromsg-bin/getcdndns        Android QQMail HTTP Client    POST    200    543    563    2    3    0    0    2    3    0    0    0    0    http://short.weixin.qq.com/cgi-bin/micromsg-bin/getcdndns    5903903079251243019    5903903097240039435    5980728
    1374609514.70    1374609514.75    1374609514.75    1374609515.58    110    5    8613674976196    460004901700207    8623350100353878    2    460    0    14694            58793    10.184.80.32    111.13.13.222    36181    80    6    cmnet    1    221.177.156.4    221.177.217.145    221.177.156.4    221.177.217.156    retype.wenku.bdimg.com    http://retype.wenku.bdimg.com/img/97308d2b7375a417866f8f09        AMB_400    GET    200    345    4183    5    5    0    0    5    5    0    0    0    0    http://retype.wenku.bdimg.com/img/97308d2b7375a417866f8f09    5903900710696611851    5903902908140003339    5937307
    1374609511.98    1374609512.02    1374609512.02    1374609512.48    110    362    8613674976196    460004901700207    8623350100353878    2    460    0    14694            58793    10.184.80.32    120.204.207.160    33548    80    6    cmnet    1    221.177.156.4    221.177.217.145    221.177.156.4    221.177.217.156    t4.qpic.cn    http://t4.qpic.cn/mblogpic/217cf24d43f1f19255e2/120        AMB_400    GET    200    346    3184    4    4    0    0    4    4    0    0    0    0    http://t4.qpic.cn/mblogpic/217cf24d43f1f19255e2/120    5903900710696611851    5903902896317288459    5937307
    1374609518.14    1374609518.24    1374609518.24    1374609518.72    110    362    8613674976196    460004901700207    8623350100353878    2    460    0    14694            58793    10.184.80.32    120.204.207.160    33548    80    6    cmnet    1    221.177.156.4    221.177.217.145    221.177.156.4    221.177.217.156    t4.qpic.cn    http://t4.qpic.cn/mblogpic/96e02ad781c9be6f5ad2/120        AMB_400    GET    200    346    3328    4    4    0    0    4    4    0    0    0    0    http://t4.qpic.cn/mblogpic/96e02ad781c9be6f5ad2/120    5903900710696611851    5903902896317288459    5937307
    Log sample

       2. Analysis

        The key point is that a single MapReduce program must write two kinds of results to different directories depending on the data. This kind of flexible output requirement can be implemented with a custom OutputFormat.

        What differs from earlier examples is that information has to be pulled from a database; the sample code below uses plain JDBC. To keep things simple we could also use Commons DbUtils and initialize the rule map in the mapper's setup() method.
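        For illustration, a minimal sketch of that DbUtils variant (the class name DbUtilsRuleLoader is made up here; the table and column names follow the DBLoader class shown later). It would be called from the mapper's setup():

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.dbutils.DbUtils;
    import org.apache.commons.dbutils.QueryRunner;
    import org.apache.commons.dbutils.handlers.MapListHandler;
    
    public class DbUtilsRuleLoader {
    
        // Load the url -> content rules into ruleMap; call this from Mapper.setup()
        public static void load(Map<String, String> ruleMap) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
            try {
                QueryRunner runner = new QueryRunner();
                List<Map<String, Object>> rows =
                        runner.query(conn, "select url, content from url_rule", new MapListHandler());
                for (Map<String, Object> row : rows) {
                    ruleMap.put((String) row.get("url"), (String) row.get("content"));
                }
            } finally {
                DbUtils.closeQuietly(conn); // closes the connection, swallowing any SQLException
            }
        }
    }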

      3. Code

      To keep this short, the database is not actually created and tested here; the key point is the custom OutputFormat.

      By default TextOutputFormat is used, so before writing our own it is worth looking at this default implementation first:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.mapreduce.lib.output;
    
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.FSDataOutputStream;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.util.*;
    
    /** An {@link OutputFormat} that writes plain text files. */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
      public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
      protected static class LineRecordWriter<K, V>
        extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
          try {
            newline = "
    ".getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }
    
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;
    
        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
          this.out = out;
          try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }
    
        public LineRecordWriter(DataOutputStream out) {
          this(out, "\t");
        }
    
        /**
         * Write the object to the byte stream, handling Text as a special
         * case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
          if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
          } else {
            out.write(o.toString().getBytes(utf8));
          }
        }
    
        public synchronized void write(K key, V value)
          throws IOException {
    
          boolean nullKey = key == null || key instanceof NullWritable;
          boolean nullValue = value == null || value instanceof NullWritable;
          if (nullKey && nullValue) {
            return;
          }
          if (!nullKey) {
            writeObject(key);
          }
          if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
          }
          if (!nullValue) {
            writeObject(value);
          }
          out.write(newline);
        }
    
        public synchronized 
        void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      }
    
      public RecordWriter<K, V> 
             getRecordWriter(TaskAttemptContext job
                             ) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(SEPERATOR, "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
          Class<? extends CompressionCodec> codecClass = 
            getOutputCompressorClass(job, GzipCodec.class);
          codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
          extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(new DataOutputStream
                                            (codec.createOutputStream(fileOut)),
                                            keyValueSeparator);
        }
      }
    }
    TextOutputFormat

      Some references and examples: https://www.cnblogs.com/liuming1992/p/4758504.html

           http://blog.csdn.net/woshisap/article/details/42320129

            http://chengjianxiaoxue.iteye.com/blog/2163284  --> recommended

      4. Custom InputFormat

    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class WholeFileInputFormat extends
            FileInputFormat<NullWritable, BytesWritable> {
        // Mark every small file as non-splittable so that each file produces exactly one key-value pair
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
    
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException,
                InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }
    WholeFileInputFormat
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException,
                InterruptedException {
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException,
                InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }
    
        @Override
        public void close() throws IOException {
            // do nothing
        }
    }
    WholeFileRecordReader
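
        As a usage sketch (not from the original post; paths and class names are illustrative), a driver that uses WholeFileInputFormat to pack many small files into a single SequenceFile, with the file name as key and the whole file content as value:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    public class SmallFilesToSequenceFile {
    
        static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
            private Text filenameKey = new Text();
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // Use the path of the current split (i.e. the whole small file) as the output key
                InputSplit split = context.getInputSplit();
                Path path = ((FileSplit) split).getPath();
                filenameKey.set(path.toString());
            }
    
            @Override
            protected void map(NullWritable key, BytesWritable value, Context context)
                    throws IOException, InterruptedException {
                context.write(filenameKey, value);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(SmallFilesToSequenceFile.class);
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setMapperClass(SequenceFileMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }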

        References: http://blog.csdn.net/woshixuye/article/details/53557487

           http://irwenqiang.iteye.com/blog/1448164

           http://m635674608.iteye.com/blog/2243076

      Complete code:

    package cn.itcast.bigdata.mr.logenhance;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.Map;
    
    public class DBLoader {
    
        public static void dbLoader(Map<String, String> ruleMap) throws Exception {
    
            Connection conn = null;
            Statement st = null;
            ResultSet res = null;
            
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
                st = conn.createStatement();
                res = st.executeQuery("select url,content from url_rule");
                while (res.next()) {
                    ruleMap.put(res.getString(1), res.getString(2));
                }
    
            } finally {
                try{
                    if(res!=null){
                        res.close();
                    }
                    if(st!=null){
                        st.close();
                    }
                    if(conn!=null){
                        conn.close();
                    }
    
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    DBLoader
    package cn.itcast.bigdata.mr.logenhance;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogEnhance {
    
        static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
            Map<String, String> ruleMap = new HashMap<String, String>();
    
            Text k = new Text();
            NullWritable v = NullWritable.get();
    
            // Load the rule data from the database into ruleMap
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
    
                try {
                    DBLoader.dbLoader(ruleMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Get a counter for recording malformed log lines: (group name, counter name)
                Counter counter = context.getCounter("malformed", "malformedline");
                String line = value.toString();
                String[] fields = StringUtils.split(line, "\t");
                try {
                    String url = fields[26];
                    String content_tag = ruleMap.get(url);
                    // If the content tag is null, output only the URL to the to-crawl list;
                    // otherwise append the tag and output the line to the enhanced log
                    if (content_tag == null) {
                        k.set(url + "\t" + "tocrawl" + "\n");
                        context.write(k, v);
                    } else {
                        k.set(line + "\t" + content_tag + "\n");
                        context.write(k, v);
                    }
    
                } catch (Exception exception) {
                    counter.increment(1);
                }
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(LogEnhance.class);
    
            job.setMapperClass(LogEnhanceMapper.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // To route different records to different target paths, use a custom OutputFormat
            job.setOutputFormatClass(LogEnhanceOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:/srcdata/webloginput/"));
    
            // Although we use a custom OutputFormat, it extends FileOutputFormat,
            // which must write a _SUCCESS file, so an output path still has to be set
            FileOutputFormat.setOutputPath(job, new Path("D:/temp/output/"));
    
            // No reducer is needed
            job.setNumReduceTasks(0);
    
            job.waitForCompletion(true);
            System.exit(0);
    
        }
    
    }
    LogEnhance
    package cn.itcast.bigdata.mr.logenhance;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * When a map task or reduce task writes its final output, it first calls OutputFormat's getRecordWriter() to obtain a RecordWriter,
     * and then calls RecordWriter.write(k, v) to write each record out.
     * 
     * @author
     * 
     */
    public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
    
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
    
            FileSystem fs = FileSystem.get(context.getConfiguration());
    
            Path enhancePath = new Path("D:/temp/en/log.dat");
            Path tocrawlPath = new Path("D:/temp/crw/url.dat");
    
            FSDataOutputStream enhancedOs = fs.create(enhancePath);
            FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);
    
            return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
        }
    
        /**
     * A custom RecordWriter
         * 
         * @author
         * 
         */
        static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
            FSDataOutputStream enhancedOs = null;
            FSDataOutputStream tocrawlOs = null;
    
            public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
                super();
                this.enhancedOs = enhancedOs;
                this.tocrawlOs = tocrawlOs;
            }
    
            @Override
            public void write(Text key, NullWritable value) throws IOException, InterruptedException {
                String result = key.toString();
                // A to-crawl URL goes into the to-crawl list file (tocrawlPath)
                if (result.contains("tocrawl")) {
                    tocrawlOs.write(result.getBytes());
                } else {
                    // An enhanced log record goes into the enhanced log file (enhancePath)
                    enhancedOs.write(result.getBytes());
                }
    
            }
    
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                if (tocrawlOs != null) {
                    tocrawlOs.close();
                }
                if (enhancedOs != null) {
                    enhancedOs.close();
                }
    
            }
    
        }
    
    }
    LogEnhanceOutputFormat

       Others: to be added.

    II. Counters and Multi-Job Chaining

     1. Counters

     MapReduce counters (Counter) give us a window onto all kinds of runtime details of a MapReduce job. They are very helpful for performance tuning; most MapReduce performance evaluation is based on counter values. Counters can also be used to record small amounts of global data. A short, self-contained example follows the reference links below.

      Introduction and references: http://blog.csdn.net/xw_classmate/article/details/50954384

               https://www.cnblogs.com/codeOfLife/p/5521356.html
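
      A minimal, self-contained sketch (the class and counter names are made up for illustration) of defining counters with an enum, incrementing them in the mapper, and reading the values back in the driver after the job finishes:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class CounterDemo {
    
        // A custom counter group defined as an enum
        enum LineQuality { VALID, MALFORMED }
    
        static class CheckMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Treat a line as valid if it has at least 27 tab-separated fields (index 26 holds the URL above)
                if (value.toString().split("\t", -1).length > 26) {
                    context.getCounter(LineQuality.VALID).increment(1);
                } else {
                    context.getCounter(LineQuality.MALFORMED).increment(1);
                }
                context.write(value, NullWritable.get());
            }
        }
    
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(CounterDemo.class);
            job.setMapperClass(CheckMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
            // Counter values can be read back in the driver once the job has finished
            long bad = job.getCounters().findCounter(LineQuality.MALFORMED).getValue();
            System.out.println("malformed lines: " + bad);
        }
    }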

      2. Multi-job chaining

        Slightly more complex processing logic often needs several MapReduce programs chained together. Chaining multiple jobs can be done with the framework's JobControl.

          (In practice this is rarely used, because the chain tends to get hard-coded inside the job; driving the flow from a shell script is usually recommended.)

        A hand-rolled implementation can distinguish the jobs by jobName and manage submission and dependencies itself; a dependency simply means that the output of one MR job is the input of another.

        For an example of such a hand-rolled implementation, see https://www.cnblogs.com/yjmyzz/p/4540469.html

        Using JobControl to manage the dependencies between jobs:

        Core code:

    // Core of a Tool.run() method (job1/job2/job3 are assumed to be fully configured Jobs)
    ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
    ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
    ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
    
    cJob1.setJob(job1);
    cJob2.setJob(job2);
    cJob3.setJob(job3);
    
    // Declare the dependencies between the jobs
    cJob2.addDependingJob(cJob1);
    cJob3.addDependingJob(cJob2);
    
    JobControl jobControl = new JobControl("RecommendationJob");
    jobControl.addJob(cJob1);
    jobControl.addJob(cJob2);
    jobControl.addJob(cJob3);
    
    // Run the jobs added to the JobControl in a separate thread, then wait for all of them to finish
    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.start();
    while (!jobControl.allFinished()) {
        Thread.sleep(500);
    }
    jobControl.stop();
    
    return 0;

        More complete examples: http://blog.csdn.net/sven119/article/details/78806380

               http://mntms.iteye.com/blog/2096456  --> recommended

    III. Data Compression

      

       Classic usage:

        Compressing the mapper (intermediate) output:

    new API:
     
    Configuration conf = new Configuration();
    conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
    conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
    Job job = new Job(conf);
     
    old API (here conf is an org.apache.hadoop.mapred.JobConf):
     
    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(GzipCodec.class);

        Compressing the reducer (final) output:

          Via configuration:

    mapreduce.output.fileoutputformat.compress=false
    mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
    mapreduce.output.fileoutputformat.compress.type=RECORD

          Via code:

    // Compress the reduce (job) output files
    FileOutputFormat.setCompressOutput(job, true);                    // enable output compression for the job
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);  // choose the compression codec
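
       A combined sketch (the property keys are the Hadoop 2.x names; mapper/reducer classes and paths are omitted) that enables gzip for both the intermediate map output and the final job output from one driver:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class CompressionConfigSketch {
        public static Job newCompressedJob() throws Exception {
            Configuration conf = new Configuration();
            // Intermediate (map -> shuffle) output compression
            conf.setBoolean("mapreduce.map.output.compress", true);
            conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
    
            Job job = Job.getInstance(conf, "compressed-output-demo");
            // Final job output compression
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            return job;
        }
    }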

       More references: https://www.cnblogs.com/ggjucheng/archive/2012/04/22/2465580.html

    IV. Common MR Configuration Parameters for Tuning

       1. Resource-related parameters

    // The following parameters take effect when set in the user's own MR application
    (1) mapreduce.map.memory.mb: upper limit on the resources one map task may use (in MB), default 1024. A map task that actually uses more than this is forcibly killed.
    (2) mapreduce.reduce.memory.mb: upper limit on the resources one reduce task may use (in MB), default 1024. A reduce task that actually uses more than this is forcibly killed.
    (3) mapreduce.map.java.opts: JVM options for map tasks, where you can set the Java heap size and similar options, e.g.
    "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is replaced by the framework with the actual task id). Default: ""
    (4) mapreduce.reduce.java.opts: JVM options for reduce tasks, e.g.
    "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc". Default: ""
    (5) mapreduce.map.cpu.vcores: maximum number of CPU cores per map task, default 1
    (6) mapreduce.reduce.cpu.vcores: maximum number of CPU cores per reduce task, default 1
    
    // These must be set in the server configuration files before YARN starts
    (7) yarn.scheduler.minimum-allocation-mb      1024    minimum memory allocated to an application container
    (8) yarn.scheduler.maximum-allocation-mb      8192    maximum memory allocated to an application container
    (9) yarn.scheduler.minimum-allocation-vcores    1
    (10) yarn.scheduler.maximum-allocation-vcores   32
    (11) yarn.nodemanager.resource.memory-mb      8192
    
    // Key parameters for shuffle tuning; set before YARN starts
    (12) mapreduce.task.io.sort.mb          100    // size of the shuffle ring buffer, default 100 MB
    (13) mapreduce.map.sort.spill.percent   0.8    // spill threshold of the ring buffer, default 80%
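      For the per-job settings (items 1 to 6 above), a brief sketch of applying them in the driver; the concrete values are only illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    
    public class ResourceTuningSketch {
        public static Job newTunedJob() throws Exception {
            Configuration conf = new Configuration();
            conf.setInt("mapreduce.map.memory.mb", 2048);       // container limit for each map task
            conf.set("mapreduce.map.java.opts", "-Xmx1638m");   // JVM heap kept below the container limit
            conf.setInt("mapreduce.reduce.memory.mb", 4096);
            conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
            conf.setInt("mapreduce.map.cpu.vcores", 1);
            conf.setInt("mapreduce.reduce.cpu.vcores", 1);
            return Job.getInstance(conf, "resource-tuning-demo");
        }
    }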

      2. Fault-tolerance parameters

    (1) mapreduce.map.maxattempts: maximum number of attempts for each map task; once exceeded, the map task is considered failed. Default: 4.
    (2) mapreduce.reduce.maxattempts: maximum number of attempts for each reduce task; once exceeded, the reduce task is considered failed. Default: 4.
    (3) mapreduce.map.failures.maxpercent: if the percentage of failed map tasks exceeds this value, the whole job fails. Default: 0. If your application can tolerate dropping part of the input, set it to a value greater than 0, e.g. 5, meaning the job is still considered successful as long as fewer than 5% of the map tasks fail (a map task counts as failed once its attempts exceed mapreduce.map.maxattempts; its input then produces no output at all).
    (4) mapreduce.reduce.failures.maxpercent: if the percentage of failed reduce tasks exceeds this value, the whole job fails. Default: 0.
    (5) mapreduce.task.timeout: the task timeout, a parameter that often needs adjusting. If a task makes no progress within this period (reads no new input and writes no output), it is assumed to be blocked, possibly forever; to keep a permanently blocked user program from hanging the job, the framework kills it after this many milliseconds. Default: 300000. If your program legitimately spends a long time per input record (for example it queries a database or pulls data over the network), increase this value. A typical error message when it is too small is "AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secs Container killed by the ApplicationMaster.".

      3. Local-mode parameters

    Set the following parameters:
    mapreduce.framework.name=local
    mapreduce.jobtracker.address=local
    fs.defaultFS=local
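      A quick sketch of forcing the local runner from the driver; note that fs.defaultFS is given here in its URI form file:///, while older write-ups (like the list above) use the shorthand local:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    
    public class LocalModeSketch {
        public static Job newLocalJob() throws Exception {
            Configuration conf = new Configuration();
            conf.set("mapreduce.framework.name", "local"); // use the LocalJobRunner instead of YARN
            conf.set("fs.defaultFS", "file:///");          // read input from the local file system
            return Job.getInstance(conf, "local-debug");
        }
    }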

      4. Efficiency and stability parameters

    (1) mapreduce.map.speculative: whether speculative execution is enabled for map tasks (default true in the stock mapred-default.xml of Hadoop 2.x)
    (2) mapreduce.reduce.speculative: whether speculative execution is enabled for reduce tasks (default true in the stock mapred-default.xml of Hadoop 2.x)
    (3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence: when the same class appears both in the user jar and in the Hadoop jars, which copy is used first; default false, i.e. the class from the Hadoop jars wins.
    (4) mapreduce.input.fileinputformat.split.minsize: minimum split size used by FileInputFormat; (5) mapreduce.input.fileinputformat.split.maxsize: maximum split size used by FileInputFormat
    (the default split size equals the block size, i.e. 134217728)
  Original post: https://www.cnblogs.com/jiangbei/p/8422230.html