  • SparkSession

    Before version 2.0, using Spark required first creating a SparkConf and a SparkContext.

    catalog: the session's catalog of database and table metadata

    Spark 2.0 introduces the concept of a SparkSession. SparkConf, SparkContext, and SQLContext are all encapsulated inside the SparkSession, which is created through its builder; Datasets and DataFrames are created and operated on through the SparkSession.

    SparkSession: the entry point to programming Spark with the Dataset and DataFrame API.
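
    In a standalone application the session is built the same way; a minimal sketch (the appName and master values here are placeholders, not from this transcript):

    import org.apache.spark.sql.SparkSession

    // Build (or reuse) a session; appName/master are illustrative placeholders.
    val spark = SparkSession.builder()
      .appName("spark-session-demo")
      .master("local[*]")
      .getOrCreate()

    // The older entry points are still reachable from the session.
    val sc     = spark.sparkContext   // SparkContext
    val sqlCtx = spark.sqlContext     // SQLContext

    spark.stop()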

    scala> import org.apache.spark.sql.SparkSession
    SparkSession SparkSessionExtensions

    scala> val spsession=SparkSession.builder().getOrCreate()
    spsession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@577d07b

    scala> spsession.
    baseRelationToDataFrame conf emptyDataFrame implicits range sessionState sql streams udf
    catalog createDataFrame emptyDataset listenerManager read sharedState sqlContext table version
    close createDataset experimental newSession readStream sparkContext stop time

    scala> spsession.read.
    csv format jdbc json load option options orc parquet schema table text textFile
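
    A hedged sketch of using this reader API (the path and options below are illustrative placeholders):

    // Read a CSV file with a header row, letting Spark infer the column types.
    val csvDF = spsession.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/tmp/some.csv")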

    --------------------------------------------------------------------------------------------------------------------------------------


    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession


    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    // import the session's implicit conversions

    scala> import spsession.implicits._
    import spsession.implicits._


    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+


    scala> val rowrdd=lines.map(x=>{val arr=x.split("[,]");(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt)})
    rowrdd: org.apache.spark.sql.Dataset[(Long, String, Int, Int)] = [_1: bigint, _2: string ... 2 more fields]

    scala> val personDF=rowrdd.toDF("id","name","age","fv")
    personDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

    scala> personDF.printSchema
    root
     |-- id: long (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
     |-- fv: integer (nullable = false)
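
    The nullable flags above are inferred from the tuple element types. If full control over column names, types, and nullability is needed, the schema can also be declared explicitly and attached to an RDD[Row]; a sketch using the same /tmp/person.txt file (this variant is not part of the transcript):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Describe the four columns explicitly.
    val schema = StructType(Seq(
      StructField("id",   LongType,    nullable = false),
      StructField("name", StringType,  nullable = true),
      StructField("age",  IntegerType, nullable = false),
      StructField("fv",   IntegerType, nullable = false)
    ))

    // Build Rows from the raw lines and attach the schema.
    val rowRDD = spsession.sparkContext.textFile("/tmp/person.txt")
      .map(_.split(","))
      .map(a => Row(a(0).toLong, a(1), a(2).toInt, a(3).toInt))

    val personDF2 = spsession.createDataFrame(rowRDD, schema)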


    scala> personDF.show
    +---+--------+---+----+
    | id|    name|age|  fv|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
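
    An equivalent approach is to map each line into a case class; the column names and types then come from the case class fields, so no toDF call with explicit names is needed (a sketch, assuming the same file and the implicits import above):

    // Case class fields become the Dataset/DataFrame columns.
    case class Person(id: Long, name: String, age: Int, fv: Int)

    val personDS = lines.map { x =>
      val arr = x.split(",")
      Person(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toInt)
    }
    personDS.printSchema
    personDS.show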

     -------------------------------------------------------------------------

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession


    scala> val spsession=SparkSession.builder().getOrCreate()
    spsession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4c89c98a

    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> val rowDF=lines.map(x=>{val arr=x.split("[,]");(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt)})
    rowDF: org.apache.spark.sql.Dataset[(Long, String, Int, Int)] = [_1: bigint, _2: string ... 2 more fields]


    scala> rowDF.printSchema
    root
     |-- _1: long (nullable = false)
     |-- _2: string (nullable = true)
     |-- _3: integer (nullable = false)
     |-- _4: integer (nullable = false)

    scala> rowDF.show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+

    scala> rowDF.createTempView("Aaa")

    scala> spsession.sql("select * from Aaa").show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
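
    The view registered above keeps the tuple column names _1 .. _4; renaming the columns before registering makes the SQL easier to read. A hedged sketch (the view name and query here are illustrative):

    // Register a view with meaningful column names, then query it with SQL.
    rowDF.toDF("id", "name", "age", "fv").createOrReplaceTempView("person")
    spsession.sql("select name, fv from person where age > 30 order by fv desc").show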

    scala> import spsession.implicits._
    import spsession.implicits._

    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+

    scala> val wordDF=lines.flatMap(_.split(","))
    wordDF: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> wordDF.groupBy($"value" as "word").count
    res24: org.apache.spark.sql.DataFrame = [word: string, count: bigint]
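
    The agg calls below use the column functions count and avg (and later max and sum); these come from org.apache.spark.sql.functions and have to be imported, a step this transcript does not show:

    import org.apache.spark.sql.functions._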


    scala> wordDF.groupBy($"value" as "word").agg(count("*") as "count")
    res30: org.apache.spark.sql.DataFrame = [word: string, count: bigint]
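
    Putting the pieces together, a complete word count over the same file might look like this (a sketch, not part of the original transcript):

    val wordCounts = lines
      .flatMap(_.split(","))
      .groupBy($"value" as "word")
      .agg(count("*") as "count")
      .orderBy($"count".desc)
    wordCounts.show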

    scala> rowDF.groupBy($"_3" as "age").agg(count("*") as "count",avg($"_4") as "avg").show
    +---+-----+------+
    |age|count|   avg|
    +---+-----+------+
    | 20|    1| 565.0|
    | 23|    1| 565.0|
    | 50|    2|3932.0|
    |522|    2|  47.5|
    +---+-----+------+

    scala> rowDF.groupBy($"_3" as "age").agg(count("*"),avg($"_4")).show
    +---+--------+-------+
    |age|count(1)|avg(_4)|
    +---+--------+-------+
    | 20|       1|  565.0|
    | 23|       1|  565.0|
    | 50|       2| 3932.0|
    |522|       2|   47.5|
    +---+--------+-------+

    A DataFrame is a Dataset organized into named columns.

    scala> val jsonDF=spsession.read.json("/tmp/pdf1json/part*")
    jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]

    scala> spsession.read.json("/tmp/pdf1json/part*").show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+


    scala> spsession.read.format("json").load("/tmp/pdf1json/part*").show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+
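
    A directory of part* files such as /tmp/pdf1json is what DataFrameWriter produces; presumably it was written earlier with something along these lines (a hedged sketch, reusing the person data built above and selecting the columns seen in the JSON output):

    // Write the selected columns out as one JSON object per line.
    personDF.select("name", "age", "fv").write.json("/tmp/pdf1json")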

    scala> val jsonDF=spsession.read.json("/tmp/pdf1json/part*")
    jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]

    scala> jsonDF.cube("age").mean("fv").show
    +----+-------+
    | age|avg(fv)|
    +----+-------+
    |  20|  565.0|
    |null| 2248.5|
    |  50| 3932.0|
    |  23|  565.0|
    +----+-------+

    scala> jsonDF.cube("age").agg(max("fv"),count("name"),sum("fv")).show
    +----+-------+-----------+-------+
    | age|max(fv)|count(name)|sum(fv)|
    +----+-------+-----------+-------+
    |  20|    565|          1|    565|
    |null|   6998|          4|   8994|
    |  50|   6998|          2|   7864|
    |  23|    565|          1|    565|
    +----+-------+-----------+-------+
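
    cube computes the aggregates for every combination of the grouping columns, including the grand total (the null row above); rollup is the hierarchical variant, and groupBy reports only the plain groups. A hedged comparison sketch:

    jsonDF.groupBy("age").agg(avg("fv")).show   // one row per age, no grand total
    jsonDF.rollup("age").agg(avg("fv")).show    // per-age rows plus the overall (null) row
    jsonDF.cube("age").agg(avg("fv")).show      // same as rollup when only one column is used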

    ---------------------------------------------------------------

    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+


    scala> val lineds=lines.map(x=>{val arr=x.split(",");(arr(0),arr(1),arr(2),arr(3))})
    lineds: org.apache.spark.sql.Dataset[(String, String, String, String)] = [_1: string, _2: string ... 2 more fields]

    scala> lineds.show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+


    scala> val personDF= lineds.withColumnRenamed("_1","id").withColumnRenamed("_2","name")
    personDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

    scala> personDF.show
    +---+--------+---+----+
    | id|    name| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+


    scala> personDF.sort($"id" desc).show
    warning: there was one feature warning; re-run with -feature for details
    +---+--------+---+----+
    | id|    name| _3|  _4|
    +---+--------+---+----+
    |  7|  llihmj| 23| 565|
    |  6|    limi|522|  65|
    |  5|zhangsan| 20| 565|
    |  4|  laoliu|522|  30|
    |  2|zhangsan| 50| 866|
    |  1|   xiliu| 50|6998|
    +---+--------+---+----+
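
    The feature warning above is triggered by using desc as a postfix operator; writing it with a dot, or using the desc() column function from org.apache.spark.sql.functions, avoids it (a small sketch):

    personDF.sort($"id".desc).show      // same ordering, no postfix-operator warning
    personDF.orderBy(desc("id")).show   // equivalent, using the desc() column function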


    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> lines.map(x=>{val arr= x.split(",");(arr(0),arr(1),arr(2),arr(3))}).toDF("id","name","age","fv").show
    +---+--------+---+----+
    | id|    name|age|  fv|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
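
    Note that all four columns of this last DataFrame are strings, because split produced Strings and no conversion was applied; if numeric types are needed, the columns can be cast, for example (a sketch):

    val typedDF = lines.map { x =>
        val arr = x.split(",")
        (arr(0), arr(1), arr(2), arr(3))
      }.toDF("id", "name", "age", "fv")
      .selectExpr("cast(id as long) as id", "name",
                  "cast(age as int) as age", "cast(fv as int) as fv")
    typedDF.printSchema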
