zoukankan      html  css  js  c++  java
  • 案例-使用MapReduce实现join操作

    哈喽~各位小伙伴们中秋快乐,好久没更新新的文章啦,今天分享如何使用mapreduce进行join操作。

    在离线计算中,我们常常不只是会对单一一个文件进行操作,进行需要进行两个或多个文件关联出更多数据,类似与sql中的join操作。
    今天就跟大家分享一下如何在MapReduce中实现join操作

    需求

    现有两张,一张是产品信息表,一张是订单表。订单表中只表存了产品ID,如果想要查出订单以及产品的相关信息就必须使用关联。
    

    实现

    根据MapReduce特性,大家都知道在reduce端,相同key的key,value对会被放到同一个reduce方法中(不设置partition的话)。
    利用这个特点我们可以轻松实现join操作,请看下面示例。
    

    产品表

    ID brand model
    p0001 苹果 iphone11 pro max
    p0002 华为 p30
    p0003 小米 mate10

    订单表

    id name address produceID num
    00001 kris 深圳市福田区 p0001 1
    00002 pony 深圳市南山区 p0001 2
    00003 jack 深圳市坂田区 p0001 3

    假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

    select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID
    

    MapReduce实现思路

    通过将关联的条件(prodcueID)作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个
    reduce task,在reduce中进行数据的串联
    

    实现方式一-reduce端join

    定义一个Bean

    public class RJoinInfo implements Writable{
        private String customerName="";
        private String customerAddr="";
        private String orderID="";
        private int orderNum;
        private String productID="";
        private String productBrand="";
        private String productModel="";
    //    0是产品,1是订单
        private int flag;
        
        setter/getter
    

    编写Mapper

    public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> {
        private static Logger logger = LogManager.getLogger(RJoinMapper.class);
        private RJoinInfo rJoinInfo = new RJoinInfo();
        private Text k = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //        输入方式支持很多中包括数据库等等。这里用的是文件,因此可以直接强转为文件切片
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
    //        获取文件名称
            String name = fileSplit.getPath().getName();
            logger.info("splitPathName:"+name);
    
            String line = value.toString();
            String[] split = line.split("	");
    
    
            String productID = "";
    
                if(name.contains("product")){
                    productID = split[0];
                    String setProductBrand = split[1];
                    String productModel = split[2];
    
                    rJoinInfo.setProductID(productID);
                    rJoinInfo.setProductBrand(setProductBrand);
                    rJoinInfo.setProductModel(productModel);
                    rJoinInfo.setFlag(0);
                }else if(name.contains("orders")){
                    String orderID = split[0];
                    String customerName = split[1];
                    String cutsomerAddr = split[2];
                    productID = split[3];
                    String orderNum = split[4];
    
                    rJoinInfo.setProductID(productID);
                    rJoinInfo.setCustomerName(customerName);
                    rJoinInfo.setCustomerAddr(cutsomerAddr);
                    rJoinInfo.setOrderID(orderID);
                    rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                    rJoinInfo.setFlag(1);
                }
    
            k.set(productID);
            context.write(k,rJoinInfo);
        }
    }
    

    代码解释,这里根据split的文件名,判断是products还是orders,
    然后根据是product还是orders获取不同的数据,最用都以productID为Key发送给Reduce端

    编写Reducer

    public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> {
        private static Logger logger = LogManager.getLogger(RJoinReducer.class);
        @Override
        protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException {
            List<RJoinInfo> orders = new ArrayList<>();
    
            String productID = key.toString();
            logger.info("productID:"+productID);
            RJoinInfo rJoinInfo = new RJoinInfo();
    
            for (RJoinInfo value : values) {
                int flag = value.getFlag();
                if (flag == 0) {
    //                产品
                    try {
                        BeanUtils.copyProperties(rJoinInfo,value);
                    } catch (IllegalAccessException e) {
                        logger.error(e.getMessage());
                    } catch (InvocationTargetException e) {
                        logger.error(e.getMessage());
                    }
                }else {
    //                订单
                    RJoinInfo orderInfo = new RJoinInfo();
                    try {
                        BeanUtils.copyProperties(orderInfo,value);
                    } catch (IllegalAccessException e) {
                        logger.error(e.getMessage());
                    } catch (InvocationTargetException e) {
                        logger.error(e.getMessage());
                    }
                    orders.add(orderInfo);
                }
            }
    
            for (RJoinInfo order : orders) {
                rJoinInfo.setOrderNum(order.getOrderNum());
                rJoinInfo.setOrderID(order.getOrderID());
                rJoinInfo.setCustomerName(order.getCustomerName());
                rJoinInfo.setCustomerAddr(order.getCustomerAddr());
    
    //          只输出key即可,value可以使用nullwritable
                context.write(rJoinInfo,NullWritable.get());
            }
        }
    }
    
    

    代码解释:根据productID会分为不同的组发到reduce端,reduce端拿到后一组数据后,其中有一个产品对象和多个订单对象。
    遍历每一个对象,根据flag区分产品和订单。保存产品对象,获取每个订单对象到一个集合中。当我们对每个对象都分好
    类后,遍历订单集合将订单和产品信息集合,然后输出。

    注意:我们这里效率虽然不是最高的,主要是想说明join的思路。

    编写Driver

    public class RJoinDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
    //        conf.set("mapreduce.framework.name","yarn");
    //        conf.set("yarn.resourcemanager.hostname","server1");
    //        conf.set("fs.defaultFS","hdfs://server1:9000");
            conf.set("mapreduce.framework.name","local");
            conf.set("fs.defaultFS","file:///");
    
            Job job = Job.getInstance(conf);
    
    //       如果是本地运行,可以不用设置jar包的路径,因为不用拷贝jar到其他地方
            job.setJarByClass(RJoinDriver.class);
    //        job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar");
    
            job.setMapperClass(RJoinMapper.class);
            job.setReducerClass(RJoinReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(RJoinInfo.class);
            job.setOutputKeyClass(RJoinInfo.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
            FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output"));
    
            boolean waitForCompletion = job.waitForCompletion(true);
            System.out.println(waitForCompletion);
        }
    }
    

    上面实现的这种方式有个缺点,就是join操作是在reduce阶段完成的,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜

    实现方式二-map端join

    这种方式适用于关联表中有小表的情形:
    可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join操作并输出结果,
    可以大大提高join操作的并发度,加快处理速度。
    

    编写Mapper

    在Mapper端我们一次性加载数据或者用Distributedbache将文件拷贝到每一个运行的maptask的节点上加载
    
    这里我们使用第二种,在mapper类中定义好小表进行join
    
    static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{
    
            private static Map<String, RJoinInfo> productMap = new HashMap<>();
    
    //      在循环调用map方法之前会先调用setup方法。因此我们可以在setup方法中,先对文件进行处理
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
    
                //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
                URI[] cacheFiles = context.getCacheFiles();
                System.out.println(Arrays.toString(new URI[]{cacheFiles[0]}));
    
    //          直接指定名字,默认在工作文件夹的目录下查找 1⃣
                try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){
    
                    String line;
                    while ((line = bufferedReader.readLine())!=null){
                        String[] split = line.split("	");
                        String productID = split[0];
                        String setProductBrand = split[1];
                        String productModel = split[2];
    
                        RJoinInfo rJoinInfo = new RJoinInfo();
                        rJoinInfo.setProductID(productID);
                        rJoinInfo.setProductBrand(setProductBrand);
                        rJoinInfo.setProductModel(productModel);
                        rJoinInfo.setFlag(0);
                        productMap.put(productID, rJoinInfo);
                    }
                }
    
                super.setup(context);
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit)context.getInputSplit();
    
                String name = fileSplit.getPath().getName();
    
                if (name.contains("orders")) {
                    String line = value.toString();
    
                    String[] split = line.split("	");
                    String orderID = split[0];
                    String customerName = split[1];
                    String cutsomerAddr = split[2];
                    String productID = split[3];
                    String orderNum = split[4];
    
                    RJoinInfo rJoinInfo = productMap.get(productID);
                    rJoinInfo.setProductID(productID);
                    rJoinInfo.setCustomerName(customerName);
                    rJoinInfo.setCustomerAddr(cutsomerAddr);
                    rJoinInfo.setOrderID(orderID);
                    rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                    rJoinInfo.setFlag(1);
    
                    context.write(rJoinInfo, NullWritable.get());
                }
            }
        }
    

    代码解释:这里我们又重写了一个setup()方法,这个方法会在执行map()方法前先执行,因此我们可以在这个方法中事先加载好数据。
    在上述代码中,我们直接指定名字就拿到了product.txt文件,这个究竟这个文件是怎么复制在maptask的节点上的呢,还要看下面的driver

    编写Driver

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            conf.set("mapreduce.framework.name","local");
            conf.set("fs.defaultFS","file:///");
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(RJoinDemoInMapDriver.class);
    
            job.setMapperClass(RjoinMapper.class);
            job.setOutputKeyClass(RJoinInfo.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
            FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2"));
    
    //        指定需要缓存一个文件到所有的maptask运行节点工作目录
    //        job.addFileToClassPath(); 将普通文件缓存到task运行节点的classpath下
    //        job.addArchiveToClassPath();缓存jar包到task运行节点的classpath下
    //        job.addCacheArchive();缓存压缩包文件到task运行节点的工作目录
    //        job.addCacheFile();将普通文件 1⃣
            job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt"));
    
    //      设置reduce的数量为0
            job.setNumReduceTasks(0);
    
    
            boolean waitForCompletion = job.waitForCompletion(true);
            System.out.println(waitForCompletion);
    
        }
    

    代码解释:上述Driver中,我们通过job.addCacheFile()指定了一个URI本地地址,运行时mapreduce就会将这个文件拷贝到maptask的运行工作目录中。

    好啦~本期分享代码量偏多,主要是想分享如何使用mapreduce进行join操作的思路。下一篇我会再讲一下 计算共同好友的思路以及代码~

    		公众号搜索:喜讯XiCent    获取更多福利资源~~~~
    

    本文由博客一文多发平台 OpenWrite 发布!

  • 相关阅读:
    CAS实战の自定义注销
    CAS实战の自定义登录
    MongoDB学习总结
    Django登录使用的技术和组件
    Docker搭建Hadoop环境
    配置Nginx的坑及思路
    Centos7 升级 sqlite3
    Python基础题
    pandas的数据筛选之isin和str.contains函数
    CentOS7 下Docker最新入门教程 超级详细 (安装以及简单的使用)
  • 原文地址:https://www.cnblogs.com/xicent/p/11835434.html
Copyright © 2011-2022 走看看