zoukankan      html  css  js  c++  java
  • mapReduce体系结构和各种算法 笔记六

    mapReduce体系结构和各种算法

    Mapreduce的工作机制

    任务执行优化

    推测式执行: jobtracker会将执行慢的任务kill,启动一个新的相同备份任务

    mapred-site.xml中设置mapreduce任务的开启和关闭

    Mapred.map.tasks.speculative.execution

    Mapred.reduce.tasks.speculative.execution

    重用jvm,可以省去启的新jvm的消耗时间,mapred-site.xml中配置

    Mapred.job.reuse.jvm.num.tasks设置单个jvm运行的最大任务数(1,>1,-1)

    忽略模式:

    任务在读取失败2次后,会通知jobtracker重新启动该任务,在遇到坏数据时直接跳过,默认为关闭,可以用skipbadrecord方法打开

    错误处理

    硬件故障

    硬件故障:jobtrackertasktracker

    Jobtracker是单点,提高机器性能

    Jobtracker通过心跳信号来检测tracktracker

    Jobtracker会从任务节点列表中移除问题tasktracker

    如果tasktracker正在执行map任务,jobtracker会要求其他节点重新执行

    如果tasktracker正在执行reduce任务,jobtracker会用其他节点继续执行未完成任务

    任务失败

    代码问题 进程死掉

    Jvm自动退出,会向tasktracker父进程发送信息,写入日志

    Tasktracker监听程序会将问题进程标记为失败

    标记任务失败后,任务计数器减1,通过心跳信号通知jobtracker

    Jobtrack收到通知后,将任务放入调度队列,重新执行

    如果一个任务失败4,将不会再执行,作业失败

    打开审计日志

     在hadoop*/conf/log4j.propertites文件中,修改如下:

    Log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN改为INFO

    调整log4j的日志级别

    http://localhost:50070/logLevel

    运维第三方工具

    Ganglia

    Chukwa

    Openstack 

    作业跟踪器: jobTracker

    处理用户提交的作业

    决定有哪些文件参与处理,切割task并分配节点

    监控task,重启失败的task

    每个集群只有吟唯一一个jobTracker位于master节点

    任务跟踪器: taskTracker

    位于slave节点,datanode结合

    管理各节点上的task(jobtracker分配)

    每个节点只有一个taskTracker,但一个taskTracker可以启动多个jvm,用于执行mapreduce任务

    jobTracker交互

    性能调优

    作业需要的reducer

    输入的数据大文件要优于小文件

    减少网络传输:压缩map输出

    优化每个节点的任务数默认值为2

    Mapred.tasktracker.map.tasks.maximum

    Mapred.tasktracker.reduce.tasks.maximum

    Hadoop API开发步骤

    示例代码如下:

    /**

     * Copyright (C) 2015

     * 

     * FileName:Test_1.java

     *

     * Author:<a href="mailto:zhenhuayue@sina.com">Retacn</a>

     *

     * CreateTime: 2015-10-5

     */

    // Package Information

    package cn.yue.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    /**

     * 分词

     * 

     * @version

     * 

     * @Description:

     * 

     * @author <a href="mailto:zhenhuayue@sina.com">Retacn</a>

     * 

     * @since 2015-10-5

     * 

     */

    public class Test_1 extends Configured implements Tool {

    enum counter {

    LINESKIP, // 出错的行

    };

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=getConf();

    //任务名

    Job job=new Job(conf,"Test_1");

    //指定class 

    job.setJarByClass(Test_1.class);

    //输入路径

    FileInputFormat.addInputPath(job, new Path(args[0]));

    //输出路径

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //指定mapper

    job.setMapperClass(MyMapper.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    //指定输出keyvalue的格式

    job.setOutputKeyClass(NullWritable.class);

    job.setOutputValueClass(Text.class);

    return job.isSuccessful()?0:1;

    }

    public static class MyMapper extends Mapper<LongWritable, Text, // 输出的keyvalue的格式

    NullWritable, Text> {// 输出的keyvalue的格式

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    //读取源数据

    String line=value.toString();

    try {

    //数据处理

    String[] lineSplit=line.split(" ");

    String month= lineSplit[0];

    String time=lineSplit[1];

    String mac=lineSplit[6];

    Text out=new Text(month+" "+time+" "+mac);

    //数据输出

    context.write(NullWritable.get(), out);

    } catch (Exception e) {

    //出错令计数器加1

    context.getCounter(counter.LINESKIP).increment(1);

    return;

    }

    }

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new Configuration(), new Test_1(), args);

    System.exit(res);

    }

    }

    执行程序:

      先配置运行环境 run configurations arguments中添加如下(指定输入目录和输出目录):

    hdfs://localhost:9000/user/root/in hdfs://localhost:9000/user/root/out

    在运行前查看hdfs中输出目录out不能存在

    Run执行可以 在out目录中看到输出结果

    示例程序

     倒排序,代码如下:

    /**

     * Copyright (C) 2015

     * 

     * FileName:Test_2.java

     *

     * Author:<a href="mailto:zhenhuayue@sina.com">Retacn</a>

     *

     * CreateTime: 2015-10-6

     */

    // Package Information

    package cn.yue.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    /**

     * 倒序

     * 

     * @version

     * 

     * @Description:

     * 

     * @author <a href="mailto:zhenhuayue@sina.com">Retacn</a>

     * 

     * @since 2015-10-6

     * 

     */

    public class Test_2 extends Configured implements Tool {

    enum Counter {

    LINESKIP, // 出错的行

    };

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf = getConf();

    // 任务名

    Job job = new Job(conf, "Test_2");

    // 指定class

    job.setJarByClass(Test_2.class);

    // 输入路径

    FileInputFormat.addInputPath(job, new Path(args[0]));

    // 输出路径

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 指定mapper

    job.setMapperClass(MyMapper.class);

    //指定reducer

    job.setReducerClass(MyReducer.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    // 指定输出keyvalue的格式

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(Text.class);

    // TODO 为何添加?

    job.waitForCompletion(true);

    return job.isSuccessful() ? 0 : 1;

    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String line = value.toString();

    try {

    String[] lineSplit = line.split(" ");

    String caller = lineSplit[0];

    String reciver = lineSplit[1];

    context.write(new Text(reciver), new Text(caller));

    } catch (Exception e) {

    context.getCounter(Counter.LINESKIP).increment(1);

    return;

    }

    }

    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

    String valueString;

    String out = "";

    for (Text value : values) {

    valueString = value.toString();

    out += valueString + "|";

    }

    context.write(key, new Text(out));

    }

    }

    public static void main(String[] args) throws Exception {

    int res = ToolRunner.run(new Configuration(), new Test_2(), args);

    System.exit(res);

    }

    }

    运行方法同上,可以指定输入文件

    查看输出结果:

    10000 13953312345|13765431234|18953354321|

    110 13153365432|15953345678|

    13853398765 15153354321|13906431223|

    程序的打包,java程序

    Pig 

    Hadoop客户端

    使类似于sql的面向数据流的语言pig Latin

    Pig Latin可以完成排序,过滤,求和,聚组,关联等操作,可以支持自定义函数

    Pig自动把pig latin映射为map-reduce作业上传到集群运行,减少java代码

    三种运行方式: grunt shell  脚本方式  嵌入式

    4 Zookeeper操作 hadoop集群搭建,sqoop

    Google chubby的开源实现

    用于协调分布式系统上的各种服务,例如消息是否到达,单点失效,负载均衡

    应用场景: hbase 实现nameNode的自动切换

    工作原理领导者,跟随者以及选举过程

    Sqoop

    用于在hadoop和关型数据库这间的数据交换

    通过jdbc接口连入关系型数据库

    Avro

    数据序列化工具

    用于支持大批量数据交换的应用

    动态语言友好

    Thrift接口

      Chukwa

    架构在hadoop之上的数据采集和分析框架

    主要进行日志采集和分析

    通过安装在收集节点的”代理”采集原始日志数据

    代理将数据发给收集器 

    收集器定时将数据写入hadoop集群

    指定定时启动的map-reduce作业对数据进行加工处理和分析

    Hadoop基础管理中心(hicc)最终展示数据

    Cassandra

    noSQL,分布式的key-value型数据

    hbase类似

    只有顺序写,没有随机写

  • 相关阅读:
    【HDOJ】2774 Shuffle
    【POJ】2170 Lattice Animals
    【POJ】1084 Square Destroyer
    【POJ】3523 The Morning after Halloween
    【POJ】3134 Power Calculus
    【Latex】如何在Latex中插入伪代码 —— clrscode3e
    【HDOJ】4801 Pocket Cube 的几种解法和优化
    【HDOJ】4080 Stammering Aliens
    【HDOJ】1800 Flying to the Mars
    SQL语法
  • 原文地址:https://www.cnblogs.com/retacn-yue/p/6194227.html
Copyright © 2011-2022 走看看