zoukankan      html  css  js  c++  java
  • 02Spark的左连接

    两个文件,一个是用户的数据,一个是交易的数据。

    用户:

     交易:

    流程如下:

    分为以下几个步骤: (1)分别读取user文件和transform文件,并转为两个RDD.

    * (2)对上面两个RDD执行maptopair操作。生成userpairRdd和transformpairRdd

    * (3)对transformpairRdd和userpairRdd执行union操作,就是把上面的数据放在一起,生成allRdd

    * (4)然后把allRdd用groupBykey分组,把同一个UserID的数据都放在一起。生成groupRdd。

    * (5)对grouprdd处理,生成productLoctionRdd:(p1,UT),(p2,UT)这种productlistRdd。

    * (6)productlistRdd这里面有数据重复,需要去重。

     

     

    代码结构:

    代码:

    package com.test.book;
    
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    public class LeftJoinCmain {
    
        /*
         * 分为以下几个步骤: (1)分别读取user文件和transform文件,并转为RDD.
         * (2)对上面两个RDD执行maptopair操作。生成userpairRdd和transformpairRdd
         * (3)对transformpairRdd和userpairRdd执行union操作,就是把上面的数据放在一起,生成allRdd
         * (4)然后把allRdd用groupBykey分组,把同一个UserID的数据都放在一起。生成groupRdd。
         * (5)对grouprdd处理,生成productLoctionRdd:(p1,UT),(p2,UT)这种productlistRdd。
         * (6)productlistRdd这里面有数据重复,需要去重。
         * 
         */
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("LeftJoinCmain").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 导入user的数据
            JavaRDD<String> user = sc.textFile("/Users/mac/Desktop/user.txt");
            // 导入transform的数据
            JavaRDD<String> transform = sc.textFile("/Users/mac/Desktop/transactions.txt");
    
            // 生成一个JavaPairRDD,KEY是uerID,Value是Tuple的形式,("L",地址)
            JavaPairRDD<String, Tuple2<String, String>> userpairRdd = user
                    .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
    
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                            String[] args = line.split(" ");
                            return new Tuple2<String, Tuple2<String, String>>(args[0],
                                    new Tuple2<String, String>("L", args[1]));
                        }
    
                    });
    
            // 生成一个transform,
            JavaPairRDD<String, Tuple2<String, String>> transformpairRdd = transform
                    .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
    
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                            String[] args = line.split(" ");
    
                            return new Tuple2<String, Tuple2<String, String>>(args[2],
                                    new Tuple2<String, String>("P", args[1]));
                        }
                    });
    
            /**
             * allRdd的格式是: { (userID,Tuple("L","UT")), (userID,Tuple("P","p3")) . . . }
             */
            JavaPairRDD<String, Tuple2<String, String>> allRdd = userpairRdd.union(transformpairRdd);
    
            /**
             * 这一步就是把同一个uerID的数据放在一起,结果是: (userID1,List[(Tuple2("L","UT"),//一个用户地址信息
             * Tuple2("P","p1"),//其他的都是商品信息 Tuple2("P","p2") ] )
             */
            JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupRdd = allRdd.groupByKey();
    
            /**
             * 这一步就是从groupRdd中去掉userID,生成productLoctionRdd:(p1,UT),(p2,UT)这种。
             * 
             */
    
            JavaPairRDD<String, String> productlistRdd = groupRdd.flatMapToPair(
                    new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {
    
                        @Override
                        public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> t)
                                throws Exception {
    
                            String location = "UNKNOWN";
                            Iterable<Tuple2<String, String>> pairs = t._2;
                            List<String> products = new ArrayList<String>();
                            for (Tuple2<String, String> pair : pairs) {
    
                                if (pair._1.equals("L"))
                                    location = pair._2;
                                if (pair._1.equals("P")) {
                                    products.add(pair._2);
                                }
    
                            }
    
                            List<Tuple2<String, String>> kvList = new ArrayList<Tuple2<String, String>>();
    
                            for (String product : products) {
                                kvList.add(new Tuple2<String, String>(product, location));
    
                            }
                            return kvList;
                        }
                    });
    
            // 把一个商品的所有地址都查出来
    
            JavaPairRDD<String, Iterable<String>> productbylocation = productlistRdd.groupByKey();
            List<Tuple2<String, Iterable<String>>> debug3 = productbylocation.collect();
    
            for (Tuple2<String, Iterable<String>> value : debug3) {
    
                Iterator<String> iterator = value._2.iterator();
    
                while (iterator.hasNext()) {
                    System.out.println(value._1 + ":" + iterator.next());
                }
    
            }
    
            /**
             * 上述代码经过调试, 结果如下: p2:GA p4:GA p4:UT p4:CA p1:UT p1:UT p1:GA p3:UT
             * 
             * 
             * 发现有相同的商品和地址。我们需要把这个重复的结果去除。
             */
            // 处理如下:我们用mapvalues()函数
    
            JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByuniqueLocation = productbylocation
                    .mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
    
                        @Override
                        public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
                            Set<String> uniquelocations = new HashSet<String>();
    
                            Iterator<String> iterator = v1.iterator();
    
                            while (iterator.hasNext()) {
    
                                String value = iterator.next();
                                uniquelocations.add(value);
    
                            }
    
                            // 返回一个商品的所有地址,以及地址的个数。
                            return new Tuple2<Set<String>, Integer>(uniquelocations, uniquelocations.size());
                        }
                    });
    
            List<Tuple2<String, Tuple2<Set<String>, Integer>>> finalresult = productByuniqueLocation.collect();
            for (Tuple2<String, Tuple2<Set<String>, Integer>> vTuple2 : finalresult) {
    
                String aa=vTuple2._1;
                Iterator<String> iterator=vTuple2._2._1.iterator();
                while(iterator.hasNext())
                {
                    System.out.println("商品的名字:"+aa+"所有的地址"+iterator.next());
                    
                    
                }
                
                
            }
    
        }
    
    }

    运行结果:

    去重后的结果:

     

     

     

  • 相关阅读:
    HDU 3572 Task Schedule(拆点+最大流dinic)
    POJ 1236 Network of Schools(Tarjan缩点)
    HDU 3605 Escape(状压+最大流)
    HDU 1166 敌兵布阵(分块)
    Leetcode 223 Rectangle Area
    Leetcode 219 Contains Duplicate II STL
    Leetcode 36 Valid Sudoku
    Leetcode 88 Merge Sorted Array STL
    Leetcode 160 Intersection of Two Linked Lists 单向链表
    Leetcode 111 Minimum Depth of Binary Tree 二叉树
  • 原文地址:https://www.cnblogs.com/shenxiaoquan/p/8737731.html
Copyright © 2011-2022 走看看