zoukankan      html  css  js  c++  java
  • mapReduce体系结构和各种算法 笔记六

    mapReduce体系结构和各种算法

    Mapreduce的工作机制

    任务执行优化

    推测式执行: jobtracker会将执行慢的任务kill,启动一个新的相同备份任务

    mapred-site.xml中设置mapreduce任务的开启和关闭

    Mapred.map.tasks.speculative.execution

    Mapred.reduce.tasks.speculative.execution

    重用jvm,可以省去启的新jvm的消耗时间,mapred-site.xml中配置

    Mapred.job.reuse.jvm.num.tasks设置单个jvm运行的最大任务数(1,>1,-1)

    忽略模式:

    任务在读取失败2次后,会通知jobtracker重新启动该任务,在遇到坏数据时直接跳过,默认为关闭,可以用skipbadrecord方法打开

    错误处理

    硬件故障

    硬件故障:jobtrackertasktracker

    Jobtracker是单点,提高机器性能

    Jobtracker通过心跳信号来检测tracktracker

    Jobtracker会从任务节点列表中移除问题tasktracker

    如果tasktracker正在执行map任务,jobtracker会要求其他节点重新执行

    如果tasktracker正在执行reduce任务,jobtracker会用其他节点继续执行未完成任务

    任务失败

    代码问题 进程死掉

    Jvm自动退出,会向tasktracker父进程发送信息,写入日志

    Tasktracker监听程序会将问题进程标记为失败

    标记任务失败后,任务计数器减1,通过心跳信号通知jobtracker

    Jobtrack收到通知后,将任务放入调度队列,重新执行

    如果一个任务失败4,将不会再执行,作业失败

    打开审计日志

     在hadoop*/conf/log4j.propertites文件中,修改如下:

    Log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN改为INFO

    调整log4j的日志级别

    http://localhost:50070/logLevel

    运维第三方工具

    Ganglia

    Chukwa

    Openstack 

    作业跟踪器: jobTracker

    处理用户提交的作业

    决定有哪些文件参与处理,切割task并分配节点

    监控task,重启失败的task

    每个集群只有吟唯一一个jobTracker位于master节点

    任务跟踪器: taskTracker

    位于slave节点,datanode结合

    管理各节点上的task(jobtracker分配)

    每个节点只有一个taskTracker,但一个taskTracker可以启动多个jvm,用于执行mapreduce任务

    jobTracker交互

    性能调优

    作业需要的reducer

    输入的数据大文件要优于小文件

    减少网络传输:压缩map输出

    优化每个节点的任务数默认值为2

    Mapred.tasktracker.map.tasks.maximum

    Mapred.tasktracker.reduce.tasks.maximum

    Hadoop API开发步骤

    示例代码如下:

    /**

     * Copyright (C) 2015

     * 

     * FileName:Test_1.java

     *

     * Author:<a href="mailto:zhenhuayue@sina.com">Retacn</a>

     *

     * CreateTime: 2015-10-5

     */

    // Package Information

    package cn.yue.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    /**

     * 分词

     * 

     * @version

     * 

     * @Description:

     * 

     * @author <a href="mailto:zhenhuayue@sina.com">Retacn</a>

     * 

     * @since 2015-10-5

     * 

     */

    public class Test_1 extends Configured implements Tool {

    enum counter {

    LINESKIP, // 出错的行

    };

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=getConf();

    //任务名

    Job job=new Job(conf,"Test_1");

    //指定class 

    job.setJarByClass(Test_1.class);

    //输入路径

    FileInputFormat.addInputPath(job, new Path(args[0]));

    //输出路径

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //指定mapper

    job.setMapperClass(MyMapper.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    //指定输出keyvalue的格式

    job.setOutputKeyClass(NullWritable.class);

    job.setOutputValueClass(Text.class);

    return job.isSuccessful()?0:1;

    }

    public static class MyMapper extends Mapper<LongWritable, Text, // 输出的keyvalue的格式

    NullWritable, Text> {// 输出的keyvalue的格式

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    //读取源数据

    String line=value.toString();

    try {

    //数据处理

    String[] lineSplit=line.split(" ");

    String month= lineSplit[0];

    String time=lineSplit[1];

    String mac=lineSplit[6];

    Text out=new Text(month+" "+time+" "+mac);

    //数据输出

    context.write(NullWritable.get(), out);

    } catch (Exception e) {

    //出错令计数器加1

    context.getCounter(counter.LINESKIP).increment(1);

    return;

    }

    }

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new Configuration(), new Test_1(), args);

    System.exit(res);

    }

    }

    执行程序:

      先配置运行环境 run configurations arguments中添加如下(指定输入目录和输出目录):

    hdfs://localhost:9000/user/root/in hdfs://localhost:9000/user/root/out

    在运行前查看hdfs中输出目录out不能存在

    Run执行可以 在out目录中看到输出结果

    示例程序

     倒排序,代码如下:

    /**

     * Copyright (C) 2015

     * 

     * FileName:Test_2.java

     *

     * Author:<a href="mailto:zhenhuayue@sina.com">Retacn</a>

     *

     * CreateTime: 2015-10-6

     */

    // Package Information

    package cn.yue.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    /**

     * 倒序

     * 

     * @version

     * 

     * @Description:

     * 

     * @author <a href="mailto:zhenhuayue@sina.com">Retacn</a>

     * 

     * @since 2015-10-6

     * 

     */

    public class Test_2 extends Configured implements Tool {

    enum Counter {

    LINESKIP, // 出错的行

    };

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf = getConf();

    // 任务名

    Job job = new Job(conf, "Test_2");

    // 指定class

    job.setJarByClass(Test_2.class);

    // 输入路径

    FileInputFormat.addInputPath(job, new Path(args[0]));

    // 输出路径

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 指定mapper

    job.setMapperClass(MyMapper.class);

    //指定reducer

    job.setReducerClass(MyReducer.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    // 指定输出keyvalue的格式

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(Text.class);

    // TODO 为何添加?

    job.waitForCompletion(true);

    return job.isSuccessful() ? 0 : 1;

    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String line = value.toString();

    try {

    String[] lineSplit = line.split(" ");

    String caller = lineSplit[0];

    String reciver = lineSplit[1];

    context.write(new Text(reciver), new Text(caller));

    } catch (Exception e) {

    context.getCounter(Counter.LINESKIP).increment(1);

    return;

    }

    }

    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

    String valueString;

    String out = "";

    for (Text value : values) {

    valueString = value.toString();

    out += valueString + "|";

    }

    context.write(key, new Text(out));

    }

    }

    public static void main(String[] args) throws Exception {

    int res = ToolRunner.run(new Configuration(), new Test_2(), args);

    System.exit(res);

    }

    }

    运行方法同上,可以指定输入文件

    查看输出结果:

    10000 13953312345|13765431234|18953354321|

    110 13153365432|15953345678|

    13853398765 15153354321|13906431223|

    程序的打包,java程序

    Pig 

    Hadoop客户端

    使类似于sql的面向数据流的语言pig Latin

    Pig Latin可以完成排序,过滤,求和,聚组,关联等操作,可以支持自定义函数

    Pig自动把pig latin映射为map-reduce作业上传到集群运行,减少java代码

    三种运行方式: grunt shell  脚本方式  嵌入式

    4 Zookeeper操作 hadoop集群搭建,sqoop

    Google chubby的开源实现

    用于协调分布式系统上的各种服务,例如消息是否到达,单点失效,负载均衡

    应用场景: hbase 实现nameNode的自动切换

    工作原理领导者,跟随者以及选举过程

    Sqoop

    用于在hadoop和关型数据库这间的数据交换

    通过jdbc接口连入关系型数据库

    Avro

    数据序列化工具

    用于支持大批量数据交换的应用

    动态语言友好

    Thrift接口

      Chukwa

    架构在hadoop之上的数据采集和分析框架

    主要进行日志采集和分析

    通过安装在收集节点的”代理”采集原始日志数据

    代理将数据发给收集器 

    收集器定时将数据写入hadoop集群

    指定定时启动的map-reduce作业对数据进行加工处理和分析

    Hadoop基础管理中心(hicc)最终展示数据

    Cassandra

    noSQL,分布式的key-value型数据

    hbase类似

    只有顺序写,没有随机写

  • 相关阅读:
    浅谈SQL Server 对于内存的管理
    【JSON解析】JSON解析
    SQLSERVER吞噬内存解决记录
    数据schemaAvro简介
    Windows命令查看文件MD5
    均分纸牌(贪心)
    an easy problem(贪心)
    导弹拦截问题(贪心)
    活动选择(贪心)
    整数区间(贪心)
  • 原文地址:https://www.cnblogs.com/retacn-yue/p/6194227.html
Copyright © 2011-2022 走看看