  • Condensed spark-sql example: get each day's top 3 search keywords and their counts (distinct-user counts), including each day's total count

    // Get each day's top 3 search keywords
    // (imports assumed at the top of the enclosing class: java.util.*, scala.Tuple2,
    //  org.apache.spark.SparkConf, org.apache.spark.api.java.*, org.apache.spark.api.java.function.*,
    //  org.apache.spark.sql.*, org.apache.spark.sql.types.*, org.apache.spark.sql.hive.HiveContext)
            ArrayList<String> log = new ArrayList<String>();
            log.add("2015-10-01,leo,a1,beijing,android");
            log.add("2015-10-01,leo,a1,beijing,android");
            log.add("2015-10-01,tom,a1,beijing,android");
            log.add("2015-10-01,jack,a1,beijing,android");
            log.add("2015-10-01,marry,a1,beijing,android");
            log.add("2015-10-01,tom,bbf,beijing,android");
            log.add("2015-10-01,jack,bbf,beijing,iphone");
            log.add("2015-10-01,jack,bbf,beijing,android");
            log.add("2015-10-01,leo,ttyu,beijing,android");
            log.add("2015-10-01,leo,ttyu,beijing,android");
            log.add("2015-10-01,wede,a1,beijing,android");
            log.add("2015-10-01,wede,bbf,beijing,iphone");
            log.add("2015-10-02,leo,a2,beijing,android");
            log.add("2015-10-02,tom,a2,beijing,android");
            log.add("2015-10-02,tom,a2,beijing,android");
            log.add("2015-10-02,jack,a1,beijing,android");
            log.add("2015-10-02,marry,a1,beijing,android");
            log.add("2015-10-02,leo,bbf,beijing,iphone");
            log.add("2015-10-02,jack,bbf,beijing,android");
            log.add("2015-10-02,wede,bbf,beijing,android");
            log.add("2015-10-02,leo,ttyu,beijing,android");
            log.add("2015-10-02,leo,ttyu,beijing,android");
            log.add("2015-10-02,jack,a1,beijing,android");
            log.add("2015-10-02,wede,tour,beijing,android");
    
            SparkConf conf = new SparkConf()
    //                .setMaster("local")
                    .setAppName("Top3UV");
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext sqlContext = new HiveContext(sc.sc());
    
            JavaRDD<String> rdd_list = sc.parallelize(log, 2);
            // Step 0: put the filter condition in a broadcast variable (shared once per worker node)
            final org.apache.spark.broadcast.Broadcast<String> bc = sc.broadcast("iphone");
    
            // Step 1: filter by the condition
            JavaRDD<String> rdd_filter_list = rdd_list.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String v1) throws Exception {
                    String ary[] = v1.split(",");
                    String platform = ary[4];
                    // keep only records whose platform does not contain the broadcast value ("iphone")
                    return !platform.contains(bc.value());
                }
            });
            // Step 2: turn each line into a Tuple2 of (date_keyword, userName)
            JavaPairRDD<String, String> rdd_tuple2_list = rdd_filter_list.mapToPair(new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String s) throws Exception {
                    String ary[] = s.split(",");
                    String time = ary[0];
                    String word = ary[2];
                    String userName = ary[1];
                    return new Tuple2<String, String>(time + "_" + word, userName);
                }
            });
            // Step 3: group the records by tuple._1 (date_keyword)
            JavaPairRDD<String, Iterable<String>> rdd_byKey = rdd_tuple2_list.groupByKey();
    
            // Step 4: for each key, count the number of distinct users
            JavaPairRDD<String, Integer> rdd_byKey_uv = rdd_byKey.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, Integer>() {// tuple._1 is still date_keyword; tuple._2 becomes the deduplicated user count
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
                    String tuple_1 = stringIterableTuple2._1();
                    Iterable<String> userNames = stringIterableTuple2._2();
                    Set<String> userNameSet = new HashSet<String>();
                    for (String item : userNames) {
                        userNameSet.add(item);// user names; the HashSet deduplicates them
                    }
                    return new Tuple2<String, Integer>(tuple_1, userNameSet.size());
                }
            });
    
            // Step 5: build an RDD<Row> to map onto a DataFrame
            JavaRDD<Row> rdd_byKey_row_uv = rdd_byKey_uv.map(new Function<Tuple2<String, Integer>, Row>() {
                @Override
                public Row call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    String ary[] = stringIntegerTuple2._1().split("_");
                    return RowFactory.create(ary[0], ary[1], stringIntegerTuple2._2());
                }
            });
    
            List<StructField> list = new ArrayList<StructField>();
            list.add(DataTypes.createStructField("date", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("word", DataTypes.StringType, true));
            list.add(DataTypes.createStructField("uv_num", DataTypes.IntegerType, true));
            StructType tmpType = DataTypes.createStructType(list);
            DataFrame df_tuple = sqlContext.createDataFrame(rdd_byKey_row_uv, tmpType);
            df_tuple.registerTempTable("tuple_keyDS_valUN");
    
            // Step 6: use the row_number window function to keep, within each day, the top 3 keywords by UV
            StringBuilder _sb = new StringBuilder();
            _sb.append("select date,word,uv_num from ( ");
            _sb.append(" select date,word,uv_num, row_number() OVER (PARTITION BY date ORDER BY uv_num DESC ) as rank from tuple_keyDS_valUN ");
            _sb.append(" ) tmp_group_top3 where rank<=3");
    
            DataFrame df_tuple_groupTop3 = sqlContext.sql(_sb.toString()).cache();
            //df_tuple_groupTop3.show();// *************** printed at the very bottom instead
    
            //===== At this point we have each day's top 3 keywords and their UV counts, in descending UV order

            // Next, compute the total UV across each day's top 3 keywords

            // Step 7: convert the result DataFrame back to an RDD and build Tuple2(date, uvCount_keyword)
            JavaPairRDD<String, String> rdd_uvKey = df_tuple_groupTop3.javaRDD().mapToPair(new PairFunction<Row, String, String>() {
                @Override
                public Tuple2<String, String> call(Row row) throws Exception {
                    String date = row.getString(0);
                    String word = row.getString(1);
                    Integer uv_num = row.getInt(2);
                    return new Tuple2<String, String>(date, uv_num + "_" + word);
                }
            });
    
            // Step 8: after mapToPair, group by key (date) again
            JavaPairRDD<String, Iterable<String>> rdd_dateKey_group = rdd_uvKey.groupByKey();
    
            JavaPairRDD<Integer, String> rdd_uvKey_combiner = rdd_dateKey_group.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
                    Integer uv_sum = 0;
                    String data_word = "";
                    // iterate over this date's "uvCount_keyword" values
                    for (String uv_word_str : stringIterableTuple2._2()) {
                        String ary[] = uv_word_str.split("_");
                        Integer uv = Integer.valueOf(ary[0]);
                        uv_sum += uv;// accumulate the day's total UV
                        String word = ary[1];
                        data_word += stringIterableTuple2._1() + "_" + word + "|";
                    }
    
                    return new Tuple2<Integer, String>(uv_sum, data_word);
                }
            });
    
            JavaPairRDD<Integer, String> rdd_uvKey_sort = rdd_uvKey_combiner.sortByKey(false);
    
            List<Tuple2<Integer, String>> ret = rdd_uvKey_sort.collect();
            for (Tuple2<Integer, String> item : ret) {
                System.out.println(item._1() + "<--->" + item._2());
            }
            df_tuple_groupTop3.show();
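
    A variation on steps 7 and 8: since the top-3 rows are already produced by a SQL query, the per-day total UV can be computed in the same SQL pass instead of going back through an RDD. The sketch below is a minimal, untested illustration; it assumes the same sqlContext and the temp table tuple_keyDS_valUN registered above, and it only returns the daily totals (the concatenated keyword list would still need the RDD-side step 8, or a collect_list/concat_ws aggregate if the Hive version in use supports it).

            StringBuilder _sb2 = new StringBuilder();
            _sb2.append("select date, sum(uv_num) as total_uv from ( ");
            _sb2.append(" select date, word, uv_num, row_number() OVER (PARTITION BY date ORDER BY uv_num DESC) as rank from tuple_keyDS_valUN ");
            _sb2.append(" ) tmp_top3 where rank<=3 ");
            _sb2.append(" group by date order by total_uv desc");
            sqlContext.sql(_sb2.toString()).show();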
  • Original post: https://www.cnblogs.com/zzq-include/p/8783811.html