zoukankan      html  css  js  c++  java
  • Hadoop Reduce Join和Map Join

    一 Reduce Join

    表1:订单表数据。字段为订单编号、品牌id、购买数量。

    1001	01	1
    1002	02	2
    1003	03	3
    1004	01	4
    1005	02	5
    1006	03	6

    表2:品牌信息表。字段为品牌id,品牌名称。

    01	小米
    02	华为
    03	格力

    需求:将表1中的品牌id替换成品牌名称进行输出。

    定义实体类

    ①包括表1和表2的所有字段。

    ②增加一个flag字段,区分来自于哪张表。

    public class TableBean implements Writable{
    
    //    id    pid    amount
    //    pid    pname
        
        private String id;    // 订单id
        private String pid;    // 产品id
        private int amount;    // 数量
        private String pname;    // 产品名称
        private String flag;    // 定义一个标记,标记是订单表还是产品表
        
        public TableBean() {
            super();
        }
        
        public TableBean(String id, String pid, int amount, String pname, String flag) {
            super();
            this.id = id;
            this.pid = pid;
            this.amount = amount;
            this.pname = pname;
            this.flag = flag;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
    
            // 序列化方法
            out.writeUTF(id);
            out.writeUTF(pid);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            // 反序列化方法
            id = in.readUTF();
            pid = in.readUTF();
            amount = in.readInt();
            pname = in.readUTF();
            flag = in.readUTF();
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getPid() {
            return pid;
        }
    
        public void setPid(String pid) {
            this.pid = pid;
        }
    
        public int getAmount() {
            return amount;
        }
    
        public void setAmount(int amount) {
            this.amount = amount;
        }
    
        public String getPname() {
            return pname;
        }
    
        public void setPname(String pname) {
            this.pname = pname;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
        @Override
        public String toString() {
            return id + "	" + amount + "	" + pname;
        }
    }

    Mapper类

    ①需要在setup方法中通过切片信息得到文件名,即数据来自于哪张表。

    ②根据不同的表名封装不同的字段。

    ③将2张表的连接字段作为key,这样表1和表2对应的数据都可以进入到同一个reduce方法。

    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
        
        String name;
        
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context)
                throws IOException, InterruptedException {
            
            // 获取文件的名称
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            
            name = inputSplit.getPath().getName();
        }
    
        TableBean tableBean = new TableBean();
        Text k = new Text();
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            
    //        id    pid    amount
    //        1001    01    1
    //
    //        pid    pname
    //        01    小米
            
            // 1 获取一行
            String line = value.toString();
            
            if (name.startsWith("order")) {// 订单表
                
                String[] fields = line.split("	");
                
                // 封装key和value
                tableBean.setId(fields[0]);
                tableBean.setPid(fields[1]);
                tableBean.setAmount(Integer.parseInt(fields[2]));
                tableBean.setPname("");
                tableBean.setFlag("order");
                
                k.set(fields[1]);
                
            }else {// 产品表
                
                String[] fields = line.split("	");
                
                // 封装key和value
                tableBean.setId("");
                tableBean.setPid(fields[0]);
                tableBean.setAmount(0);
                tableBean.setPname(fields[1]);
                tableBean.setFlag("pd");
                
                k.set(fields[0]);
            }
            
            // 写出
            context.write(k, tableBean);
        }
    }

    Reducer类

    ①在同一个reduce方法中,得到多个订单表和1条对应的品牌表数据。

    ②将订单集合的数据循环设置品牌名称,写出。

    ③BeanUtils.copyProperties(tmpBean, tableBean)拷贝对象,个人觉得没必要,未测试。

    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable>{
        
        @Override
        protected void reduce(Text key, Iterable<TableBean> values,
                Context context) throws IOException, InterruptedException {
            
            // 存储所有订单集合
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            // 存储产品信息
            TableBean pdBean = new TableBean();
            
            for (TableBean tableBean : values) {
                
                if ("order".equals(tableBean.getFlag())) {// 订单表
                    
                    TableBean tmpBean = new TableBean();
                    
                    try {
                        BeanUtils.copyProperties(tmpBean, tableBean);
                        
                        orderBeans.add(tmpBean);
                        
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }else {
                    try {
                        BeanUtils.copyProperties(pdBean, tableBean);
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }
            
            
            for (TableBean tableBean : orderBeans) {
                tableBean.setPname(pdBean.getPname());
                
                context.write(tableBean, NullWritable.get());
            }
        }
    }

    Driver类,略。

    二 Map Join

    Map Join适用于一张表十分小、一张表很大的场景。在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

    Driver类

    ①job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt")) 将小表缓存起来。

    ②去掉Reducer过程。

    public class DistributedCacheDriver {
    
        public static void main(String[] args) throws Exception, IOException {
    
            // 0 根据自己电脑路径重新配置
            args = new String[] { "e:/input/inputtable2", "e:/output1" };
    
            // 1 获取job信息
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 2 设置加载jar包路径
            job.setJarByClass(DistributedCacheDriver.class);
    
            // 3 关联map
            job.setMapperClass(DistributedCacheMapper.class);
    
            // 4 设置最终输出数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 5 设置输入输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 加载缓存数据
            job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));
    
            // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
            job.setNumReduceTasks(0);
    
            // 8 提交
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
    
        }
    }

    Mapper类

    ①在setup方法中,逐行读取缓存的小表(品牌表),并存进Map中。

    ②在map方法中,处理全部业务逻辑。

    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    
        HashMap<String, String> pdMap = new HashMap<>();
        
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            // 缓存小表
            URI[] cacheFiles = context.getCacheFiles();
            String path = cacheFiles[0].getPath().toString();
            
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
            
            String line;
            while(StringUtils.isNotEmpty(line = reader.readLine())){
    //            pid    pname
    //            01    小米
    
                // 1 切割
                String[] fileds = line.split("	");
                
                pdMap.put(fileds[0], fileds[1]);
            }
            
            // 2 关闭资源
            IOUtils.closeStream(reader);
        }
        
        Text k = new Text();
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
    //        id    pid    amount
    //        1001    01    1
            
    //        pid    pname
    //        01    小米
            // 1 获取一行
            String line = value.toString();
            
            // 2 切割
            String[] fileds = line.split("	");
            
            // 3 获取pid
            String pid = fileds[1];
            
            // 4 取出pname
            String pname = pdMap.get(pid);
            
            // 5 拼接
            line = line +"	"+ pname;
            
            
            k.set(line);
            
            // 6 写出
            context.write(k, NullWritable.get());
        }
    }
  • 相关阅读:
    加油 ^_^
    ES6 小记
    Angular2 初学小记
    Round 4
    react中iconfont字体图标不显示问题
    react 实现组件嵌套以及子组件与父组件之间的通信
    wamp&花生壳 在本地搭建自己的网站
    深入浅出CSS(二):关于雪碧图、background-position与steps函数的三角恋情
    深入浅出CSS(一):line-height与vertical-align的性质
    十进制字符编号
  • 原文地址:https://www.cnblogs.com/noyouth/p/13236947.html
Copyright © 2011-2022 走看看