zoukankan      html  css  js  c++  java
  • 使用随机数以及扩容表进行join代码

            /**
             * Join using random key prefixes (salting) together with an expanded table.
             */
            
            JavaPairRDD<String, Row> expandedRDD = userid2InfoRDD.flatMapToPair(
                    
                    new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                                throws Exception {
                            List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                            
                            for(int i = 0; i < 10; i++) {
                                list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                            }
                            
                            return list;
                        }
                        
                    });
            
            JavaPairRDD<String, String> mappedRDD = userid2PartAggrInfoRDD.mapToPair(
                    
                    new PairFunction<Tuple2<Long,String>, String, String>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                                throws Exception {
                            Random random = new Random();
                            int prefix = random.nextInt(10);
                            return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);  
                        }
                        
                    });
            
            JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
            
            JavaPairRDD<String, String> finalRDD = joinedRDD.mapToPair(
                    
                    new PairFunction<Tuple2<String,Tuple2<String,Row>>, String, String>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, String> call(
                                Tuple2<String, Tuple2<String, Row>> tuple)
                                throws Exception {
                            String partAggrInfo = tuple._2._1;
                            Row userInfoRow = tuple._2._2;
                            
                            String sessionid = StringUtils.getFieldFromConcatString(
                                    partAggrInfo, "\|", Constants.FIELD_SESSION_ID);
                            
                            int age = userInfoRow.getInt(3);
                            String professional = userInfoRow.getString(4);
                            String city = userInfoRow.getString(5);
                            String sex = userInfoRow.getString(6);
                            
                            String fullAggrInfo = partAggrInfo + "|"
                                    + Constants.FIELD_AGE + "=" + age + "|"
                                    + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                    + Constants.FIELD_CITY + "=" + city + "|"
                                    + Constants.FIELD_SEX + "=" + sex;
                            
                            return new Tuple2<String, String>(sessionid, fullAggrInfo);
                        }
                        
                    });
  • 相关阅读:
    hive -- 协同过滤sql语句
    Hive
    hive的排序,分組练习
    Hive 外部表的练习(多表关联查询,以及分组,子查询)
    javascript高级程序编程-学习笔记(基础)
    nodejs-基础与深入
    4.npm模块安装和使用(axios异步请求,lodash工具库)
    node-xlsx
    Git在tortoiseGit以及eclipse的使用方法
    稻盛和夫----六项精进
  • 原文地址:https://www.cnblogs.com/gentle-awen/p/10144893.html
Copyright © 2011-2022 走看看