zoukankan      html  css  js  c++  java
  • MapReduce(三)--OutputFormat数据输出,Join

    一、OutputFormat数据输出

    1.1、OutputFormat接口实现类

    OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

    1)文本输出TextOutputFormat

    默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

    2)SequenceFileOutputFormat

    将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

    3)自定义OutputFormat

    根据用户需求,自定义实现输出。

    1.2、自定义OutputFormat

    1)使用场景

    为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

    例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

    2)自定义OutputFormat步骤

    (1)自定义一个类继承FileOutputFormat。

    (2)改写RecordWriter,具体改写输出数据的方法write()。

    1.3、自定义OutputFormat案例实操

    1.3.1、需求

    过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

    1)输入数据

    http://www.baidu.com
    http://www.google.com
    http://cn.bing.com
    http://www.atguigu.com
    http://www.sohu.com
    http://www.sina.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sindsafa.com
    

    1.3.2、需求分析

    image

    1.3.3、代码编写

    1)编写FilterMapper类

    package com.dianchou.mr.outputformat;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class FilterMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //http://www.baidu.com
            context.write(value, NullWritable.get());
        }
    }
    

    2)编写FilterReducer类

    package com.dianchou.mr.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    
        Text k = new Text();
    
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //http://www.baidu.com
            String line = key.toString();
            line = line + "
    ";
            k.set(line);
            context.write(k, NullWritable.get());
        }
    }
    

    3)自定义一个OutputFormat类

    package com.dianchou.mr.outputformat;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new FilterRecordWriter(job);
        }
    }
    

    4)编写RecordWriter类

    package com.dianchou.mr.outputformat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    
        FSDataOutputStream fosAtguigu = null;
        FSDataOutputStream fosOther = null;
    
        public FilterRecordWriter(TaskAttemptContext job) {
    
            FileSystem fs = null;
            try {
                fs = FileSystem.get(job.getConfiguration());
                //创建两个输出流
                Path atguiguPath = new Path("D:/hadoop/atguigu.log");
                Path otherPath = new Path("D:/hadoop/other.log");
                fosAtguigu = fs.create(atguiguPath);
                fosOther = fs.create(otherPath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // 判断是否包含“atguigu”输出到不同文件
            if(key.toString().contains("atguigu")){
                fosAtguigu.write(key.toString().getBytes());
            }else{
                fosOther.write(key.toString().getBytes());
            }
    
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOUtils.closeStream(fosAtguigu);
            IOUtils.closeStream(fosOther);
        }
    }
    

    5)编写FilterDriver类

    package com.dianchou.mr.outputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class FilterDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "D:\hadoop\fileoutput-input", "D:\hadoop\fileoutput-output" };
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(FilterDriver.class);
            job.setMapperClass(FilterMapper.class);
            job.setReducerClass(FilterReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 要将自定义的输出格式组件设置到job中
            job.setOutputFormatClass(FilterOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
            // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    二、Join多种应用

    2.1、Reduce Join

    2.1.1、Reduce Join工作原理

    Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

    Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

    2.2、Reduce Join案例实操

    2.2.1、需求

    image

    将商品信息表中数据根据商品pid合并到订单数据表中。

    image

    2.2.2、需求分析

    通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联

    image

    2.2.3、代码实现

    1)创建商品和订合并后的Bean类

    package com.dianchou.mr.table;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class TableBean implements Writable {
    
        private String order_id; // 订单id
        private String p_id;      // 产品id
        private int amount;       // 产品数量
        private String pname;     // 产品名称
        private String flag;      // 表的标记
    
        public TableBean() {
        }
    
        public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
            this.order_id = order_id;
            this.p_id = p_id;
            this.amount = amount;
            this.pname = pname;
            this.flag = flag;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(order_id);
            out.writeUTF(p_id);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.order_id = in.readUTF();
            this.p_id = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.flag = in.readUTF();
        }
    
        public String getOrder_id() {
            return order_id;
        }
    
        public void setOrder_id(String order_id) {
            this.order_id = order_id;
        }
    
        public String getP_id() {
            return p_id;
        }
    
        public void setP_id(String p_id) {
            this.p_id = p_id;
        }
    
        public int getAmount() {
            return amount;
        }
    
        public void setAmount(int amount) {
            this.amount = amount;
        }
    
        public String getPname() {
            return pname;
        }
    
        public void setPname(String pname) {
            this.pname = pname;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
        @Override
        public String toString() {
            return order_id + "	" + pname + "	" + amount + "	";
        }
    }
    

    2)编写TableMapper类

    package com.dianchou.mr.table;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
        String name;
        Text k = new Text();
        TableBean bean = new TableBean();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //获取切片文件信息
            FileSplit split = (FileSplit) context.getInputSplit();
            //获取切片文件名称
            name = split.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("	");
    
            //不同文件不同处理
            if (name.startsWith("order")) {   //订单表
                bean.setOrder_id(fields[0]);
                bean.setP_id(fields[1]);
                bean.setAmount(Integer.parseInt(fields[2]));
                bean.setPname("");
                bean.setFlag("order");
                k.set(fields[1]);
            } else {    //产品表
                bean.setOrder_id("");
                bean.setP_id(fields[0]);
                bean.setPname(fields[1]);
                bean.setFlag("pd");
                bean.setAmount(0);
                k.set(fields[0]);
            }
            context.write(k, bean);
        }
    }
    

    3)编写TableReducer类

    package com.dianchou.mr.table;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
            //存储订单信息
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            //存储产品信息
            TableBean pdBean = new TableBean();
    
            for (TableBean bean : values) {
                if ("order".equals(bean.getFlag())) { //订单表
                    TableBean orderBean = new TableBean();
                    try {
                        BeanUtils.copyProperties(orderBean, bean);
                        orderBeans.add(orderBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else{  //产品表
                    try {
                        BeanUtils.copyProperties(pdBean, bean);
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
    
            //表拼接
            for (TableBean orderBean : orderBeans) {
                orderBean.setPname(pdBean.getPname());
                context.write(orderBean, NullWritable.get());
            }
        }
    }
    

    4)编写TableDriver类

    package com.dianchou.mr.table;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class TableDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 0 根据自己电脑路径重新配置
            args = new String[]{"D:\hadoop\table-input","D:\hadoop\table-output"};
    
            // 1 获取配置信息,或者job对象实例
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 2 指定本程序的jar包所在的本地路径
            job.setJarByClass(TableDriver.class);
    
            // 3 指定本业务job要使用的Mapper/Reducer业务类
            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);
    
            // 4 指定Mapper输出数据的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);
    
            // 5 指定最终输出的数据的kv类型
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 6 指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    5)测试结果

    1001	小米	1
    1001	小米	1
    1002	华为	2
    1002	华为	2
    1003	格力	3
    1003	格力	3

    6)Reduce Join缺点及解决方案

    缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

    解决方案:Map端实现数据合并

    2.2、Map Join

    2.2.1、Map Join工作原理

    1)使用场景

    Map Join适用于一张表十分小、一张表很大场景。

    2)优点

    思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

    在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

    3)具体办法:采用DistributedCache

    (1)在Mapper的setup阶段,将文件读取到缓存集合中。

    (2)在驱动函数中加载缓存。

    // 缓存普通文件到Task运行节点。
    job.addCacheFile(new URI("file://e:/cache/pd.txt"));
    

    2.3、Map Join案例实操

    2.3.1、需求

    image

    将商品信息表中数据根据商品pid合并到订单数据表中。

    image

    2.3.2、需求分析

    MapJoin适用于关联表中有小表的情形。

    image

    2.3.3、代码实现

    1)先在驱动模块中添加缓存文件

    package com.dianchou.mr.cache;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class DistributedCacheDriver {
    
        public static void main(String[] args) throws Exception {
    
            // 0 根据自己电脑路径重新配置
            args = new String[]{"D:\hadoop\cache-input", "D:\hadoop\cache-output"};
    
            // 1 获取job信息
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 2 设置加载jar包路径
            job.setJarByClass(DistributedCacheDriver.class);
    
            // 3 关联map
            job.setMapperClass(DistributedCacheMapper.class);
    
            // 4 设置最终输出数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 5 设置输入输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 加载缓存数据
            job.addCacheFile(new URI("file:///d:/hadoop/pd.txt"));
    
            // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
            job.setNumReduceTasks(0);
    
            // 8 提交
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    2)读取缓存的文件数据

    package com.dianchou.mr.cache;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.*;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author lawrence
     * @create 2021-01-26
     */
    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        Map<String,String> pdMap = new HashMap<>();
        Text k = new Text();
    
        /**
         * 缓存文件
         * pid	pname
         * 01	小米
         * 02	华为
         * 03	格力
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // 1 获取缓存的文件
            URI[] cacheFiles = context.getCacheFiles();
            String path = cacheFiles[0].getPath();
    
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));
    
            String line;
            while (StringUtils.isNotEmpty(line = bufferedReader.readLine())){
                String[] fields = line.split("	");
                pdMap.put(fields[0],fields[1]);
            }
            bufferedReader.close();
        }
    
        /**
         *id	pid	amount
         * 1001	01	1
         * 1002	02	2
         * 1003	03	3
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1 获取一行
            String line = value.toString();
    
            // 2 截取
            String[] fields = line.split("	");
    
            // 3 获取产品id
            String pId = fields[1];
    
            // 4 获取商品名称
            String pdName = pdMap.get(pId);
    
            // 5 拼接
            //id	pname	amount
            //1001	小米	1
            k.set(fields[0] + "	"+ pdName + "	" + fields[2]);
    
            // 6 写出
            context.write(k, NullWritable.get());
    
        }
    }
    

    三、计数器应用

    Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

    3.1、计数器API

    (1)采用枚举的方式统计计数

    enum MyCounter{MALFORORMED,NORMAL}
    
    //对枚举定义的自定义计数器加1
    context.getCounter(MyCounter.MALFORORMED).increment(1);

    (2)采用计数器组、计数器名称的方式统计

    context.getCounter("counterGroup", "counter").increment(1);
    

    (3)计数结果在程序运行后的控制台上查看。

    四、数据清洗(ETL)

    在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

    4.1、数据清洗案例实操-简单解析版

    4.1.1、需求

    去除日志中字段长度小于等于11的日志。

    4.1.2、需求分析

    需要在Map阶段对输入的数据根据规则进行过滤清洗。

    4.1.3、代码编写

    1)编写LogMapper类

    package com.dianchou.mr.weblog;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-27
     */
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        Text k = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 1 获取1行数据
            String line = value.toString();
    
            // 2 解析日志
            boolean result = parseLog(line, context);
    
            // 3 日志不合法退出
            if (!result) {
                return;
            }
    
            // 4 设置key
            k.set(line);
    
            // 5 写出数据
            context.write(k, NullWritable.get());
        }
    
        // 2 解析日志
        private boolean parseLog(String line, Context context) {
    
            // 1 截取
            String[] fields = line.split(" ");
    
            // 2 日志长度大于11的为合法
            if (fields.length > 11) {
    
                // 系统计数器
                context.getCounter("map", "true").increment(1);
                return true;
            } else {
                context.getCounter("map", "false").increment(1);
                return false;
            }
        }
    }
    

    2)编写LogDriver类

    package com.dianchou.mr.weblog;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-27
     */
    public class LogDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "D:\hadoop\weblog-input", "D:\hadoop\weblog-output" };
    
            // 1 获取job信息
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2 加载jar包
            job.setJarByClass(LogDriver.class);
    
            // 3 关联map
            job.setMapperClass(LogMapper.class);
    
            // 4 设置最终输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 设置reducetask个数为0
            job.setNumReduceTasks(0);
    
            // 5 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 提交
            job.waitForCompletion(true);
        }
    }
    

    4.2、数据清洗案例实操-复杂解析版

    对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。

    1)定义一个bean,用来记录日志数据中的各数据字段

    package com.dianchou.mr.weblog2;
    
    /**
     * @author lawrence
     * @create 2021-01-27
     */
    public class LogBean {
        private String remote_addr;// 记录客户端的ip地址
        private String remote_user;// 记录客户端用户名称,忽略属性"-"
        private String time_local;// 记录访问时间与时区
        private String request;// 记录请求的url与http协议
        private String status;// 记录请求状态;成功是200
        private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
        private String http_referer;// 用来记录从那个页面链接访问过来的
        private String http_user_agent;// 记录客户浏览器的相关信息
    
        private boolean valid = true;// 判断数据是否合法
    
        public String getRemote_addr() {
            return remote_addr;
        }
    
        public void setRemote_addr(String remote_addr) {
            this.remote_addr = remote_addr;
        }
    
        public String getRemote_user() {
            return remote_user;
        }
    
        public void setRemote_user(String remote_user) {
            this.remote_user = remote_user;
        }
    
        public String getTime_local() {
            return time_local;
        }
    
        public void setTime_local(String time_local) {
            this.time_local = time_local;
        }
    
        public String getRequest() {
            return request;
        }
    
        public void setRequest(String request) {
            this.request = request;
        }
    
        public String getStatus() {
            return status;
        }
    
        public void setStatus(String status) {
            this.status = status;
        }
    
        public String getBody_bytes_sent() {
            return body_bytes_sent;
        }
    
        public void setBody_bytes_sent(String body_bytes_sent) {
            this.body_bytes_sent = body_bytes_sent;
        }
    
        public String getHttp_referer() {
            return http_referer;
        }
    
        public void setHttp_referer(String http_referer) {
            this.http_referer = http_referer;
        }
    
        public String getHttp_user_agent() {
            return http_user_agent;
        }
    
        public void setHttp_user_agent(String http_user_agent) {
            this.http_user_agent = http_user_agent;
        }
    
        public boolean isValid() {
            return valid;
        }
    
        public void setValid(boolean valid) {
            this.valid = valid;
        }
    
        @Override
        public String toString() {
    
            StringBuilder sb = new StringBuilder();
            sb.append(this.valid);
            sb.append("01").append(this.remote_addr);
            sb.append("01").append(this.remote_user);
            sb.append("01").append(this.time_local);
            sb.append("01").append(this.request);
            sb.append("01").append(this.status);
            sb.append("01").append(this.body_bytes_sent);
            sb.append("01").append(this.http_referer);
            sb.append("01").append(this.http_user_agent);
    
            return sb.toString();
        }
    }
    
    View Code

    2)编写LogMapper类

    package com.dianchou.mr.weblog2;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * @author lawrence
     * @create 2021-01-27
     */
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text k = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
            // 1 获取1行
            String line = value.toString();
            // 2 解析日志是否合法
            LogBean bean = parseLog(line);
            if (!bean.isValid()) {
                return;
            }
            k.set(bean.toString());
            // 3 输出
            context.write(k, NullWritable.get());
        }
    
        // 解析日志
        private LogBean parseLog(String line) {
            LogBean logBean = new LogBean();
            // 1 截取
            String[] fields = line.split(" ");
            if (fields.length > 11) {
                // 2封装数据
                logBean.setRemote_addr(fields[0]);
                logBean.setRemote_user(fields[1]);
                logBean.setTime_local(fields[3].substring(1));
                logBean.setRequest(fields[6]);
                logBean.setStatus(fields[8]);
                logBean.setBody_bytes_sent(fields[9]);
                logBean.setHttp_referer(fields[10]);
                if (fields.length > 12) {
                    logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
                }else {
                    logBean.setHttp_user_agent(fields[11]);
                }
                // 大于400,HTTP错误
                if (Integer.parseInt(logBean.getStatus()) >= 400) {
                    logBean.setValid(false);
                }
            }else {
                logBean.setValid(false);
            }
    
            return logBean;
        }
    }
    
    View Code

    3)编写LogDriver类

    package com.dianchou.mr.weblog2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @author lawrence
     * @create 2021-01-27
     */
    public class LogDriver {
        public static void main(String[] args) throws Exception {
            args = new String[] { "D:\hadoop\weblog-input", "D:\hadoop\weblog-output2" };
    
            // 1 获取job信息
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2 加载jar包
            job.setJarByClass(LogDriver.class);
    
            // 3 关联map
            job.setMapperClass(LogMapper.class);
    
            // 4 设置最终输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 5 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 提交
            job.waitForCompletion(true);
        }
    }
    
    View Code

    五、MapReduce开发总结

    1.输入数据接口:InputFormat

    (1)默认使用的实现类是:TextInputFormat

    (2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。

    (3)KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key,value。默认分隔符是tab( )。

    (4)NlineInputFormat按照指定的行数N来划分切片。

    (5)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

    (6)用户还可以自定义InputFormat。

    2.逻辑处理接口:Mapper

    用户根据业务需求实现其中三个方法:map()   setup()   cleanup ()

    3.Partitioner分区

    (1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

    (2)如果业务上有特别的需求,可以自定义分区。

    4.Comparable排序

    (1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。

    (2)部分排序:对最终输出的每一个文件进行内部排序。

    (3)全排序:对所有数据进行排序,通常只有一个Reduce。

    (4)二次排序:排序的条件有两个。

    5.Combiner合并

    Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

    6.Reduce端分组:GroupingComparator

    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

    7.逻辑处理接口:Reducer

    用户根据业务需求实现其中三个方法:reduce()   setup()   cleanup ()

    8.输出数据接口:OutputFormat

    (1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。

    (2)将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

    (3)用户还可以自定义OutputFormat。

    作者:Lawrence

    -------------------------------------------

    个性签名:独学而无友,则孤陋而寡闻。做一个灵魂有趣的人!

    扫描上面二维码关注我
    如果你真心觉得文章写得不错,而且对你有所帮助,那就不妨帮忙“推荐"一下,您的“推荐”和”打赏“将是我最大的写作动力!
    本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接.
  • 相关阅读:
    springMVC 是单例还是的多例的?
    js如何获取数字占的位数~
    java 为什么wait(),notify(),notifyAll()必须在同步方法/代码块中调用?
    数据挖掘基本概念讲解
    js如何判断小数点后有几位
    volotile关键字的内存可见性及重排序
    上传文件multipart form-data boundary 说明
    vi 调到第一行和最后一行
    linux监控平台搭建-磁盘
    Guava Cache 参数配置说明
  • 原文地址:https://www.cnblogs.com/hujinzhong/p/14330356.html
Copyright © 2011-2022 走看看