zoukankan      html  css  js  c++  java
  • 13.MapReduce应用

    一、Join多种应用

    1.1 Reduce Join

    Reduce Join工作原理:

    Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
    Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段打标志)分开,最后进行合并即可。

    案例:

    需求: 将商品信息表中数据根据商品pid合并到订单数据表中
    order.txt:

    id pid amount
    1001 01 1
    1002 02 2
    1003 03 3
    1004 01 4
    pd.txt:
    pid pname
    -- --
    01 小米
    02 华为
    03 联想
    期望获得数据:
    id pname amount
    -- -- --
    1001 小米 1
    1004 小米 4
    1002 华为 2
    1003 格力 3

    代码实现:

    OrderBean实体:

    public class OrderBean implements WritableComparable<OrderBean> {
    
        private String id;
        private String pid;
        private int amount;
        private String pname;
    
        public OrderBean() {
        }
    
    
        @Override
        public int compareTo(OrderBean o) {
            int compare = this.pid.compareTo(o.pid);
            if (compare == 0) {
                return o.pname.compareTo(this.pname);
            } else {
                return compare;
            }
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(id);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readUTF();
            this.pid = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
        }
        //省略getter、setter、toString方法
        ...
    }
    

    Mapper类

    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        private OrderBean orderBean = new OrderBean();
        private String fileName;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fs = (FileSplit) context.getInputSplit();
            fileName = fs.getPath().getName();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("	");
            //根据文件名来创建OrderBean对象
            if ("order.txt".equals(fileName)){
                orderBean.setId(fields[0]);
                orderBean.setPid(fields[1]);
                orderBean.setAmount(Integer.parseInt(fields[2]));
                orderBean.setPname("");
            }else {
                orderBean.setPid(fields[0]);
                orderBean.setPname(fields[1]);
                orderBean.setId("");
                orderBean.setAmount(0);
            }
            context.write(orderBean,NullWritable.get());
        }
    }
    

    Reducer类:

    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<NullWritable> vars = values.iterator();
            //指针下移获取第一个OrderBean
            vars.next();
            String pname = key.getPname();
            while (vars.hasNext()) {
                //指针下移,其对应的key也变化了
                vars.next();
                key.setPname(pname);
                context.write(key, NullWritable.get());
            }
        }
    }
    

    分组Comparator类:

    public class OrderComparator extends WritableComparator {
        public OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean oa = (OrderBean) a;
            OrderBean ob = (OrderBean) b;
            return oa.getPid().compareTo(ob.getPid());
        }
    }
    

    驱动Driver

    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(OrderDriver.class);
            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);
    
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            //设置分组Comparator
            job.setGroupingComparatorClass(OrderComparator.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
            //指定_SUCCESS文件的位置
            FileOutputFormat.setOutputPath(job, new Path("d:\output"));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    缺点:
    Reduce Join合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

    解决方案: 使用Map Join

    1.2 Map Join

    使用场景:

    Map Join适用于一张表非常小、另一表非常大的场景。

    Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据压力,尽可能的减少数据倾斜。

    实现方式:

    1. DistributedCacheDriver缓存小文件
    2. MapsetUp()方法中读取缓存文件

    代码:

    Mapper类:

    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        private OrderBean orderBean = new OrderBean();
        private Map<String, String> pMap = new HashMap<>();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            String path = cacheFiles[0].getPath();
            /**
             * 使用FSDataInputStream会中文乱码
             */
    //        FileSystem fs = FileSystem.get(context.getConfiguration());
    //        FSDataInputStream fis = fs.open(new Path(path));
            BufferedReader fis = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
            String line;
            while (StringUtils.isNotEmpty(line = fis.readLine())) {
                String[] fields = line.split("	");
                pMap.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(fis);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("	");
            String pname = pMap.get(fields[1]);
            pname = pname == null ? "" : pname;
            orderBean.setId(fields[0]);
            orderBean.setPid(fields[1]);
            orderBean.setAmount(Integer.parseInt(fields[2]));
            orderBean.setPname(pname);
            context.write(orderBean, NullWritable.get());
        }
    }
    

    驱动Driver

    public class OrderDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(OrderDriver.class);
            job.setMapperClass(OrderMapper.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
            //指定_SUCCESS文件的位置
            FileOutputFormat.setOutputPath(job, new Path("d:\output"));
    
            //加载缓存数据
            job.addCacheFile(new URI("file:///d:/MyFile/cache/pd.txt"));
            //Map端Join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
            job.setNumReduceTasks(0);
    
    
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    二、计数器应用

    Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户监控已处理的输入数据量和已产生的输出数据量。

    1. 采用枚举的方式统计计数
    enum MyCounter{MALFORORMED,NORMAL}
    //对枚举定义的自定义计数器加1
    context.getCounter(MyCounter.MALFORORMED).increment(1);
    
    1. 采用计数组、计数器名称的方式统计
    context.getCounter("counterGroup","counter").increment(1);
    
    1. 计数结果在程序运行后的控制台上查看

    三、数据清洗(ETL)

    在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

    需求: 去除日志中字段长度小于等于11的日志
    在这里插入图片描述Mappper类:

    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    	
    	Text k = new Text();
    	
    	@Override
    	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    		
    		// 1 获取1行数据
    		String line = value.toString();
    		
    		// 2 解析日志
    		boolean result = parseLog(line,context);
    		
    		// 3 日志不合法退出
    		if (!result) {
    			return;
    		}
    		
    		// 4 设置key
    		k.set(line);
    		
    		// 5 写出数据
    		context.write(k, NullWritable.get());
    	}
    
    	// 2 解析日志
    	private boolean parseLog(String line, Context context) {
    
    		// 1 截取
    		String[] fields = line.split(" ");
    		
    		// 2 日志长度大于11的为合法
    		if (fields.length > 11) {
    
    			// 系统计数器
    			context.getCounter("map", "true").increment(1);
    			return true;
    		}else {
    			context.getCounter("map", "false").increment(1);
    			return false;
    		}
    	}
    }
    

    驱动Driver:

    public class LogDriver {
    
    	public static void main(String[] args) throws Exception {
    		// 1 获取job信息
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    
    		// 2 加载jar包
    		job.setJarByClass(LogDriver.class);
    
    		// 3 关联map
    		job.setMapperClass(LogMapper.class);
    
    		// 4 设置最终输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 设置reducetask个数为0
    		job.setNumReduceTasks(0);
    
    		// 5 设置输入和输出路径
    		FileInputFormat.setInputPaths(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    		// 6 提交
    		job.waitForCompletion(true);
    	}
    }
    

    计数效果:
    在这里插入图片描述

    四、MapReduce开发总结

    在编写MapReduce程序时,需要考虑的几个方面:

    ①输入数据接口:InputFormat

    1. 默认使用的实现类是:TextInputFormat
    2. TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
    3. KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为keyvalue。默认分隔符是tab ( )
    4. NlineInputFormat按照指定的行数N来划分切片。
    5. CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
    6. 用户还可以自定义InputFormat

    ②逻辑处理接口:Mapper

    用户根据业务需求实现其中三个方法:map()setup()cleanup ()

    Partitioner分区

    有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

    如果业务上有特别的需求,可以自定义分区。

    Comparable排序

    当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法

    部分排序:对最终输出的没一个文件进行内部排序
    全排序:对所有数据进行排序,通常只有一个Reduce
    二次排序:排序的条件有两个
    辅助排序:可以让不同的key进入到同一个ReduceTask

    Combiner合并

    Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

    Reduce端分组:Groupingcomparator

    Reduce端对key进行分组。应用于:在接收的KeyBean对象时,想让一个或几个字段相同(全部字段比较不相同)的Key进入到同一个Reduce方法时,可以采用分组排序。

    ⑦逻辑处理接口:Reducer

    用户根据业务需求实现其中三个方法: reduce()setup()cleanup()

    ⑧输出数据接口:OutputFormat

    默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。

    用户还可以自定义OutputFormat

  • 相关阅读:
    对MVC模型的自悟,详尽解释,为了更多非计算机人员可以理解
    openSUSE leap 42.3 实现有线 无线同时用
    Fedora27 源配置
    Ubuntu16.04添加HP Laserjet Pro M128fn打印机和驱动
    openSUSE leap 42.3 添加HP Laserjet Pro M128fn打印机和驱动
    OpenSUSE Leap 42.3下通过Firefox Opera Chromium浏览器直接执行java应用程序(打开java jnlp文件)实现在服务器远程虚拟控制台完成远程管理的方法
    OpenSUSE Leap 42.3 安装java(Oracle jre)
    linux下支持托盘的邮件客户端Sylpheed
    Ubuntu下通过Firefox Opera Chromium浏览器直接执行java应用程序(打开java jnlp文件)实现在服务器远程虚拟控制台完成远程管理的方法
    Firefox 浏览器添加Linux jre插件
  • 原文地址:https://www.cnblogs.com/hucheng1997/p/13083270.html
Copyright © 2011-2022 走看看