zoukankan      html  css  js  c++  java
  • MapReduce

    Mapper类 :

    • 用户自定义一个Mapper类继承Hadoop的Mapper类
    • Mapper的输入数据是KV对的形式(类型可以自定义)
    • Map阶段的业务逻辑定义在map()方法中
    • Mapper的输出数据是KV对的形式(类型可以自定义)

    注意:map()方法是对每一行数据调用一次!!

    Reducer类

    • 用户自定义Reducer类要继承Hadoop的Reducer类
    • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
    • Reducer的业务逻辑写在reduce()方法中
    • Reduce()方法是对相同K的一组KV对调用执行一次

    Driver

    • 1. 获取配置文件对象,获取job对象实例
    • 2. 指定程序jar的本地路径
    • 3. 指定Mapper/Reducer类
    • 4. 指定Mapper输出的kv数据类型
    • 5. 指定最终输出的kv数据类型
    • 6. 指定job处理的原始数据路径
    • 7. 指定job输出结果路径
    • 8. 提交作业

    Maven 相关依赖:

    <dependencies>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version></dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
    </dependency>
    </dependencies>

    log4j.properties:

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    YARN 集群运行:

      

    hadoop jar wc.jar com.lagou.wordcount.WordcountDriver
    /user/lagou/input /user/lagou/output

    当一个javaBean 对象需要做value,此时需要实现Writable 接口:

    重写序列化和反序列化方法:
    @Override
    public void write(DataOutput out) throws IOException {
    ....
    }
    @Override
    public void readFields(DataInput in) throws IOException {
    ....
    }

    如果javaBean 对象需要做 key,还需要实现Comparable接口.

    自定义分区:

    • 1. 自定义类继承Partitioner,重写getPartition()方法
    • 2. 在Driver驱动中,指定使用自定义Partitioner
    • 3. 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。

    示例:

    package com.lagou.mr.partition;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
        @Override
        public int getPartition(Text text, PartitionBean partitionBean, int
                numPartitions) {
            int partition = 0;
            final String appkey = text.toString();
            if (appkey.equals("kar")) {
                partition = 1;
            } else if (appkey.equals("pandora")) {
                partition = 2;
            } else {
                partition = 0;
            }
            return partition;
        }
    }

    总结

    • 1. 自定义分区器时最好保证分区数量与reduceTask数量保持一致;
    • 2. 如果分区数量不止1个,但是reduceTask数量1个,此时只会输出一个文件。
    • 3. 如果reduceTask数量大于分区数量,但是输出多个空文件
    • 4. 如果reduceTask数量小于分区数量,有可能会报错。

    Combiner:

      使用场景:每次溢写需要,多次溢写需要

      自定义Combiner实现步骤:

      • 自定义一个Combiner继承Reducer,重写Reduce方法
      • 在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)
    package com.lagou.mr.wc;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import javax.xml.soap.Text;
    import java.io.IOException;
    
    public class WordCountCombiner extends Reducer<Text,
            IntWritable, Text, IntWritable> {
        IntWritable total = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context
                context) throws IOException, InterruptedException {
    //2 遍历key对应的values,然后累加结果
            int sum = 0;
            for (IntWritable value : values) {
                int i = value.get();
                sum += 1;
            }
    // 3 直接输出当前key对应的sum值,结果就是单词出现的总次数
            total.set(sum);
            context.write(key, total);
        }
    }
    job.setCombinerClass(WordcountCombiner.class);

    排序:

    全局排序:一个reducer,会调用key的comperTo 方法

    辅助排序: ( GroupingComparator分组)

      JavaBean 实现WritableComparable 接口,实现数据的排序

      自定义GroupingComparator:继承WritableComparator ,用于判断多个key 是否应该被划分到同一组,给reduce处理

    示例代码:

    javaBean:

    package com.lagou.mr.group;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class OrderBean implements WritableComparable<OrderBean> {
    
        private String orderId;//订单id
        private Double price;//金额
    
    
        public OrderBean(String orderId, Double price) {
            this.orderId = orderId;
            this.price = price;
        }
    
        public OrderBean() {
        }
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public Double getPrice() {
            return price;
        }
    
        public void setPrice(Double price) {
            this.price = price;
        }
    
        //指定排序规则,先按照订单id比较再按照金额比较,按照金额降序排
        @Override
        public int compareTo(OrderBean o) {
            int res = this.orderId.compareTo(o.getOrderId()); //0 1 -1
            if (res == 0) {
                //订单id相同,比较金额
                res = - this.price.compareTo(o.getPrice());
            }
            return res;
        }
    
        //序列化
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeUTF(orderId);
            out.writeDouble(price);
        }
    
        //反序列化
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.price = in.readDouble();
        }
    
        //重写toString()
    
        @Override
        public String toString() {
            return orderId + '	' +
                    price
                    ;
        }
    }

    自定义GroupingComparator:

    package com.lagou.mr.group;
    
    import com.sun.corba.se.impl.orb.ParserTable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class CustomGroupingComparator extends WritableComparator {
    
        public CustomGroupingComparator() {
            super(OrderBean.class, true); //注册自定义的GroupingComparator接受OrderBean对象
        }
    
        //重写其中的compare方法,通过这个方法来让mr接受orderid相等则两个对象相等的规则,key相等
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) { //a 和b是orderbean的对象
            //比较两个对象的orderid
            final OrderBean o1 = (OrderBean) a;
            final OrderBean o2 = (OrderBean) b;
            final int i = o1.getOrderId().compareTo(o2.getOrderId());
            return i; // 0 1 -1
        }
    }
    //指定使用groupingcomparator
            job.setGroupingComparatorClass(CustomGroupingComparator.class);

    ReduceJoin:

    •   定义连表后的包含所有字段以及文件来源标签的javaBean类.
    •   在setup方法内,通过切片信息获取文件名
    •   在map中根据文件名,write不同的javaBean对象,(注意连表字段作为key)
    •   reduce端根据标签信息,少表定义map,多表定义List,遍历 List,拼接少表字段

    注意 如果需要向List添加JavaBean对象,需要new一个新对象,做深度拷贝,BeanUtils.copyProperties(newBean, bean),

    hadoop在一次reduce函数的调用过程中,会重复使用key和value两个对象

    package com.lagou.mr.join.reduce_join;
    import com.sun.org.apache.bcel.internal.generic.NEW;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;Reducer
    import java.io.IOException;
    
    public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text,
            DeliverBean> {
        String name;
        DeliverBean bean = new DeliverBean();
        Text k = new Text();
    
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
    // 1 获取输入文件切片
            FileSplit split = (FileSplit) context.getInputSplit();
    // 2 获取输入文件名称
            name = split.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws
                IOException, InterruptedException {
    // 1 获取输入数据
            String line = value.toString();
    // 2 不同文件分别处理
            if (name.startsWith("deliver_info")) {
    // 2.1 切割
                String[] fields = line.split("	");
    // 2.2 封装bean对象
                bean.setUserId(fields[0]);
                bean.setPositionId(fields[1]);
                bean.setDate(fields[2]);
                bean.setPositionName("");
                bean.setFlag("deliver");
                k.set(fields[1]);
            } else {
    // 2.3 切割
                String[] fields = line.split("	");
    // 2.4 封装bean对象
                bean.setPositionId(fields[0]);
                bean.setPositionName(fields[1]);
                bean.setUserId("");
                bean.setDate("");
                bean.setFlag("position");
                k.set(fields[0]);
            }
    // 3 写出
            context.write(k, bean);
        }
    }
    package com.lagou.mr.join.reduce_join;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    public class ReduceJoinReducer extends Reducer<Text, DeliverBean, DeliverBean,
            NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<DeliverBean> values, Context
                context) throws IOException, InterruptedException {
    // 1准备投递行为数据的集合
            ArrayList<DeliverBean> deBeans = new ArrayList<>();
    // 2 准备bean对象
            DeliverBean pBean = new DeliverBean();
            for (DeliverBean bean : values) {
                if ("deliver".equals(bean.getFlag())) {//
                    DeliverBean dBean = new DeliverBean();
                    try {
                        BeanUtils.copyProperties(dBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    deBeans.add(dBean);
                } else {
                    try {
                        BeanUtils.copyProperties(pBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
    // 3 表的拼接
            for (DeliverBean bean : deBeans) {
                bean.setPositionName(pBean.getPositionName());
    // 4 数据写出去
                context.write(bean, NullWritable.get());
            }
        }
    }

      

      

  • 相关阅读:
    数据库中Schema(模式)概念的理解
    debug --- 使用Eclipse
    pgsql 相关函数
    浏览器显示页面排版错误
    jqury 属性
    节点互换需要克隆
    mysql数据库允许远程访问
    request与response的编码和解码
    文本和属性 radio,checkbox,select
    js 和 JQuery 获取iframe的父子值
  • 原文地址:https://www.cnblogs.com/wanghzh/p/14870644.html
Copyright © 2011-2022 走看看