spark-sql-02

      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
        val session: SparkSession = SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
        val sc: SparkContext = session.sparkContext
        sc.setLogLevel("ERROR")
    
        // a DataFrame = data + metadata (schema)
        // a Spark Dataset can be operated on like a collection (RDD-style methods) or declaratively, in the SQL domain language
        val dataLists: RDD[String] = sc.textFile("data/person.txt")  // the RDD type here drives the element type below; a Dataset could be returned instead
        val rddRow = dataLists
          .map(_.split(" "))
          .map(arr => Row.apply(arr(0), arr(2).toInt)) // person.txt lines look like "name hobby age", so age is the third field
        
        val fields = Array(
          StructField.apply("name", DataTypes.StringType, true),
          StructField.apply("age", DataTypes.IntegerType, true)
        )

        val schema = StructType.apply(fields)
        val dataFrame = session.createDataFrame(rddRow, schema)
    
        dataFrame.show()

    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 18|
    |    lisi| 22|
    |  wangwu| 99|
    |  xiaoke| 22|
    +--------+---+

    
    
        dataFrame.printSchema()

    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
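
    Since a DataFrame is just Dataset[Row] (data + schema), the collection-style view is always one call away; a minimal sketch using the dataFrame defined above:

        // .rdd drops back from the DataFrame to the underlying RDD[Row]
        val rows: RDD[Row] = dataFrame.rdd
        rows.foreach(row => println(row.getString(0) + " -> " + row.getInt(1)))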

    
    
        dataFrame.createTempView("person") // register a temporary view named "person"
    
        session.sql("select * from person where name = 'xiaoke'").show()

    +------+---+
    |  name|age|
    +------+---+
    |xiaoke| 22|
    +------+---+
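
    The same lookup is available through the DataFrame API instead of SQL; a minimal sketch (where accepts a SQL condition string):

        dataFrame.where("name = 'xiaoke'").show()
        // equivalently, with column expressions after import session.implicits._ :
        // dataFrame.filter($"name" === "xiaoke").show()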

    Dynamically deriving the schema

       val userSchema = Array(
          "name string",
          "hobby string",
          "sex int"
        )
    
        // info = ("zhangsan", 0)
        // info = ("18", 2)
        // note: the return type is Any, since a field may come back as String or Int
        def toDataType1(info: (String, Int)): Any = {
          userSchema(info._2).split(" ")(1) match {
            case "string" => info._1.toString
            case "int" => info._1.toInt
          }
        }
    
        // each input line looks like: zhangsan lanqiu 18
        val rowRdd = dataLists.map(_.split(" "))
          .map(
            // [("zhangsan", 0), ("lanqiu", 1), ("18", 2)]
            x => x.zipWithIndex)
          .map(x => x.map(toDataType1(_)))
          .map(x => Row.fromSeq(x))
    
        // createDataFrame needs 1. an RDD[Row] and 2. a StructType; map each declared type name to a Spark SQL DataType
        def getDataType2(v: String) ={
          v match {
            case "string" => DataTypes.StringType
            case "int" => DataTypes.IntegerType
          }
        }
        val fields = userSchema.map(_.split(" ")).map(x => StructField.apply(x(0), getDataType2(x(1))))
        val schema = StructType.apply(fields)
        val dataFrame = session.createDataFrame(rowRdd, schema)
        dataFrame.show()

    +--------+-------+---+
    |    name|  hobby|sex|
    +--------+-------+---+
    |zhangsan|     PB| 18|
    |    lisi|xiangqi| 22|
    |  wangwu| lanqiu| 99|
    |  xiaoke|    wan| 22|
    +--------+-------+---+

    
        dataFrame.printSchema()

    root
     |-- name: string (nullable = true)
     |-- hobby: string (nullable = true)
     |-- sex: integer (nullable = true)
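
    As an aside, the manual name-to-DataType mapping can be replaced by parsing a DDL string, assuming a Spark version (2.2+) that provides StructType.fromDDL:

        // same schema, parsed from a DDL string instead of getDataType2
        val ddlSchema: StructType = StructType.fromDDL("name string, hobby string, sex int")
        session.createDataFrame(rowRdd, ddlSchema).printSchema()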

     

    Mapping the schema with an entity (bean) class

    import scala.beans.BeanProperty

    class Person extends Serializable {
      @BeanProperty
      var name: String = ""
      @BeanProperty
      var age: Int = 0
    }
    
    
        val rdd: RDD[String] = sc.textFile("data/person.txt")
        // Person must be Serializable: the bean is captured by the closure shipped to executors
        val p = new Person
        val rddBean: RDD[Person] = rdd.map(_.split(" "))
          .map(arr => {
            // val p = new Person  // a fresh bean per record is the safer choice
            p.setName(arr(0))
            p.setAge(arr(2).toInt)
            p
          })
        val df = session.createDataFrame(rddBean, classOf[Person])
        df.show()
        df.printSchema()
    +---+--------+
    |age|    name|
    +---+--------+
    | 18|zhangsan|
    | 22|    lisi|
    | 99|  wangwu|
    | 22|  xiaoke|
    +---+--------+
    
    root
     |-- age: integer (nullable = false)
     |-- name: string (nullable = true)
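
    A more idiomatic Scala alternative to the mutable bean is a case class, from which Spark derives the schema automatically. A minimal sketch; PersonCC is a name introduced here for illustration, and it must be defined at top level (outside main) because encoder derivation needs a TypeTag:

    // top-level definition, next to class Person
    case class PersonCC(name: String, age: Int)

        import session.implicits._
        val ccDF = rdd.map(_.split(" "))
          .map(arr => PersonCC(arr(0), arr(2).toInt))
          .toDF()
        ccDF.show()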
        // a DataFrame is untyped (Dataset[Row]); a Dataset keeps a concrete element type, here (String, Int)
        val ds01: Dataset[String] = session.read.textFile("data/person.txt")
        val person: Dataset[(String, Int)] = ds01.map(
          line => {
            val strs: Array[String] = line.split(" ")
            (strs(0), strs(2).toInt)
          }
          // passing Encoders explicitly plays the same role as the implicit conversions from import session.implicits._
        )(Encoders.tuple(Encoders.STRING, Encoders.scalaInt))

        val cperson: DataFrame = person.toDF("name", "age")
        cperson.show()
        cperson.printSchema()
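
    With the implicit encoders imported, the explicit Encoders argument can be dropped; a minimal sketch of the same map:

        import session.implicits._

        // the (String, Int) tuple encoder is now resolved implicitly
        val person2: Dataset[(String, Int)] = ds01.map { line =>
          val strs = line.split(" ")
          (strs(0), strs(2).toInt)
        }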

    // the same word count both ways: SQL against a registered view, then the DataFrame API

        import  session.implicits._
        val dataDF: DataFrame = List(
          "hello world",
          "hello world",
          "hello msb",
          "hello world",
          "hello world",
          "hello spark",
          "hello world",
          "hello spark"
        ).toDF("line")
    
        // register a temporary view
        dataDF.createTempView("ooxx")
        val df: DataFrame = session.sql("select * from ooxx")
        df.show()

    +-----------+
    |       line|
    +-----------+
    |hello world|
    |hello world|
    |  hello msb|
    |hello world|
    |hello world|
    |hello spark|
    |hello world|
    |hello spark|
    +-----------+

    
        df.printSchema()

    root
     |-- line: string (nullable = true)

    // SQL version: explode each line into words, then group and count
        session.sql(" select word, count(*) from   (select explode(split(line,' ')) as word from ooxx) as tt   group by tt.word  ").show()

    +-----+--------+
    | word|count(1)|
    +-----+--------+
    |hello|       8|
    |spark|       2|
    |world|       5|
    |  msb|       1|
    +-----+--------+
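
    For comparison, the same aggregation with the typed Dataset API; a minimal sketch (the import session.implicits._ above is assumed in scope):

        val typedCounts: Dataset[(String, Long)] = dataDF
          .select("line").as[String]   // Dataset[String]
          .flatMap(_.split(" "))       // one word per element
          .groupByKey(identity)
          .count()
        typedCounts.show()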

    // API version: the DataFrame itself plays the role of the FROM table
        val res = dataDF.selectExpr("explode(split(line, ' ')) as word").groupBy("word").count()
    
        res.write.mode(SaveMode.Append).parquet("data/out/ooxx")
    
    
    
    
    
        val frame: DataFrame = session.read.parquet("data/out/ooxx")
        frame.show()

    +-----+-----+
    | word|count|
    +-----+-----+
    |hello|    8|
    |spark|    2|
    |world|    5|
    |  msb|    1|
    +-----+-----+

    
        frame.printSchema()

    root
     |-- word: string (nullable = true)
     |-- count: long (nullable = true)



       /*
        File-based formats:
        session.read.parquet()
        session.read.textFile()
        session.read.json()
        session.read.csv()
        Whatever the source format, reading it yields a DataFrame,
        which can then be written back out in any supported format:
        res.write.parquet()
        res.write.orc()
        res.write.text()
        */
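
    A quick round-trip sketch with those readers and writers, applied to the word-count result; the path and csv options are illustrative:

        // write as CSV, then read it back
        res.write.mode(SaveMode.Overwrite).option("header", "true").csv("data/out/ooxx_csv")

        val csvDF: DataFrame = session.read
          .option("header", "true")
          .option("inferSchema", "true") // otherwise every column comes back as string
          .csv("data/out/ooxx_csv")
        csvDF.printSchema()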
    
     