  • A Few Small Notes on Hadoop

    >> Running into JobClient

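    For the past two years I have been working on a cloud computing project, but my work has been concentrated mainly on the client side.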

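    I had heard of Hadoop long ago but never had a chance to look at it. A few days ago I came across the familiar name JobClient, so I pulled down the Hadoop source code to read when I found the time. It is not that I want to use Hadoop for anything in particular; I just want to get to know it, so as not to end up "lost somewhere deep in the clouds".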

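    Hadoop is written in Java, but that is not much of a problem; I can follow most of it. Hadoop is of course vast, covering several parts (conf/DFS/io/ipc/MapReduce), but I picked only the MapReduce code to study: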

    Folders of interest:

    ...\src\mapred\org\apache\hadoop\mapred

    ...\src\mapred\org\apache\hadoop\mapreduce

    Classes of interest:

    JobTracker/TaskTracker

    JobID/JobProfile/JobContext

    JobInProgress/TaskInProgress/MapTask/ReduceTask

    JobHistory/JobHistoryServer

    >> About MapReduce

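    The MapReduce model hides the details of parallelization, fault tolerance, locality optimization, and load balancing, which makes it fairly simple to use.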

     1. MapReduce == Map -> Combine -> Reduce

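    The Map-Reduce framework operates exclusively on <key,value> pairs: the input to a job is a set of <key,value> pairs, and the output it produces is also a set of <key,value> pairs, sometimes of different types.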

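    Because the key and value classes have to be serializable by the framework, they must implement the Writable interface. In addition, key classes must implement the WritableComparable interface so that the framework can sort the data set by key.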
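    To make the Writable/WritableComparable requirement concrete, here is a minimal, Hadoop-free sketch. It assumes a local stand-in interface, WritableLike, that copies the two method signatures of Hadoop's Writable (write(DataOutput) and readFields(DataInput)); the WordKey class is hypothetical. Serialization uses only java.io, and compareTo supplies the ordering the framework needs to sort keys.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for org.apache.hadoop.io.Writable so that this compiles without
// Hadoop on the classpath; the two method signatures mirror the real interface.
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical key type: serializable (WritableLike) and ordered (Comparable),
// the two capabilities that WritableComparable combines.
final class WordKey implements WritableLike, Comparable<WordKey> {
    private String word = "";

    WordKey() { }                        // Hadoop-style types need a no-arg constructor
    WordKey(String word) { this.word = word; }

    String get() { return word; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);              // length-prefixed, modified UTF-8
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();             // refill this instance instead of allocating a new one
    }

    @Override
    public int compareTo(WordKey other) {
        return word.compareTo(other.word);   // the order in which keys would be sorted
    }

    // Serialize a key to bytes and read it back into a fresh instance.
    static WordKey roundTrip(WordKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            key.write(out);
            out.flush();
            WordKey copy = new WordKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Real Hadoop key types such as Text use their own byte encoding; writeUTF is used here only to keep the sketch self-contained.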

    The execution of a Map-Reduce job, with the input and output types at each stage, is as follows:

    (input)<k1,v1> -> map -> <k2,v2> -> combine -> <k2,v2> -> reduce -> <k3,v3>(output)
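    The type flow above can be mirrored in a small in-memory sketch. It is not Hadoop code: LocalPipeline, its nested MapFn/ReduceFn interfaces, and the wordCount helper are hypothetical names, and everything runs in a single JVM. The generic parameters track the <k1,v1>, <k2,v2> and <k3,v3> types; grouping uses a TreeMap, so keys come out sorted, much as the framework sorts keys before reducing.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.function.BiConsumer;

final class LocalPipeline {
    // map: one <K1,V1> record -> zero or more <K2,V2> pairs
    interface MapFn<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> out);
    }

    // reduce (and combine): <K, list of values> -> zero or more <KO,VO> pairs
    interface ReduceFn<K, V, KO, VO> {
        void reduce(K key, List<V> values, BiConsumer<KO, VO> out);
    }

    // Group pairs by key; TreeMap iterates keys in sorted order.
    private static <K extends Comparable<K>, V> TreeMap<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        TreeMap<K, List<V>> groups = new TreeMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }

    // Each inner list is the input of one map task. The combiner keeps the
    // <K2,V2> type, while the reducer may change it to <K3,V3>.
    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<List<Map.Entry<K1, V1>>> splits,
            MapFn<K1, V1, K2, V2> mapper,
            ReduceFn<K2, V2, K2, V2> combiner,
            ReduceFn<K2, V2, K3, V3> reducer) {
        List<Map.Entry<K2, V2>> shuffled = new ArrayList<>();
        for (List<Map.Entry<K1, V1>> split : splits) {
            List<Map.Entry<K2, V2>> mapOutput = new ArrayList<>();
            for (Map.Entry<K1, V1> rec : split) {
                mapper.map(rec.getKey(), rec.getValue(), (k, v) -> mapOutput.add(new SimpleEntry<>(k, v)));
            }
            // Combine locally, per map task, before anything reaches the reducer.
            groupByKey(mapOutput).forEach((k, vs) ->
                    combiner.reduce(k, vs, (ck, cv) -> shuffled.add(new SimpleEntry<>(ck, cv))));
        }
        // Reduce: group all combined pairs by key across map tasks.
        List<Map.Entry<K3, V3>> result = new ArrayList<>();
        groupByKey(shuffled).forEach((k, vs) ->
                reducer.reduce(k, vs, (rk, rv) -> result.add(new SimpleEntry<>(rk, rv))));
        return result;
    }

    // WordCount on this pipeline: <offset, line> -> <word, 1> -> <word, total>.
    static List<Map.Entry<String, Integer>> wordCount(List<String> files) {
        List<List<Map.Entry<Long, String>>> splits = new ArrayList<>();
        for (String text : files) {
            List<Map.Entry<Long, String>> split = new ArrayList<>();
            split.add(new SimpleEntry<>(0L, text));   // a one-line file = one record at offset 0
            splits.add(split);
        }
        MapFn<Long, String, String, Integer> tokenize = (offset, line, out) -> {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                out.accept(tokens.nextToken(), 1);
            }
        };
        ReduceFn<String, Integer, String, Integer> sum = (word, counts, out) -> {
            int total = 0;
            for (int c : counts) {
                total += c;
            }
            out.accept(word, total);
        };
        return run(splits, tokenize, sum, sum);   // one function serves as combiner and reducer
    }
}
```

    With the two input lines used in the WordCount example, LocalPipeline.wordCount(...) returns [Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2], the same totals as the job output shown there.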

     2. Example: WordCount 1.0

    The MapReduce Tutorial includes a WordCount example that reads two text files and counts how many times each word occurs in them.

    Source code:

    package org.myorg;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount {

        // Mapper: the map method emits <word, 1> for every token in a line
        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        // Reducer: the reduce method sums all counts for one word
        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            // Job configuration
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            // Set the Mapper/Combiner/Reducer (Reduce doubles as the combiner)
            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            // Set the input/output formats; both are plain text here
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            // Run the job and wait for it to finish
            JobClient.runJob(conf);
        }
    }

    Inputs (file01 & file02):

    -------------------------------------------------------------------

    ../wordcount/input/file01:    Hello World Bye World
    ../wordcount/input/file02:    Hello Hadoop Goodbye Hadoop

    -------------------------------------------------------------------

    Output:

    ------------------

        Bye 1
        Goodbye 1
        Hadoop 2
        Hello 2
        World 2

    ------------------

    Workflow:

    Step 1: Mapper

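    The Mapper's map method processes one line of text at a time, splits it into tokens with StringTokenizer, and emits a key-value pair < <word>, 1> for each token; these pairs become the input to the combine step.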

    ----------------------------

    The first map emits:
    < Hello, 1>
    < World, 1>
    < Bye, 1>
    < World, 1>

    The second map emits:
    < Hello, 1>
    < Hadoop, 1>
    < Goodbye, 1>
    < Hadoop, 1>

    -----------------------------

    Step 2: Combine

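    In this WordCount example the Combiner is the same class as the Reducer (conf.setCombinerClass(Reduce.class)). The output of each map is sorted by key and passed through the combiner, which merges the values of identical keys locally: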

    ----------------------------------

    The output of the first map:
    < Bye, 1>
    < Hello, 1>
    < World, 2>

    The output of the second map:
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 1>

     ----------------------------------
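    Because the combiner runs separately on each map task's output, each map produces only partial counts. Below is a minimal plain-Java sketch of one map task followed by its local combine; CombineSketch and mapThenCombine are hypothetical names (no Hadoop involved), and the TreeMap stands in for the sort on keys that happens before the combiner runs.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

final class CombineSketch {
    // One map task over one input line, then a local combine that sums per key.
    // TreeMap keeps keys sorted, like the map-side sort that precedes the combiner.
    static Map<String, Integer> mapThenCombine(String line) {
        Map<String, Integer> partial = new TreeMap<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            // Each token is one emitted <word, 1>; merge() plays the combiner's role.
            partial.merge(tokens.nextToken(), 1, Integer::sum);
        }
        return partial;
    }
}
```

    mapThenCombine("Hello World Bye World") returns {Bye=1, Hello=1, World=2}, matching the first map's combined output listed above; the reducer then only has to add up these partial counts.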

    Step 3: Reduce

    The Reducer's reduce method sums the counts for each word across all map outputs, producing the final output.

    -----------------------------------

    Thus the output of the job is:
    < Bye, 1>
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 2>
    < World, 2>

    -----------------------------------

     

    >> MapReduce Architecture

    [Figure: MapReduce architecture diagram (image not preserved)]

     

    >> JobClient

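    For each job, the JobClient class on the user side uploads the application (packaged as a jar file) together with its configuration parameters (Configuration) to HDFS, and submits the path to the master service, the JobTracker. The master then creates each Task (that is, the MapTasks and ReduceTasks) and dispatches them to the TaskTracker services for execution.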

    Methods:

    JobClient.runJob()

    JobClient.submitJob()

    JobClient.killJob()
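    The practical difference between these methods is blocking: runJob() submits a job and then waits until it completes, while submitJob() returns a RunningJob handle right away, through which the job can later be killed. The sketch below models those semantics with java.util.concurrent only; MiniJobClient and its methods are hypothetical analogues rather than Hadoop's API, a "job" is just a Callable, and a single background thread stands in for the cluster.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical client: one background thread plays the role of the cluster.
final class MiniJobClient implements AutoCloseable {
    private final ExecutorService cluster = Executors.newSingleThreadExecutor();

    // Like submitJob(): hand the job off and return a handle immediately.
    <T> Future<T> submitJob(Callable<T> job) {
        return cluster.submit(job);
    }

    // Like runJob(): submit, then block until the job completes (or fails).
    <T> T runJob(Callable<T> job) throws InterruptedException, ExecutionException {
        return submitJob(job).get();
    }

    // Like killing a running job: cancel it, interrupting it if it has started.
    static boolean killJob(Future<?> running) {
        return running.cancel(true);
    }

    @Override
    public void close() {
        cluster.shutdownNow();
    }
}
```

    In Hadoop itself the handle returned by submitJob() is a RunningJob, which exposes progress (mapProgress(), reduceProgress()) as well as killJob().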

     

    >> JobTracker

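    Jobs are scheduled by the services of two classes: a single master service, JobTracker, and multiple slave services, TaskTracker, running on many nodes. The master schedules every sub-task of a job to run on the slaves and monitors them, re-running any task that fails; the slaves simply execute the tasks. TaskTrackers normally run on the HDFS DataNodes (so that tasks can read their input locally), whereas the JobTracker does not need to; in general the JobTracker should be deployed on a separate machine.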

    JobTracker is a daemon per pool that administers all aspects of mapred activities.

    JobTracker keeps all the current jobs by containing instances of JobInProgress.

    Methods:

    JobTracker.submitJob(): creates/adds a JobInProgress to jobs and jobsByArrival

    JobTracker.pollForNewTask()
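    The master's behaviour described in this section (schedule a task on a slave, watch it, re-run it if it fails) can be sketched as a bounded retry loop. Everything here is hypothetical: the RetryingMaster class, the TaskRunner interface, and the round-robin choice of worker. The limit of 4 attempts mirrors Hadoop's default for mapred.map.max.attempts / mapred.reduce.max.attempts, but nothing else in the sketch is Hadoop code.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryingMaster {
    // A worker either completes a task attempt (true) or fails it (false).
    interface TaskRunner {
        boolean run(String worker, String taskId, int attempt);
    }

    static final int MAX_ATTEMPTS = 4;

    private final List<String> workers;
    private final List<String> log = new ArrayList<>();
    private int next = 0;

    RetryingMaster(List<String> workers) {
        this.workers = new ArrayList<>(workers);
    }

    // Schedule one task; after a failed attempt, retry it on the next worker (round-robin).
    // Returns true as soon as an attempt succeeds, false after MAX_ATTEMPTS failures.
    boolean runTask(String taskId, TaskRunner runner) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            String worker = workers.get(next);
            next = (next + 1) % workers.size();
            boolean ok = runner.run(worker, taskId, attempt);
            log.add(taskId + " attempt " + attempt + " on " + worker + (ok ? ": ok" : ": failed"));
            if (ok) {
                return true;
            }
        }
        return false;   // every attempt failed; the master gives up on this task
    }

    List<String> log() {
        return log;
    }
}
```

    Moving the retry to a different worker is a design choice of the sketch: if a failure was caused by a bad node, trying elsewhere gives the next attempt a better chance.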

     

    >> JobInProgress/TaskInProgress

     JobInProgress represents a job as it is being tracked by JobTracker.

     TaskInProgress represents a set of tasks for a given unique input, where input is a split for map task or a partition for reduce task.
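    The "partition" a reduce task works on is decided by a partitioner applied to every map output key. Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the sketch below applies the same formula to plain Java keys. PartitionSketch is a hypothetical name, and since Hadoop's Text.hashCode() is not String.hashCode(), real partition numbers for the same words will generally differ from these.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionSketch {
    // Same arithmetic as Hadoop's default HashPartitioner: clear the sign bit, then take the modulus.
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Assign every key to a reduce partition; each list is the input of one reduce task.
    static Map<Integer, List<String>> assign(List<String> keys, int numReduceTasks) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            byPartition.computeIfAbsent(partition(key, numReduceTasks), p -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }
}
```

    Masking with Integer.MAX_VALUE matters because hashCode() can be negative: for example, an Integer key of -5 lands in partition 3 of 4 instead of producing a negative index.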

     

    >> MapTask/ReduceTask

    MapTask offers method run() that calls MapRunner.run(), which in turn calls the user-supplied Mapper.map().

    ReduceTask offers run() that sorts input files using SequenceFile.Sorter.sort(), and then calls the user-supplied Reducer.reduce().
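    The MapTask.run() -> MapRunner.run() -> Mapper.map() chain is essentially a loop over input records. The sketch below imitates that loop without Hadoop: it splits text into <byte offset, line> records (the key/value shape TextInputFormat's record reader produces) and calls a user-supplied map function once per record. RunnerSketch and LineMapper are hypothetical names, and offsets are counted in UTF-8 bytes assuming '\n' line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class RunnerSketch {
    // Stand-in for the user-supplied map function: called once per <offset, line> record.
    interface LineMapper {
        void map(long offset, String line, List<String> output);
    }

    // Imitates the MapRunner loop: read one record at a time and hand it to the mapper.
    static List<String> run(String text, LineMapper mapper) {
        List<String> output = new ArrayList<>();
        long offset = 0;          // byte position where the current line starts
        int start = 0;            // char position where the current line starts
        while (start < text.length()) {
            int end = text.indexOf('\n', start);
            if (end < 0) {
                end = text.length();   // last line without a trailing newline
            }
            String line = text.substring(start, end);
            mapper.map(offset, line, output);
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;   // +1 for the '\n'
            start = end + 1;
        }
        return output;
    }
}
```

    For the input "Hello World Bye World\nHello Hadoop Goodbye Hadoop\n", the mapper is called twice, with keys 0 and 22.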

     

    >> Miscellaneous

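    Hadoop's recovery mechanisms are rather interesting: failed tasks are re-attempted automatically, and JobTracker.RecoveryManager lets a restarted JobTracker recover the jobs that were running before it went down. Both are worth a closer look.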

     

    // I should borrow some concepts from Hadoop for SolidMCP:
    //    RunningJob
    //    Reporter
    //    JobClient
    //    JobHistory.HistoryCleaner
    //    JobHistory.JobInfo
    //    JobHistory.Listener
    //    JobProfile
    //    TaskReport
    //    TaskTracker
    //    TaskLog
    //    JobQueueInfo
    //    JobContext
    //    JobEndNotifier
    //    JobControl

     

    References:

    http://wiki.apache.org/hadoop/HadoopMapRedClasses

    http://sebug.net/paper/databases/nosql/Nosql.html

     

  • Original post: https://www.cnblogs.com/piaoger/p/2332655.html