zoukankan      html  css  js  c++  java
  • Hadoop实战中高级部分 之 Hadoop MapReduce高级编程

    第一部分:重要的组件
    Combiner
    •什么是Combiner
    •combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>作为输入到reduce函数中,其格式与reduce函数相同。
    •这样可以有效的较少中间结果,减少网络传输负荷。
     
    •什么情况下可以使用Combiner
    •可以对记录进行汇总统计的场景,如求和。
    •求平均数的场景就不可以使用了
    Combiner执行时机
    •运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,即 min.num.spill.for.combine(default 3)
    •当job中设定了combiner,并且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件之前运行
    •通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。
    •Combiner也有可能不执行, Combiner会考虑当时集群的负载情况。
    Combiner如何使用
    •代码示例
    •继承Reducer类
    public static class Combiner extends MapReduceBase implements
               Reducer<Text, Text, Text, Text> {
           public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter)
                   throws IOException {
                     }
        }
     
    •配置作业时加入conf.setCombinerClass(Combiner.class)
     
    Partitioner
    •什么是Partitioner
    •Mapreduce 通过Partitioner 对Key 进行分区,进而把数据按我们自己的需求来分发。
    •什么情况下使用Partitioner
    •如果你需要key按照自己意愿分发,那么你需要这样的组件。
    •例如:数据文件内包含省份,而输出要求每个省份输出一个文件。
    •框架默认的HashPartitioner
    •public class HashPartitioner<K, V> extends Partitioner<K, V> { 

      /** Use {@link Object#hashCode()} to partition. */ 
      public int getPartition(K key, V value, 
                              int numReduceTasks) { 
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
      }
    Partitioner如何使用
    •实现Partitioner接口覆盖getPartition()方法
    •配置作业时加入conf.setPartitionerClass(MyPartitioner.class);
    •Partitioner示例
            public static class MyPartitioner implements Partitioner<Text, Text> {
               
             @Override
                public int getPartition(Text key, Text value, int numPartitions) {
                 }
     
    }
    Partitioner需求示例
    •需求描述
    •数据文件中含有省份
    •需要相同的省份送到相同的Reduce里
    •从而产生不同的文件
    •数据样例
    •1 liaoning
    •1 代表该省份有多少个直辖市
    •步骤
    •实现Partitioner,覆盖getPartition
    •根据省份字段进行切分
     
     
    RecordReader
    •什么是RecordReader
    •用于在分块中读取<Key,Value>对,也就是说每一次我们读取一条记录都会调用该类。
    •主要是处理经过InputFormat分片完的数据 
    •什么时候使用RecordReader
    •需要对输入的数据按自己的需求处理
    •如:要求输入的key不是文件的偏移量而是文件的路径或者名字
    •系统默认为LineRecordReader
    •按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。
     
    RecordReader需求示例
    •需求
    •更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。
    •步骤
    •重写InputFormat不对文件切分
    •重写RecordReader
    •在配置作业时使用自定义的组件进行数据处理
     
     
    第二部分:Join
    案例分析
    •输入为2个文件,文件一内容如下
    •空格分割:用户名 手机号 年龄
    •内容样例
    •Tom 1314567890 14
    •文件二内容
    •空格分割:手机号 地市
    •内容样例
    •13124567890 hubei
    •需要统计出的汇总信息为 用户名 手机号 年龄 地市
    MapJoin
    •设计思路
    •使用DistributedCache.addCacheFile()将地市的文件加入到所有Map的缓存里
    •在Map函数里读取该文件,进行Join
    •  将结果输出到reduce
    •需要注意的是
    •DistributedCache需要在生成Job作业前使用
     
     
    ReduceJoin
    •设计思路
    •Map端读取所有文件,并在输出的内容里加上标识代表数据时从哪个文件里来的
    •在reduce对按照标识对数据进行保存
    •然后根据Key的Join来求出结果直接输出
     
    第三部分:排序
     
    普通排序
    •Mapreduce本身自带排序功能
    •Text对象是不适合排序的,如果内容为整型不会安照编码顺序去排序
    •一般情况下我们可以考虑以IntWritable做为Key,同时将Reduce设置成0 ,进行排序
     
    部分排序
    •即输出的每个文件都是排过序的
    •如果我们不需要全局排序,那么这是个不错的选择。
     
    全局排序
    •产生背景
    •Hadoop平台没有提供全局数据排序,而在大规模数据处理中进行数据的全局排序是非常普遍的需求。
    •使用hadoop进行大量的数据排序排序最直观的方法是把文件所有内容给map之后,map不做任何处理,直接输出给一个reduce,利用hadoop的自己的shuffle机制,对所有数据进行排序,而后由reduce直接输出。
    •快速排序基本步骤就是需要现在所有数据中选取一个作为支点。然后将大于这个支点的放在一边,小于这个支点的放在另一边。
     
    设想如果我们有 N 个支点(这里可以称为标尺),就可以把所有的数据分成 N+1 part ,将这 N+1 part 丢给 reduce ,由 hadoop 自动排序,最后输出 N+1 个内部有序的文件,再把这 N+1 个文件首尾相连合并成一个文件,收工
    由此我们可以归纳出这样一个用 hadoop 对大量数据排序的步骤:
    1   对待排序数据进行抽样;
    2   对抽样数据进行排序,产生标尺;
    3   Map 对输入的每条数据计算其处于哪两个标尺之间;将数据发给对应区间 ID reduce
    4   Reduce 将获得数据直接输出。
    •Hadoop 提供了Sampler接口可以返回一组样本,该接口为Hadoop的采样器。
               public interface Sampler<K, V> {
                            K[] getSample(InputFormat<K, V> inf, Job job)
                             throws IOException, InterruptedException;
                }
    •Hadoop提供了一个TotalOrderPartitioner,可以使我们来实现全局排序。
    二次排序
    •产生背景
    •MapReduce默认会对key进行排序
    •将输出到Reduce的values也进行预先的排序
    •实现方式
    •重写Partitioner,完成key分区,进行第一次排序
    •实现WritableComparator,完成自己的排序逻辑,完成key的第2次排序
    •原理
    •Map之前的数据
             key1  1
             key2  2
             key2  3
             key3  4
             key1  2
    •Mapduce只能排序key,所以为了二次排序我们要重新定义自己的key 简单说来就是<key value> value ,组合完后
             <key1  1 >    1
             <key2  2 >    2
             <key2  3 >    3
             <key3  4>     4
             <key1  2 >    2
     
    •原理
    •接下来实现自定义的排序类,分组类,数据变成
             <key1  1 >    1
             <key1  2 >    2
             <key2  2 >    2
             <key2  3 >    3
             <key3  4>     4
    •最后 reduce处理后输出结果
               key1  1
               key1  2
               key2  2
               key2  3
               key3  4
     
     
     
    第四部分:计数器
    •什么是计数器
                计数器主要用来收集系统信息和作业运行信息,用于知道作业成功、失败等情况,比日志更便利进行分析。
    •内置计数器
    •Hadoop内置的计数器,记录作业执行情况和记录情况。包括MapReduce框架、文件系统、作业计数三大类。
    •计数器由关联任务维护,定期传递给tasktracker,再由tasktracker传给jobtracker。
    •计数器可以被全局聚集。内置的作业计数器实际上由jobtracker维护,不必在整个网络中传递。
    •当一个作业执行成功后,计数器的值才是完整可靠的。
     
     
    用户自定义Java计数器
    •MapReduce框架允许用户自定义计数器
    •计数器是全局使用的
    •计数器有组的概念,可以由一个Java枚举类型来定义
    •如何配置
    •0.20.2以下的版本使用Reporter,
    •0.20.2以上的版本使用context.getCounter(groupName, counterName) 来获取计数器配置并设置。
    •动态计数器
    •所谓动态计数器即不采用Java枚举的方式来定义
     
    •Reporter中的获取动态计数器的方法
    •public void incrCounter(String group,String counter,long amount)
                组名称,计数器名称,计数值
     
    •一些原则
    •创建计数器时,尽量让名称易读
     
     
    •获取计数器
    •Web UI
    •命令行 hadoop job-counter
    •Java API
    •Java API
    •在作业运行完成后,计数器稳定后获取。 使用job.getCounters()得到Counters
     
     
     
    第五部分:合并小文件示例
    •产生背景
    •Hadoop不适合处理小文件
    •会占用大量的内存空间
    •解决方案
    •文件内容读取到SequenceFile内
  • 相关阅读:
    HDU 1789 Doing Homework again(馋)
    Understanding and Using HRMS Security in Oracle HRMS
    BZOJ 1003 ZJOI2006 物流运输trans 动态规划+SPFA
    oracle常规任务
    征服OA 飞鱼工作流程的在线培训课程(两)HTML形成基于
    《iOS8 Swift编程指南》类书图像
    js css 实现简单的计算器
    Android四个基本组件(2)之Service 服务与Content Provider内容提供商
    Unity UGUI——Rect Transform包裹(Anchor Presets)
    NYOJ 1076 计划数(公式 要么 递归)
  • 原文地址:https://www.cnblogs.com/end/p/2866824.html
Copyright © 2011-2022 走看看