  • SparkSession

    Before Spark 2.0, using Spark required first creating a SparkConf and a SparkContext.

    catalog: the catalog (directory) of the session's databases, tables, and functions

    Spark 2.0 introduces SparkSession. SparkConf, SparkContext, and SQLContext are all encapsulated within SparkSession, which is created through its builder; Datasets and DataFrames are created and manipulated through the SparkSession.

    SparkSession: the entry point to programming Spark with the Dataset and DataFrame API.
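    In spark-shell a SparkSession already exists as the variable spark, so calling getOrCreate() as in the transcript below simply returns it; in a standalone application you build the session yourself. A minimal sketch, assuming Spark 2.x on the classpath (the appName and master values are illustrative, not taken from the transcript):

    import org.apache.spark.sql.SparkSession

    // Build (or reuse) a SparkSession; appName/master are placeholder values
    val spark = SparkSession.builder()
      .appName("spark-session-demo")   // hypothetical application name
      .master("local[*]")              // hypothetical master; omit when submitting to a cluster
      .getOrCreate()

    // The old entry points are still reachable from the session
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext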

    scala> import org.apache.spark.sql.SparkSession
    SparkSession SparkSessionExtensions

    scala> val spsession=SparkSession.builder().getOrCreate()
    spsession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@577d07b

    scala> spsession.
    baseRelationToDataFrame conf emptyDataFrame implicits range sessionState sql streams udf
    catalog createDataFrame emptyDataset listenerManager read sharedState sqlContext table version
    close createDataset experimental newSession readStream sparkContext stop time

    scala> spsession.read.
    csv format jdbc json load option options orc parquet schema table text textFile
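    The csv/json/parquet/text/textFile methods above load data directly, while option, options, format, and schema configure the reader first. A hedged sketch of the option-based style, with an illustrative CSV path that is not part of the original example:

    // Illustrative only: read a CSV file with a header row and inferred column types
    val csvDF = spsession.read
      .option("header", "true")        // first line holds the column names
      .option("inferSchema", "true")   // let Spark guess the column types
      .csv("/tmp/some.csv")            // hypothetical path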

    --------------------------------------------------------------------------------------------------------------------------------------


    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession


    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    // import the session's implicit conversions (needed for the $-column syntax and Dataset encoders)

    scala> import spsession.implicits._
    import spsession.implicits._


    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+


    scala> val rowrdd=lines.map(x=>{val arr=x.split("[,]");(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt)})
    rowrdd: org.apache.spark.sql.Dataset[(Long, String, Int, Int)] = [_1: bigint, _2: string ... 2 more fields]

    scala> val personDF=rowrdd.toDF("id","name","age","fv")
    personDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

    scala> personDF.printSchema
    root
    |-- id: long (nullable = false)
    |-- name: string (nullable = true)
    |-- age: integer (nullable = false)
    |-- fv: integer (nullable = false)


    scala> personDF.show
    +---+--------+---+----+
    | id|    name|age|  fv|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
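    An alternative to naming the tuple columns with toDF is mapping each line to a case class, which yields a typed Dataset whose fields become the column names. A minimal sketch, assuming the same lines Dataset and the implicits import above (Person is a name introduced here only for illustration):

    case class Person(id: Long, name: String, age: Int, fv: Int)

    // Each line becomes a Person; the Dataset then carries the field names as columns
    val personDS = lines.map { x =>
      val arr = x.split(",")
      Person(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toInt)
    }
    personDS.printSchema   // same columns as personDF above: id, name, age, fv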

     -------------------------------------------------------------------------

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession


    scala> val spsession=SparkSession.builder().getOrCreate()
    spsession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4c89c98a

    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> val rowDF=lines.map(x=>{val arr=x.split("[,]");(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt)})
    rowDF: org.apache.spark.sql.Dataset[(Long, String, Int, Int)] = [_1: bigint, _2: string ... 2 more fields]


    scala> rowDF.printSchema
    root
    |-- _1: long (nullable = false)
    |-- _2: string (nullable = true)
    |-- _3: integer (nullable = false)
    |-- _4: integer (nullable = false)

    scala> rowDF.show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+

    scala> rowDF.createTempView("Aaa")

    scala> spsession.sql("select * from Aaa").show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
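    Once the view is registered, any SQL that Spark supports can be run against it. A sketch of an aggregation over the same view (the column names _1.._4 are the ones shown above):

    // Group by the age column (_3) and aggregate the fv column (_4)
    spsession.sql(
      "SELECT _3 AS age, count(*) AS cnt, avg(_4) AS avg_fv FROM Aaa GROUP BY _3"
    ).show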

    scala> import spsession.implicits._
    import spsession.implicits._

    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+

    scala> val wordDF=lines.flatMap(_.split(","))
    wordDF: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> wordDF.groupBy($"value" as "word").count
    res24: org.apache.spark.sql.DataFrame = [word: string, count: bigint]


    scala> wordDF.groupBy($"value" as "word").agg(count("*") as "count")
    res30: org.apache.spark.sql.DataFrame = [word: string, count: bigint]
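    The count and avg helpers used here come from org.apache.spark.sql.functions._, which must be imported in addition to the session implicits. To turn the grouped DataFrame into a finished word count, sort by the count column; a sketch assuming the wordDF above:

    import org.apache.spark.sql.functions._

    // Complete word count: group on the value column, count, then order by the count descending
    wordDF.groupBy($"value" as "word")
      .count()
      .orderBy($"count".desc)
      .show()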

    scala> rowDF.groupBy($"_3" as "age").agg(count("*") as "count",avg($"_4") as "avg").show
    +---+-----+------+
    |age|count|   avg|
    +---+-----+------+
    | 20|    1| 565.0|
    | 23|    1| 565.0|
    | 50|    2|3932.0|
    |522|    2|  47.5|
    +---+-----+------+

    scala> rowDF.groupBy($"_3" as "age").agg(count("*"),avg($"_4")).show
    +---+--------+-------+
    |age|count(1)|avg(_4)|
    +---+--------+-------+
    | 20|       1|  565.0|
    | 23|       1|  565.0|
    | 50|       2| 3932.0|
    |522|       2|   47.5|
    +---+--------+-------+

    A DataFrame is a Dataset organized into named columns.
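    The JSON part files read in the next step are the kind of output a DataFrame write produces; the write itself is not shown in the transcript, so the following is only a hedged sketch of how such files could have been created (the output path simply mirrors the one read back):

    // Hypothetical write step: persist a DataFrame as JSON part files under /tmp/pdf1json
    personDF.select("name", "age", "fv").write.json("/tmp/pdf1json")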

    scala> val jsonDF=spsession.read.json("/tmp/pdf1json/part*")
    jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]

    scala> spsession.read.json("/tmp/pdf1json/part*").show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+


    scala> spsession.read.format("json").load("/tmp/pdf1json/part*").show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+
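    The generic format(...).load(...) path works the same way for the other built-in sources. A hedged sketch with parquet (the path is illustrative):

    // The format name selects the data source; load returns a DataFrame
    val parquetDF = spsession.read.format("parquet").load("/tmp/some_parquet")   // hypothetical path
    // Equivalent shortcut:
    // val parquetDF2 = spsession.read.parquet("/tmp/some_parquet")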

    scala> val jsonDF=spsession.read.json("/tmp/pdf1json/part*")
    jsonDF: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]

    scala> jsonDF.cube("age").mean("fv").show
    +----+-------+
    | age|avg(fv)|
    +----+-------+
    |  20|  565.0|
    |null| 2248.5|
    |  50| 3932.0|
    |  23|  565.0|
    +----+-------+

    scala> jsonDF.cube("age").agg(max("fv"),count("name"),sum("fv")).show
    +----+-------+-----------+-------+
    | age|max(fv)|count(name)|sum(fv)|
    +----+-------+-----------+-------+
    |  20|    565|          1|    565|
    |null|   6998|          4|   8994|
    |  50|   6998|          2|   7864|
    |  23|    565|          1|    565|
    +----+-------+-----------+-------+
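    cube computes the aggregates for every combination of the grouping columns, which is where the null row (the grand total) above comes from. For comparison, a sketch of rollup and plain groupBy on the same data; with a single grouping column rollup gives the same result as cube, while groupBy produces no grand-total row:

    // rollup: per-age rows plus the grand total (null)
    jsonDF.rollup("age").agg(max("fv"), count("name"), sum("fv")).show
    // groupBy: only the per-age rows
    jsonDF.groupBy("age").agg(max("fv"), count("name"), sum("fv")).show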

    ---------------------------------------------------------------

    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> lines.show
    +-----------------+
    |            value|
    +-----------------+
    |2,zhangsan,50,866|
    |  4,laoliu,522,30|
    |5,zhangsan,20,565|
    |    6,limi,522,65|
    |  1,xiliu,50,6998|
    |  7,llihmj,23,565|
    +-----------------+


    scala> val lineds=lines.map(x=>{val arr=x.split(",");(arr(0),arr(1),arr(2),arr(3))})
    lineds: org.apache.spark.sql.Dataset[(String, String, String, String)] = [_1: string, _2: string ... 2 more fields]

    scala> lineds.show
    +---+--------+---+----+
    | _1|      _2| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+


    scala> val personDF= lineds.withColumnRenamed("_1","id").withColumnRenamed("_2","name")
    personDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

    scala> personDF.show
    +---+--------+---+----+
    | id|    name| _3|  _4|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+


    scala> personDF.sort($"id" desc).show
    warning: there was one feature warning; re-run with -feature for details
    +---+--------+---+----+
    | id|    name| _3|  _4|
    +---+--------+---+----+
    |  7|  llihmj| 23| 565|
    |  6|    limi|522|  65|
    |  5|zhangsan| 20| 565|
    |  4|  laoliu|522|  30|
    |  2|zhangsan| 50| 866|
    |  1|   xiliu| 50|6998|
    +---+--------+---+----+
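    The feature warning above comes from the postfix operator syntax ($"id" desc); calling the method directly avoids it. A small sketch on the same DataFrame:

    // Method-call form of the same sort; orderBy is an alias for sort
    personDF.sort($"id".desc).show
    personDF.orderBy($"_3".desc, $"name".asc).show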


    scala> val lines=spsession.read.textFile("/tmp/person.txt")
    lines: org.apache.spark.sql.Dataset[String] = [value: string]

    scala> lines.map(x=>{val arr= x.split(",");(arr(0),arr(1),arr(2),arr(3))}).toDF("id","name","age","fv").show
    +---+--------+---+----+
    | id|    name|age|  fv|
    +---+--------+---+----+
    |  2|zhangsan| 50| 866|
    |  4|  laoliu|522|  30|
    |  5|zhangsan| 20| 565|
    |  6|    limi|522|  65|
    |  1|   xiliu| 50|6998|
    |  7|  llihmj| 23| 565|
    +---+--------+---+----+
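    Because the split fields were not converted, all four columns in this last DataFrame are strings. If numeric columns are wanted, they can be cast after toDF; a hedged sketch:

    // Cast the string columns to numeric types after naming them
    val typedDF = lines
      .map { x => val arr = x.split(","); (arr(0), arr(1), arr(2), arr(3)) }
      .toDF("id", "name", "age", "fv")
      .select($"id".cast("long"), $"name", $"age".cast("int"), $"fv".cast("int"))
    typedDF.printSchema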

  • Original article: https://www.cnblogs.com/playforever/p/8144312.html