zoukankan      html  css  js  c++  java
  • 02Spark的左连接

    两个文件,一个是用户的数据,一个是交易的数据。

    用户:

     交易:

    流程如下:

    分为以下几个步骤: (1)分别读取user文件和transform文件,并转为两个RDD.

    * (2)对上面两个RDD执行maptopair操作。生成userpairRdd和transformpairRdd

    * (3)对transformpairRdd和userpairRdd执行union操作,就是把上面的数据放在一起,生成allRdd

    * (4)然后把allRdd用groupBykey分组,把同一个UserID的数据都放在一起。生成groupRdd。

    * (5)对grouprdd处理,生成productLoctionRdd:(p1,UT),(p2,UT)这种productlistRdd。

    * (6)productlistRdd这里面有数据重复,需要去重。

     

     

    代码结构:

    代码:

    package com.test.book;
    
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    public class LeftJoinCmain {
    
        /*
         * 分为以下几个步骤: (1)分别读取user文件和transform文件,并转为RDD.
         * (2)对上面两个RDD执行maptopair操作。生成userpairRdd和transformpairRdd
         * (3)对transformpairRdd和userpairRdd执行union操作,就是把上面的数据放在一起,生成allRdd
         * (4)然后把allRdd用groupBykey分组,把同一个UserID的数据都放在一起。生成groupRdd。
         * (5)对grouprdd处理,生成productLoctionRdd:(p1,UT),(p2,UT)这种productlistRdd。
         * (6)productlistRdd这里面有数据重复,需要去重。
         * 
         */
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("LeftJoinCmain").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 导入user的数据
            JavaRDD<String> user = sc.textFile("/Users/mac/Desktop/user.txt");
            // 导入transform的数据
            JavaRDD<String> transform = sc.textFile("/Users/mac/Desktop/transactions.txt");
    
            // 生成一个JavaPairRDD,KEY是uerID,Value是Tuple的形式,("L",地址)
            JavaPairRDD<String, Tuple2<String, String>> userpairRdd = user
                    .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
    
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                            String[] args = line.split(" ");
                            return new Tuple2<String, Tuple2<String, String>>(args[0],
                                    new Tuple2<String, String>("L", args[1]));
                        }
    
                    });
    
            // 生成一个transform,
            JavaPairRDD<String, Tuple2<String, String>> transformpairRdd = transform
                    .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
    
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                            String[] args = line.split(" ");
    
                            return new Tuple2<String, Tuple2<String, String>>(args[2],
                                    new Tuple2<String, String>("P", args[1]));
                        }
                    });
    
            /**
             * allRdd的格式是: { (userID,Tuple("L","UT")), (userID,Tuple("P","p3")) . . . }
             */
            JavaPairRDD<String, Tuple2<String, String>> allRdd = userpairRdd.union(transformpairRdd);
    
            /**
             * 这一步就是把同一个uerID的数据放在一起,结果是: (userID1,List[(Tuple2("L","UT"),//一个用户地址信息
             * Tuple2("P","p1"),//其他的都是商品信息 Tuple2("P","p2") ] )
             */
            JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupRdd = allRdd.groupByKey();
    
            /**
             * 这一步就是从groupRdd中去掉userID,生成productLoctionRdd:(p1,UT),(p2,UT)这种。
             * 
             */
    
            JavaPairRDD<String, String> productlistRdd = groupRdd.flatMapToPair(
                    new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {
    
                        @Override
                        public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> t)
                                throws Exception {
    
                            String location = "UNKNOWN";
                            Iterable<Tuple2<String, String>> pairs = t._2;
                            List<String> products = new ArrayList<String>();
                            for (Tuple2<String, String> pair : pairs) {
    
                                if (pair._1.equals("L"))
                                    location = pair._2;
                                if (pair._1.equals("P")) {
                                    products.add(pair._2);
                                }
    
                            }
    
                            List<Tuple2<String, String>> kvList = new ArrayList<Tuple2<String, String>>();
    
                            for (String product : products) {
                                kvList.add(new Tuple2<String, String>(product, location));
    
                            }
                            return kvList;
                        }
                    });
    
            // 把一个商品的所有地址都查出来
    
            JavaPairRDD<String, Iterable<String>> productbylocation = productlistRdd.groupByKey();
            List<Tuple2<String, Iterable<String>>> debug3 = productbylocation.collect();
    
            for (Tuple2<String, Iterable<String>> value : debug3) {
    
                Iterator<String> iterator = value._2.iterator();
    
                while (iterator.hasNext()) {
                    System.out.println(value._1 + ":" + iterator.next());
                }
    
            }
    
            /**
             * 上述代码经过调试, 结果如下: p2:GA p4:GA p4:UT p4:CA p1:UT p1:UT p1:GA p3:UT
             * 
             * 
             * 发现有相同的商品和地址。我们需要把这个重复的结果去除。
             */
            // 处理如下:我们用mapvalues()函数
    
            JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByuniqueLocation = productbylocation
                    .mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
    
                        @Override
                        public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
                            Set<String> uniquelocations = new HashSet<String>();
    
                            Iterator<String> iterator = v1.iterator();
    
                            while (iterator.hasNext()) {
    
                                String value = iterator.next();
                                uniquelocations.add(value);
    
                            }
    
                            // 返回一个商品的所有地址,以及地址的个数。
                            return new Tuple2<Set<String>, Integer>(uniquelocations, uniquelocations.size());
                        }
                    });
    
            List<Tuple2<String, Tuple2<Set<String>, Integer>>> finalresult = productByuniqueLocation.collect();
            for (Tuple2<String, Tuple2<Set<String>, Integer>> vTuple2 : finalresult) {
    
                String aa=vTuple2._1;
                Iterator<String> iterator=vTuple2._2._1.iterator();
                while(iterator.hasNext())
                {
                    System.out.println("商品的名字:"+aa+"所有的地址"+iterator.next());
                    
                    
                }
                
                
            }
    
        }
    
    }

    运行结果:

    去重后的结果:

     

     

     

  • 相关阅读:
    React元素渲染
    初识JSX
    微信小程序复制文本到剪切板
    微信小程序报错request:fail url not in domain list
    小程序,通过自定义编译条件,模拟推荐人功能
    积分抵扣逻辑
    微信小程序 switch 样式
    tomcat 配置开启 APR 模式
    tomcat8 传输json 报错 Invalid character found in the request target. The valid characters are defined in RFC 3986
    c++数组初始化误区
  • 原文地址:https://www.cnblogs.com/shenxiaoquan/p/8737731.html
Copyright © 2011-2022 走看看