zoukankan      html  css  js  c++  java
  • java spark sql 计算各个省份广告点击数的top3

    需求与下面链接中的文章相同,这里改用 Spark SQL 的方式实现(相对来说更简单一些)

    https://www.cnblogs.com/7749ha/p/12909115.html

    package sparksql;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.api.java.function.Function;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Computes the top 3 most-clicked ads for each province with Spark SQL.
     *
     * <p>Each input line is expected to hold five space-separated fields, in this
     * order: {@code ts province city user ad}. The lines are loaded into a
     * DataFrame with one string column per field, click counts are aggregated per
     * (province, ad), and {@code row_number()} ranks the ads inside each province
     * by descending click count.
     *
     * <p>Author: xiaoshubiao, 2020/5/15 16:44
     */
    public class sparksql_test {
        /**
         * Entry point: loads the click log, prints a preview of it, then prints the
         * top 3 ads per province.
         *
         * @param args unused
         * @throws Exception if registering the temp view or running the query fails
         */
        public static void main(String[] args) throws Exception{
            // NOTE(review): no master is set here; presumably it is supplied by
            // spark-submit or external configuration - confirm for local runs.
            SparkSession spark = SparkSession
                    .builder()
                    .appName("Java Spark SQL basic example")
                    .getOrCreate();
            try {
                SparkContext sc = spark.sparkContext();
                JavaRDD<String> lines = sc.textFile("D:/tmp/rizhi.txt", 1).toJavaRDD();

                // Build the schema: one nullable string column per field name.
                String schemaString = "ts province city user ad";
                List<StructField> fields = new ArrayList<>();
                for (String fieldName : schemaString.split(" ")) {
                    fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
                }
                StructType structType = DataTypes.createStructType(fields);

                // Turn each line into a Row. Lines with fewer than five fields
                // (e.g. a blank trailing line) are skipped; indexing into them would
                // otherwise throw ArrayIndexOutOfBoundsException and fail the job.
                JavaRDD<Row> rowRDD = lines
                        .map((Function<String, String[]>) line -> line.split(" "))
                        .filter((Function<String[], Boolean>) parts -> parts.length >= 5)
                        .map((Function<String[], Row>) parts ->
                                RowFactory.create(parts[0], parts[1].trim(), parts[2], parts[3], parts[4]));

                Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, structType);
                // Preview the parsed data.
                dataFrame.show();
                // Register a temporary view so the data can be queried with SQL.
                dataFrame.createTempView("people");

                // Innermost query: click count per (province, ad).
                // Middle query: rank ads within each province by count, descending.
                // Outer query: keep ranks 1..3. The original filter "ind <3" kept only
                // the top 2, which contradicted the stated top-3 requirement.
                String sql = "select * from ("
                        + "select province,ad,c,"
                        + "row_number() over(partition by province order by c desc) as ind "
                        + "from (select province,ad,count(*) as c from people group by province,ad)t"
                        + ")t where ind <= 3";
                spark.sql(sql).show();
            } finally {
                // Always release the Spark session and its underlying context.
                spark.stop();
            }
        }
    }
  • 相关阅读:
    You Don't Know JS: Async & Performance(第2章,Callbacks)
    You Don't Know JS: this & Object Prototypes (第6章 Behavior Delegation)附加的ES6 class未读
    C#抓取网页内容
    输出jq对象
    ASP.NET MVC 常用内置验证特性
    MVC不错的学习资料
    让Entity Framework启动不再效验__MigrationHistory表
    多线程
    递归
    序列化
  • 原文地址:https://www.cnblogs.com/7749ha/p/12910407.html
Copyright © 2011-2022 走看看