zoukankan      html  css  js  c++  java
  • sparksql系列(二) sparksql常规操作

    import java.util.Arrays

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
    import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
    import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.SaveMode

    object WordCount {

      def initSparkAndData() : DataFrame = {

        val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
        val javasc = new JavaSparkContext(sparkSession.sparkContext)
        val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}",
          "{'name':'sunliu','age':'19','vip':'t'}","{'name':'zhangsan','age':'20','vip':'f'}"));
        val namedf = sparkSession.read.json(nameRDD)

        namedf
      }

      def main(args: Array[String]): Unit = {
        val data = initSparkAndData()


      }
    }

    上面讲sparksession初始化和数据的加载定义为一个方法,方便后续叙述

    select,filter,groupby,sum

      val data = initSparkAndData()

      val simpleoption = data.select(col("name"),col("age"),col("vip"))
        .filter(col("name") =!= "zhangsan" && col("vip") === "t")             //其实是zhangsan的过滤,主要是展示===和=!=的写法
        .groupBy(col("vip"))                                                                        //sql理解就是 group by 语句
        .agg(sum(col("age")) as "sumage")                                             //sql理解就是 sum语句
        .show(100)                                                                                     //显示vip、sumage两列,sparksql自动补齐 

      val simpleoption = data.select(col("name"),col("age"),col("vip"))
        .filter(col("name") =!= "zhangsan" && col("vip") === "t")
        .groupBy(col("vip"))
        .agg(avg(col("age")) as "avgage")                                               //求平均值
        .show(100)

      val simpleoption = data.select(col("name"),col("age"),col("vip"))
        .filter(col("name") =!= "zhangsan" && col("vip") === "t")
        .groupBy(col("vip"))
        .agg(count(col("vip")) as "vipnumber")                                       //count个数
        .show(100)

      val simpleoption = data.select(col("name"),col("age"),col("vip"))
        .filter(col("name") =!= "zhangsan" && col("vip") === "t")
        .groupBy(col("vip"))
        .agg(countDistinct(col("vip")) as "vipnumber")                            //count个数的时候去重
        .show(100)

    DataFrame中sum值转换为数字

                    DataFrame中没有将sum的值转换为数字的直接方法,所以需要自己手动写

        

        val data = initSparkAndData()
        val simpleoption = data.select(col("name"),col("age"),col("vip"))
          .filter(col("name") === "zhangsan" && col("vip") =!= "t")
          .groupBy(col("vip"))
    .      agg(sum(col("age")) as "sumage")

        val collection = simpleoption.select(col("sumage")).rdd.collect()
        val value = if(collection.length > 0) collection.apply(0).toString().replace("[", "").replace("]", "").toString() else "0"
        println(collection.apply(0).toString())
        println(value)

    DataFrame中列in使用

         val data = initSparkAndData()

        val nameList1 = List("wangwu","zhangsan")
        val nameList2 = data.select(col("name")).rdd.map(r => r(0).toString).collect().toList
        println(nameList1)
        println(nameList2)
        val simpleoption = data.select(col("name"),col("age"),col("vip"))
          .filter(col("name").isin(nameList1:_*)).show(100)

    DataFrame中case when

                val sparkSession = SparkSession.builder().master("local").getOrCreate()
                val javasc = new JavaSparkContext(sparkSession.sparkContext)

                val nameRDD1 = javasc.parallelize(Arrays.asList("{'id':'7'}", "{'id':'8'}", "{'id':'9'}"));
                val nameRDD1df = sparkSession.read.json(nameRDD1)

                nameRDD1df.withColumn("idNumber",
                                        when(col("id") === "7","id7").when(col("id") === "8","id8")
                                        .when(col("id") === "9","id9").otherwise("idother")
                            ).show(100)

  • 相关阅读:
    基于OCR的SeeTest框架可行性分析总结
    第5章1节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 官方简介(原创)
    第3章3节《MonkeyRunner源码剖析》脚本编写示例: MonkeyImage API使用示例(原创)
    第3章2节《MonkeyRunner源码剖析》脚本编写示例: MonkeyDevice API使用示例(原创)
    第3章1节《MonkeyRunner源码剖析》脚本编写示例: MonkeyRunner API使用示例(原创)
    第4章3节《MonkeyRunner源码剖析》ADB协议及服务: ADB协议概览SYNC.TXT翻译参考(原创)
    第4章2节《MonkeyRunner源码剖析》ADB协议及服务: ADB服务SERVICES.TXT翻译参考(原创)
    第4章1节《MonkeyRunner源码剖析》ADB协议及服务: ADB协议概览OVERVIEW.TXT翻译参考(原创)
    最新HTML BroadcastChannel API引荐
    java 错误
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/11668437.html
Copyright © 2011-2022 走看看