zoukankan      html  css  js  c++  java
  • Map Join与计数器

    一、概念

      1、Map Join:

        Map Join适用于一张表很小、另一张表很大,并且两张表之间存在关联字段的场景:把小表整体加载到缓存(内存)中,在Map阶段直接完成关联,不需要Reduce阶段

    二、项目介绍

      1、待处理文本

        order.txt:订单信息表,记录着订单ID、商品ID、订单销量(用来模拟大表)

        pd.txt:商品信息表,记录着商品ID、商品名称(用来模拟小表,小表直接加入缓存)

          
    1001    01    1
    1002    02    2
    1003    03    3
    1004    01    4
    1005    02    5
    1006    03    6
    order.txt 
          
    01    小米
    02    华为
    03    格力
    pd.txt

      2、需求

       将商品信息表中数据根据商品pid合并到订单数据表中

       3、MapBean.java    

          
    package com.jh.mapperJoin;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * One joined order record: the order fields read from the order file plus the
     * product name looked up by product ID.
     *
     * <p>Rendered by {@link #toString()} as {@code orderID<TAB>shopName<TAB>orderNum}.
     * Serialization is null-safe: a field that was never set is written as an empty
     * string (or {@code 0} for the quantity) instead of raising
     * {@link NullPointerException}, so a record whose product ID had no match can
     * still be serialized. Such fields read back as {@code ""} / {@code 0}.
     */
    public class MapBean implements Writable {
        private String orderID; // order ID
        private String shopID; // product ID
        private Integer orderNum; // quantity ordered
        private String shopName; // product name

        /** Creates an empty bean; all fields are populated through the setters. */
        public MapBean() {
        }

        /**
         * @return {@code orderID}, {@code shopName} and {@code orderNum} joined by tabs;
         *         an unset field is rendered as the text {@code null}
         */
        @Override
        public String toString() {
            return orderID + "\t" + shopName + "\t" + orderNum;
        }

        public String getOrderID() {
            return orderID;
        }

        public void setOrderID(String orderID) {
            this.orderID = orderID;
        }

        public String getShopID() {
            return shopID;
        }

        public void setShopID(String shopID) {
            this.shopID = shopID;
        }

        public Integer getOrderNum() {
            return orderNum;
        }

        public void setOrderNum(Integer orderNum) {
            this.orderNum = orderNum;
        }

        public String getShopName() {
            return shopName;
        }

        public void setShopName(String shopName) {
            this.shopName = shopName;
        }

        /**
         * Serializes the fields in the order orderID, shopID, orderNum, shopName.
         * Unset fields are written as {@code ""} / {@code 0}; the wire format for
         * fully populated beans is unchanged.
         *
         * @param out sink to write to
         * @throws IOException if the underlying stream fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orEmpty(orderID));
            out.writeUTF(orEmpty(shopID));
            // Explicit null check: auto-unboxing a null Integer would throw.
            out.writeInt(orderNum == null ? 0 : orderNum);
            out.writeUTF(orEmpty(shopName));
        }

        /**
         * Deserializes the fields; must read them in exactly the order
         * {@link #write(DataOutput)} wrote them.
         *
         * @param in source to read from
         * @throws IOException if the underlying stream fails or ends early
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            orderID = in.readUTF();
            shopID = in.readUTF();
            orderNum = in.readInt();
            shopName = in.readUTF();
        }

        /** Maps {@code null} to the empty string because {@code writeUTF} rejects null. */
        private static String orEmpty(String value) {
            return value == null ? "" : value;
        }
    }
    MapBean

      4、MapJoinMapper.java    

          
    package com.jh.mapperJoin;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    
    /**
     * Map-side join: the small product table is loaded from the job's first cache
     * file into an in-memory map during {@link #setup}, and every order line passed
     * to {@link #map} is enriched with the product name found by product ID.
     *
     * <p>The driver only has to register the small table as a cache file; this class
     * reads it. Both files are expected to be tab-separated.
     */
    public class MapJoinMapper extends Mapper<LongWritable,Text,MapBean,NullWritable> {
        /** Column separator used by both the product file and the order file. */
        private static final String FIELD_SEPARATOR = "\t";

        /** Product ID -> product name, filled once in {@link #setup}. */
        private final HashMap<String,String> map = new HashMap<>();
        /** Output bean reused for every record; all written fields are reset per record. */
        private final MapBean bean = new MapBean();

        // Enum-based counters for the number of order lines and product lines read.
        enum MyCount{ORDERNUM,SHOPNUM}

        /**
         * Runs once before any call to {@link #map}: loads the product table.
         *
         * <p>Blank lines and lines with fewer than two columns are skipped. The file
         * is decoded as UTF-8 rather than with the platform default charset, so that
         * product names are read the same way on every machine.
         *
         * @param context task context providing the cache files and counters
         * @throws IOException if no cache file was registered or the file cannot be read
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException("No cache file registered; the product table is required for the map join");
            }
            // NOTE(review): this opens the URI's path on the local file system, which
            // matches the file:// URI used by the driver — confirm before running on a cluster.
            String path = cacheFiles[0].getPath();

            // try-with-resources closes the reader (and the underlying stream) even on failure.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))) {
                String line;
                // Read to end of file; a blank line must not end the load early.
                while ((line = reader.readLine()) != null) {
                    if (StringUtils.isBlank(line)) {
                        continue;
                    }
                    String[] fields = line.split(FIELD_SEPARATOR);
                    if (fields.length < 2) {
                        // No product name column: nothing usable to join on.
                        continue;
                    }
                    map.put(fields[0], fields[1]);

                    // Count loaded products with the enum counter...
                    context.getCounter(MyCount.SHOPNUM).increment(1);
                    // ...and with a counter addressed by group name and counter name.
                    context.getCounter("Msg","ShopNum").increment(1);
                }
            }
        }

        /**
         * Joins one order line ({@code orderID<TAB>shopID<TAB>orderNum}) with the
         * product table and emits the resulting bean as the key.
         *
         * <p>Lines with fewer than three columns (including blank lines) are not
         * emitted; they are counted under group {@code "Msg"}, counter
         * {@code "MalformedOrder"}. A product ID without a match yields a bean whose
         * product name is {@code null}. A non-numeric quantity still fails with
         * {@link NumberFormatException}.
         *
         * @param key     the input key supplied by the framework (not used)
         * @param value   one line of the order file
         * @param context task context used for counters and output
         * @throws IOException if writing the output fails
         * @throws InterruptedException if writing the output is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(FIELD_SEPARATOR);
            if (fields.length < 3) {
                context.getCounter("Msg","MalformedOrder").increment(1);
                return;
            }

            bean.setOrderID(fields[0]);
            bean.setShopID(fields[1]);
            bean.setOrderNum(Integer.parseInt(fields[2]));
            // Look the product name up in the in-memory product table.
            bean.setShopName(map.get(fields[1]));

            // Count processed orders with both the enum counter and the group/name counter.
            context.getCounter(MyCount.ORDERNUM).increment(1);
            context.getCounter("Msg","OrderNum").increment(1);
            context.write(bean,NullWritable.get());
        }

        /**
         * Runs once after the last call to {@link #map}. Nothing to release here:
         * the product file is already closed at the end of {@link #setup}.
         *
         * @param context task context
         * @throws IOException declared by the overridden method
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }
    MapJoinMapper

      5、MapJoinDriver.java  

          
    package com.jh.mapperJoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    /**
     * Driver for the map-side join job: configures a map-only job that uses
     * {@link MapJoinMapper}, registers the small product table as a cache file and
     * submits the job.
     *
     * <p>Usage: {@code MapJoinDriver <input path> <output path> [cache file URI]}.
     * When the third argument is omitted, {@link #DEFAULT_CACHE_FILE} is used.
     */
    public class MapJoinDriver {
        /** Cache file used when no URI is given on the command line. */
        private static final String DEFAULT_CACHE_FILE = "file:///C:/Users/ASUS/Desktop/datas/mapJoinInput2/pd.txt";

        /**
         * Configures and runs the job, then exits the JVM with 0 on success, 1 on
         * job failure, or 2 when required arguments are missing.
         *
         * @param args {@code args[0]} input path, {@code args[1]} output path,
         *             optional {@code args[2]} URI of the product table to cache
         * @throws IOException if the job cannot be created or submitted
         * @throws ClassNotFoundException declared by {@code waitForCompletion}
         * @throws InterruptedException declared by {@code waitForCompletion}
         * @throws URISyntaxException if the cache file URI is malformed
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
            if (args.length < 2) {
                System.err.println("Usage: MapJoinDriver <input path> <output path> [cache file URI]");
                System.exit(2);
            }
            String cacheFile = args.length > 2 ? args[2] : DEFAULT_CACHE_FILE;

            // 1. Create the configuration and the job.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // 2. Locate the job jar through this class.
            job.setJarByClass(MapJoinDriver.class);

            // 3. Set the mapper class.
            job.setMapperClass(MapJoinMapper.class);

            // 4. Set the mapper's output key/value types.
            job.setMapOutputKeyClass(MapBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Map join: register the small table as a cache file.
            job.addCacheFile(new URI(cacheFile));
            // Map join: the join is finished in the map phase, so no reduce tasks.
            job.setNumReduceTasks(0);

            // Set the input and output paths.
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            // Submit the job and wait for it to finish.
            boolean result = job.waitForCompletion(true);
            // Exit code 0 on success, 1 on failure.
            System.exit(result ? 0:1);
        }
    }
    MapJoinDriver

      6、输出结果为

        

      

  • 相关阅读:
    Eclipse中配置Tomcat碰到Server Tomcat v6.0 Server at localhost failed to start问题
    解决java中对URL编码的问题
    上白泽慧音
    小K的农场
    [USACO15JAN]草鉴定Grass Cownoisseur
    [HNOI2012]矿场搭建/Mining Your Own Business
    [POI2008]BLO-Blockade
    「JOISC 2018 Day 1」帐篷
    Sudoku
    序列
  • 原文地址:https://www.cnblogs.com/si-137/p/13416201.html
Copyright © 2011-2022 走看看