zoukankan      html  css  js  c++  java
  • 48、Spark SQL之与Spark Core整合之每日top3热点搜索词统计案例实战

    一、概述

    1、需求分析

    数据格式:
    日期 用户 搜索词 城市 平台 版本
    
    需求:
    1、筛选出符合查询条件(城市、平台、版本)的数据
    2、统计出每天搜索uv排名前3的搜索词
    3、按照每天的top3搜索词的uv搜索总次数,倒序排序
    4、将数据保存到hive表中
    
    
    
    
    
    ###数据 keyword.txt
    

    2018-10-1:leo:water:beijing:android:1.0
    2018-10-1:leo1:water:beijing:android:1.0
    2018-10-1:leo2:water:beijing:android:1.0
    2018-10-1:jack:water:beijing:android:1.0
    2018-10-1:jack1:water:beijing:android:1.0
    2018-10-1:leo:seafood:beijing:android:1.0
    2018-10-1:leo1:seafood:beijing:android:1.0
    2018-10-1:leo2:seafood:beijing:android:1.0
    2018-10-1:leo:food:beijing:android:1.0
    2018-10-1:leo1:food:beijing:android:1.0
    2018-10-1:leo2:meat:beijing:android:1.0
    2018-10-2:leo:water:beijing:android:1.0
    2018-10-2:leo1:water:beijing:android:1.0
    2018-10-2:leo2:water:beijing:android:1.0
    2018-10-2:jack:water:beijing:android:1.0
    2018-10-2:leo1:seafood:beijing:android:1.0
    2018-10-2:leo2:seafood:beijing:android:1.0
    2018-10-2:leo3:seafood:beijing:android:1.0
    2018-10-2:leo1:food:beijing:android:1.0
    2018-10-2:leo2:food:beijing:android:1.0
    2018-10-2:leo:meat:beijing:android:1.0


    ####

    1、如果文本案例使用的是txt编辑,将文本保存ANSI格式,否则在groupByKey的时候,第一行默认会出现一个空格,分组失败。

    2、文本的最后禁止出现空行,否则在split的时候会报错,出现数组越界的错误;


    2、思路

    1、针对原始数据(HDFS文件),获取输入的RDD
    
    2、使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据。
      2.1 普通的做法:直接在fitler算子函数中,使用外部的查询条件(Map),但是,这样做的话,是不是查询条件Map,
      会发送到每一个task上一份副本。(性能并不好)
      2.2 优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量进行数据筛选。
      
    3、将数据转换为“(日期_搜索词, 用户)”格式,然后呢,对它进行分组,然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,
    并统计去重后的数量,即为每天每个搜索词的uv。最后,获得“(日期_搜索词, uv)”
    
    4、将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame
    
    5、将DataFrame注册为临时表,使用Spark SQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取,是一个DataFrame
    
    6、将DataFrame转换为RDD,继续操作,按照每天日期来进行分组,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后将uv总数作为key,
    将每天的top3搜索词以及搜索次数,拼接为一个字符串
    
    7、按照每天的top3搜索总uv,进行排序,倒序排序
    
    8、将排好序的数据,再次映射回来,变成“日期_搜索词_uv”的格式
    
    9、再次映射为DataFrame,并将数据保存到Hive中即可


    二、java实现

    package cn.spark.study.sql;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import scala.Tuple2;
    
    import java.util.*;
    
    public class DailyTop3Keyword {
        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            JavaSparkContext jsc = new JavaSparkContext(conf);
            SQLContext sqlContext = new HiveContext(jsc.sc());
            
            // 伪造数据(这些数据可以来自mysql数据库)
            final HashMap<String, List<String>> queryParaMap = new HashMap<String, List<String>>();
            queryParaMap.put("city", Arrays.asList("beijing"));
            queryParaMap.put("platform", Arrays.asList("android"));
            queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5"));
            
            // 将数据进行广播
            final Broadcast<HashMap<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParaMap);
            
            // 针对HDFS文件中的日志,获取输入RDD
            JavaRDD<String> rowRDD = jsc.textFile("hdfs://spark1:9000/spark-study/keyword.txt");
            
            // filter算子进行过滤
            JavaRDD<String> filterRDD = rowRDD.filter(new Function<String, Boolean>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Boolean call(String log) throws Exception {
                    // 切割原始日志,获取城市、平台和版本
                    String[] logSplit = log.split(":");
                    String city = logSplit[3];
                    String platform = logSplit[4];
                    String version = logSplit[5];
                    
                    // 与查询条件进行比对,任何一个条件,只要该条件设置了,且日志中的数据没有满足条件
                    // 则直接返回false,过滤掉该日志
                    // 否则,如果所有设置的条件,都有日志中的数据,则返回true,保留日志
                    HashMap<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                    List<String> cities = queryParamMap.get("city");
                    if (!cities.contains(city) && cities.size() > 0) {
                        return false;
                    }
                    List<String> platforms = queryParamMap.get("platform");
                    if (!platforms.contains(platform)) {
                        return false;
                    }
                    List<String> versions = queryParamMap.get("version");
                    if (!versions.contains(version)) {
                        return false;
                    }
    
                    return true;
                }
            });
            
            // 过滤出来的原始日志,映射为(日期_搜索词,用户)格式
            JavaPairRDD<String, String> dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, String> call(String log) throws Exception {
                    String[] logSplit = log.split(":");
                    String date = logSplit[0];
                    String user = logSplit[1];
                    String keyword = logSplit[2];
                    return new Tuple2<String, String>(date + "_" + keyword, user);
                }
            });
            
            // 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
            JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey();
            List<Tuple2<String, Iterable<String>>> collect1 = dateKeywordUsersRDD.collect();
            for (Tuple2<String, Iterable<String>> tuple2 : collect1) {
                System.out.println("进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)" + tuple2._2);
                System.out.println(tuple2);
            }
    
            // 对每天每个搜索词的搜索用户  去重操作  获得前uv
            JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair
                    (new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> dataKeywordUsers) throws Exception {
                            String dateKeyword = dataKeywordUsers._1;
                            Iterator<String> users = dataKeywordUsers._2.iterator();
                            // 对用户去重   并统计去重后的数量
                            List<String> distinctUsers = new ArrayList<String>();
                            while (users.hasNext()) {
                                String user = users.next();
                                if (!distinctUsers.contains(user)) {
                                    distinctUsers.add(user);
                                }
                            }
                            // 获取uv
                            long uv = distinctUsers.size();
                            // 日期_搜索词,用户个数
                            return new Tuple2<String, Long>(dateKeyword, uv);
                        }
                    });
            List<Tuple2<String, Long>> collect2 = dateKeywordUvRDD.collect();
            for (Tuple2<String, Long> stringLongTuple2 : collect2) {
                System.out.println("对每天每个搜索词的搜索用户  去重操作  获得前uv");
                System.out.println(stringLongTuple2);
            }
    
    
            // 将每天每个搜索词的uv数据,转换成DataFrame
            JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function<Tuple2<String, Long>, Row>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {
                    String date = dateKeywordUv._1.split("_")[0];
                    String keyword = dateKeywordUv._1.split("_")[1];
                    long uv = dateKeywordUv._2;
                    return RowFactory.create(date, keyword, uv);
                }
            });
            ArrayList<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true));
            StructType structType = DataTypes.createStructType(fields);
            DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
            dateKeywordUvDF.registerTempTable("sales");
            
            // 使用开窗函数,统计每天搜索uv排名前三的热点搜索词
            // 日期  搜索词   人数个数  前三名
            final DataFrame dailyTop3KeyWordDF = sqlContext.sql("select date,keyword,uv from (select date, keyword, uv, row_number() over (partition by date order by uv DESC ) rank from sales ) tmp_sales where rank <=3");
            // 将DataFrame转换为RDD, 映射,
            JavaRDD<Row> dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD();
    
            JavaPairRDD<String, String> dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction<Row, String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, String> call(Row row) throws Exception {
                    String date = String.valueOf(row.get(0));
                    String keyword = String.valueOf(row.get(1));
                    String uv = String.valueOf(row.get(2));
                    // 映射为  日期  搜索词_总个数
                    return new Tuple2<String, String>(date, keyword + "_" + uv);
                }
            });
    
            List<Tuple2<String, String>> collect = dailyTop3KeywordRDD.collect();
            for (Tuple2<String, String> stringStringTuple2 : collect) {
                System.out.println("开窗函数操作");
                System.out.println(stringStringTuple2);
            }
    
    
            // 根据 日期分组
            JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey();
            // 进行映射
            JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                    String date = tuple._1;
                    // 搜索词_总个数  集合
                    Iterator<String> KeyWordUviterator = tuple._2.iterator();
                    long totalUv = 0L;
                    String dateKeyword = date;
                    while (KeyWordUviterator.hasNext()) {
                        // 搜索词_个数
                        String keywoarUv = KeyWordUviterator.next();
                        Long uv = Long.valueOf(keywoarUv.split("_")[1]);
                        totalUv += uv;
                        dateKeyword = dateKeyword + "," + keywoarUv;
                    }
    
                    return new Tuple2<Long, String>(totalUv, dateKeyword);
                }
            });
            JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
            List<Tuple2<Long, String>> rows = sortedUvDateKeywordsRDD.collect();
            for (Tuple2<Long, String> row : rows) {
                System.out.println(row._2 + "    " + row._1);
            }
    
    
            // 映射
            JavaRDD<Row> resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long, String>, Row>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<Row> call(Tuple2<Long, String> tuple) throws Exception {
                    String dateKeywords = tuple._2;
                    String[] dateKeywordsSplit = dateKeywords.split(",");
                    String date = dateKeywordsSplit[0];
                    ArrayList<Row> rows = new ArrayList<Row>();
                    rows.add(RowFactory.create(date, dateKeywordsSplit[1].split("_")[0],
                            Long.valueOf(dateKeywordsSplit[1].split("_")[1])));
    
                    rows.add(RowFactory.create(date, dateKeywordsSplit[2].split("_")[0],
                            Long.valueOf(dateKeywordsSplit[2].split("_")[1])));
    
                    rows.add(RowFactory.create(date, dateKeywordsSplit[3].split("_")[0],
                            Long.valueOf(dateKeywordsSplit[3].split("_")[1])));
    
                    return rows;
                }
            });
            
            // 将最终的数据,转换为DataFrame,并保存到Hive表中
            DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType);
    //        List<Row> rows1 = finalDF.javaRDD().collect();
    //        for (Row row : rows1) {
    //            System.out.println(row);
    //        }
            finalDF.saveAsTable("daily_top3_keyword_uv");
    
            jsc.close();
    
        }
    }
  • 相关阅读:
    【持续更新】GDB使用笔记
    PL/SQL Developer-官网下载地址
    kali 下程序卸载方法
    python2 安装scrapy出现错误提示解决办法~
    pyHook监听用户鼠标、键盘事件
    pip 安装pandas报UnicodeDecodeError: 'ascii' codec can't decode byte 0xd5错
    Python模块常用的几种安装方式
    解决Kali Linux没有声音
    关于破解路由器密码
    解决Python2.7的UnicodeEncodeError: ‘ascii’ codec can’t encode异常错误
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11319744.html
Copyright © 2011-2022 走看看