zoukankan      html  css  js  c++  java
  • 13.MapReduce应用

    一、Join多种应用

    1.1 Reduce Join

    Reduce Join工作原理:

    Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
    Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段打标志)分开,最后进行合并即可。

    案例:

    需求: 将商品信息表中数据根据商品pid合并到订单数据表中
    order.txt:

    id pid amount
    1001 01 1
    1002 02 2
    1003 03 3
    1004 01 4
    pd.txt:
    pid pname
    -- --
    01 小米
    02 华为
    03 联想
    期望获得数据:
    id pname amount
    -- -- --
    1001 小米 1
    1004 小米 4
    1002 华为 2
    1003 格力 3

    代码实现:

    OrderBean实体:

    public class OrderBean implements WritableComparable<OrderBean> {
    
        private String id;
        private String pid;
        private int amount;
        private String pname;
    
        public OrderBean() {
        }
    
    
        @Override
        public int compareTo(OrderBean o) {
            int compare = this.pid.compareTo(o.pid);
            if (compare == 0) {
                return o.pname.compareTo(this.pname);
            } else {
                return compare;
            }
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(id);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readUTF();
            this.pid = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
        }
        //省略getter、setter、toString方法
        ...
    }
    

    Mapper类

    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        private OrderBean orderBean = new OrderBean();
        private String fileName;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fs = (FileSplit) context.getInputSplit();
            fileName = fs.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("	");
            //根据文件名来创建OrderBean对象
            if ("order.txt".equals(fileName)){
                orderBean.setId(fields[0]);
                orderBean.setPid(fields[1]);
                orderBean.setAmount(Integer.parseInt(fields[2]));
                orderBean.setPname("");
            }else {
                orderBean.setPid(fields[0]);
                orderBean.setPname(fields[1]);
                orderBean.setId("");
                orderBean.setAmount(0);
            }
            context.write(orderBean,NullWritable.get());
        }
    }
    

    Reducer类:

    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<NullWritable> vars = values.iterator();
            //指针下移获取第一个OrderBean
            vars.next();
            String pname = key.getPname();
            while (vars.hasNext()) {
                //指针下移,其对应的key也变化了
                vars.next();
                key.setPname(pname);
                context.write(key, NullWritable.get());
            }
        }
    }
    

    分组Comparator类:

    public class OrderComparator extends WritableComparator {
        public OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean oa = (OrderBean) a;
            OrderBean ob = (OrderBean) b;
            return oa.getPid().compareTo(ob.getPid());
        }
    }
    

    驱动Driver

    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(OrderDriver.class);
            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);
    
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            //设置分组Comparator
            job.setGroupingComparatorClass(OrderComparator.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
            //指定_SUCCESS文件的位置
            FileOutputFormat.setOutputPath(job, new Path("d:\output"));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    缺点:
    Reduce Join合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

    解决方案: 使用Map Join

    1.2 Map Join

    使用场景:

    Map Join适用于一张表非常小、另一表非常大的场景。

    Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据压力,尽可能的减少数据倾斜。

    实现方式:

    1. DistributedCacheDriver缓存小文件
    2. MapsetUp()方法中读取缓存文件

    代码:

    Mapper类:

    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        private OrderBean orderBean = new OrderBean();
        private Map<String, String> pMap = new HashMap<>();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            String path = cacheFiles[0].getPath();
            /**
             * 使用FSDataInputStream会中文乱码
             */
    //        FileSystem fs = FileSystem.get(context.getConfiguration());
    //        FSDataInputStream fis = fs.open(new Path(path));
            BufferedReader fis = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
            String line;
            while (StringUtils.isNotEmpty(line = fis.readLine())) {
                String[] fields = line.split("	");
                pMap.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(fis);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("	");
            String pname = pMap.get(fields[1]);
            pname = pname == null ? "" : pname;
            orderBean.setId(fields[0]);
            orderBean.setPid(fields[1]);
            orderBean.setAmount(Integer.parseInt(fields[2]));
            orderBean.setPname(pname);
            context.write(orderBean, NullWritable.get());
        }
    }
    

    驱动Driver

    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(OrderDriver.class);
            job.setMapperClass(OrderMapper.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
            //指定_SUCCESS文件的位置
            FileOutputFormat.setOutputPath(job, new Path("d:\output"));
    
            //加载缓存数据
            job.addCacheFile(new URI("file:///d:/MyFile/cache/pd.txt"));
            //Map端Join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
            job.setNumReduceTasks(0);
    
    
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    二、计数器应用

    Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户监控已处理的输入数据量和已产生的输出数据量。

    1. 采用枚举的方式统计计数
    enum MyCounter{MALFORORMED,NORMAL}
    //对枚举定义的自定义计数器加1
    context.getCounter(MyCounter.MALFORORMED).increment(1);
    
    1. 采用计数组、计数器名称的方式统计
    context.getCounter("counterGroup","counter").increment(1);
    
    1. 计数结果在程序运行后的控制台上查看

    三、数据清洗(ETL)

    在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

    需求: 去除日志中字段长度小于等于11的日志
    在这里插入图片描述Mappper类:

    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    	
    	Text k = new Text();
    	
    	@Override
    	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    		
    		// 1 获取1行数据
    		String line = value.toString();
    		
    		// 2 解析日志
    		boolean result = parseLog(line,context);
    		
    		// 3 日志不合法退出
    		if (!result) {
    			return;
    		}
    		
    		// 4 设置key
    		k.set(line);
    		
    		// 5 写出数据
    		context.write(k, NullWritable.get());
    	}
    
    	// 2 解析日志
    	private boolean parseLog(String line, Context context) {
    
    		// 1 截取
    		String[] fields = line.split(" ");
    		
    		// 2 日志长度大于11的为合法
    		if (fields.length > 11) {
    
    			// 系统计数器
    			context.getCounter("map", "true").increment(1);
    			return true;
    		}else {
    			context.getCounter("map", "false").increment(1);
    			return false;
    		}
    	}
    }
    

    驱动Driver:

    public class LogDriver {
    
    	public static void main(String[] args) throws Exception {
    		// 1 获取job信息
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    
    		// 2 加载jar包
    		job.setJarByClass(LogDriver.class);
    
    		// 3 关联map
    		job.setMapperClass(LogMapper.class);
    
    		// 4 设置最终输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 设置reducetask个数为0
    		job.setNumReduceTasks(0);
    
    		// 5 设置输入和输出路径
    		FileInputFormat.setInputPaths(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    		// 6 提交
    		job.waitForCompletion(true);
    	}
    }
    

    计数效果:
    在这里插入图片描述

    四、MapReduce开发总结

    在编写MapReduce程序时,需要考虑的几个方面:

    ①输入数据接口:InputFormat

    1. 默认使用的实现类是:TextInputFormat
    2. TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
    3. KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为keyvalue。默认分隔符是tab ( )
    4. NlineInputFormat按照指定的行数N来划分切片。
    5. CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
    6. 用户还可以自定义InputFormat

    ②逻辑处理接口:Mapper

    用户根据业务需求实现其中三个方法:map()setup()cleanup ()

    Partitioner分区

    有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

    如果业务上有特别的需求,可以自定义分区。

    Comparable排序

    当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法

    部分排序:对最终输出的没一个文件进行内部排序
    全排序:对所有数据进行排序,通常只有一个Reduce
    二次排序:排序的条件有两个
    辅助排序:可以让不同的key进入到同一个ReduceTask

    Combiner合并

    Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

    Reduce端分组:Groupingcomparator

    Reduce端对key进行分组。应用于:在接收的KeyBean对象时,想让一个或几个字段相同(全部字段比较不相同)的Key进入到同一个Reduce方法时,可以采用分组排序。

    ⑦逻辑处理接口:Reducer

    用户根据业务需求实现其中三个方法: reduce()setup()cleanup()

    ⑧输出数据接口:OutputFormat

    默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。

    用户还可以自定义OutputFormat

  • 相关阅读:
    JAVA---JDK环境变量的配置
    “==” 与“equals(Object)”区别
    js替换字符串中所有斜杠
    uploadify学习笔记
    VBA学习笔记
    浮动导航条的实现
    canvas初识笔记
    EntityFramework存储过程的返回类型
    CSS及html的特殊字符表
    DIV六种实现元素水平居中
  • 原文地址:https://www.cnblogs.com/hucheng1997/p/13083270.html
Copyright © 2011-2022 走看看