zoukankan      html  css  js  c++  java
  • Spark SQL 精简版示例：获取每日 UV（去重用户数）排名前 3 的搜索词及各自的 UV，并汇总每日前 3 搜索词的总 UV

    // Get each day's top-3 search words ranked by UV (number of distinct users), plus each day's total top-3 UV.
            // Each record is "date,user,searchWord,location,platform" (fields 0..4; location is unused).
            ArrayList<String> log = new ArrayList<String>();
            log.add("2015-10-01,leo,a1,beijing,android");
            log.add("2015-10-01,leo,a1,beijing,android");
            log.add("2015-10-01,tom,a1,beijing,android");
            log.add("2015-10-01,jack,a1,beijing,android");
            log.add("2015-10-01,marry,a1,beijing,android");
            log.add("2015-10-01,tom,bbf,beijing,android");
            log.add("2015-10-01,jack,bbf,beijing,iphone");
            log.add("2015-10-01,jack,bbf,beijing,android");
            log.add("2015-10-01,leo,ttyu,beijing,android");
            log.add("2015-10-01,leo,ttyu,beijing,android");
            log.add("2015-10-01,wede,a1,beijing,android");
            log.add("2015-10-01,wede,bbf,beijing,iphone");
            log.add("2015-10-02,leo,a2,beijing,android");
            log.add("2015-10-02,tom,a2,beijing,android");
            log.add("2015-10-02,tom,a2,beijing,android");
            log.add("2015-10-02,jack,a1,beijing,android");
            log.add("2015-10-02,marry,a1,beijing,android");
            log.add("2015-10-02,leo,bbf,beijing,iphone");
            log.add("2015-10-02,jack,bbf,beijing,android");
            log.add("2015-10-02,wede,bbf,beijing,android");
            log.add("2015-10-02,leo,ttyu,beijing,android");
            log.add("2015-10-02,leo,ttyu,beijing,android");
            log.add("2015-10-02,jack,a1,beijing,android");
            log.add("2015-10-02,wede,tour,beijing,android");

            // To run without spark-submit, chain .setMaster("local") onto this configuration.
            SparkConf conf = new SparkConf().setAppName("Top3UV");
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext sqlContext = new HiveContext(sc.sc());

            JavaRDD<String> rdd_list = sc.parallelize(log, 2);
            // 0. Broadcast the platform to exclude, so each executor receives a single read-only copy.
            final org.apache.spark.broadcast.Broadcast<String> bc = sc.broadcast("iphone");

            // 1. Keep only records whose platform field does NOT contain the broadcast value.
            JavaRDD<String> rdd_filter_list = rdd_list.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String line) throws Exception {
                    String[] fields = line.split(",");
                    String platform = fields[4];
                    return !platform.contains(bc.value());
                }
            });

            // 2. Key each record by (date, word) with the user name as value.
            //    A Tuple2 key (not a "date_word" string) keeps dates/words containing '_' intact.
            JavaPairRDD<Tuple2<String, String>, String> rdd_tuple2_list = rdd_filter_list.mapToPair(new PairFunction<String, Tuple2<String, String>, String>() {
                @Override
                public Tuple2<Tuple2<String, String>, String> call(String line) throws Exception {
                    String[] fields = line.split(",");
                    String date = fields[0];
                    String userName = fields[1];
                    String word = fields[2];
                    return new Tuple2<Tuple2<String, String>, String>(new Tuple2<String, String>(date, word), userName);
                }
            });

            // 3. Gather all user names recorded for the same (date, word).
            JavaPairRDD<Tuple2<String, String>, Iterable<String>> rdd_byKey = rdd_tuple2_list.groupByKey();

            // 4. UV per (date, word): the number of distinct user names.
            JavaPairRDD<Tuple2<String, String>, Integer> rdd_byKey_uv = rdd_byKey.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Iterable<String>>, Tuple2<String, String>, Integer>() {
                @Override
                public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<Tuple2<String, String>, Iterable<String>> keyAndUsers) throws Exception {
                    Set<String> distinctUsers = new HashSet<String>();
                    for (String userName : keyAndUsers._2()) {
                        distinctUsers.add(userName);
                    }
                    return new Tuple2<Tuple2<String, String>, Integer>(keyAndUsers._1(), distinctUsers.size());
                }
            });

            // 5. Turn each (date, word) -> uv pair into Row(date, word, uv_num) for the DataFrame below.
            JavaRDD<Row> rdd_byKey_row_uv = rdd_byKey_uv.map(new Function<Tuple2<Tuple2<String, String>, Integer>, Row>() {
                @Override
                public Row call(Tuple2<Tuple2<String, String>, Integer> keyAndUv) throws Exception {
                    Tuple2<String, String> dateAndWord = keyAndUv._1();
                    return RowFactory.create(dateAndWord._1(), dateAndWord._2(), keyAndUv._2());
                }
            });

            List<StructField> list = new ArrayList<StructField>();
            list.add(DataTypes.createStructField("date", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("word", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("uv_num", DataTypes.IntegerType, true));
            StructType tmpType = DataTypes.createStructType(list);
            DataFrame df_tuple = sqlContext.createDataFrame(rdd_byKey_row_uv, tmpType);
            df_tuple.registerTempTable("tuple_keyDS_valUN");

            // 6. Window function: number the words of each date by UV (descending) and keep the first 3.
            //    `word` breaks UV ties so the chosen top 3 is deterministic; `date` is quoted and the
            //    alias is "rn" (not "rank") to avoid clashing with SQL keywords/function names.
            String top3Sql = "select `date`, word, uv_num from ( "
                    + " select `date`, word, uv_num, "
                    + " row_number() OVER (PARTITION BY `date` ORDER BY uv_num DESC, word ASC) as rn "
                    + " from tuple_keyDS_valUN "
                    + " ) tmp_group_top3 where rn <= 3";

            // Cached because it is read twice: by the RDD pipeline below and by show() at the end.
            DataFrame df_tuple_groupTop3 = sqlContext.sql(top3Sql).cache();

            // At this point we have each day's top-3 search words with their UV.
            // Next: each day's total UV over those top-3 words.

            // 7. Back to an RDD keyed by date, carrying (uv_num, word) for each top-3 row.
            JavaPairRDD<String, Tuple2<Integer, String>> rdd_uvKey = df_tuple_groupTop3.javaRDD().mapToPair(new PairFunction<Row, String, Tuple2<Integer, String>>() {
                @Override
                public Tuple2<String, Tuple2<Integer, String>> call(Row row) throws Exception {
                    String date = row.getString(0);
                    String word = row.getString(1);
                    int uvNum = row.getInt(2);
                    return new Tuple2<String, Tuple2<Integer, String>>(date, new Tuple2<Integer, String>(uvNum, word));
                }
            });

            // 8. Per date: sum the UV of its top-3 words and list them as "date_word|date_word|...".
            JavaPairRDD<String, Iterable<Tuple2<Integer, String>>> rdd_dateKey_group = rdd_uvKey.groupByKey();

            JavaPairRDD<Integer, String> rdd_uvKey_combiner = rdd_dateKey_group.mapToPair(new PairFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Iterable<Tuple2<Integer, String>>> dateAndWords) throws Exception {
                    String date = dateAndWords._1();
                    int uvSum = 0;
                    StringBuilder dateWords = new StringBuilder();
                    for (Tuple2<Integer, String> uvAndWord : dateAndWords._2()) {
                        uvSum += uvAndWord._1();
                        dateWords.append(date).append('_').append(uvAndWord._2()).append('|');
                    }
                    return new Tuple2<Integer, String>(uvSum, dateWords.toString());
                }
            });

            // 9. Order days by their total top-3 UV, highest first, and print the result on the driver.
            JavaPairRDD<Integer, String> rdd_uvKey_sort = rdd_uvKey_combiner.sortByKey(false);

            List<Tuple2<Integer, String>> ret = rdd_uvKey_sort.collect();
            for (Tuple2<Integer, String> item : ret) {
                System.out.println(item._1() + "<--->" + item._2());
            }
            df_tuple_groupTop3.show();

            // Release the resources held by this Spark application.
            sc.stop();
  • 相关阅读:
    Xcode 配置常用变量(SRCROOT, PROJECT_DIR, PROJECT_NAME)
    Git submodule实战
    Charles抓Https的包
    Vue-Quill-Editor 富文本编辑器的使用
    vue计算属性无法监听到数组内部变化
    移动端键盘弹起导致底部按钮上浮解决方案
    js中数组删除 splice和delete的区别,以及delete的使用
    js实现复制input的value到剪切板
    treetable
    vue中状态管理vuex的使用分享
  • 原文地址:https://www.cnblogs.com/zzq-include/p/8783811.html
Copyright © 2011-2022 走看看