zoukankan      html  css  js  c++  java
  • Spark 学习(九) SparkSQL 函数自定义和数据源

    一,简介

    二,SparkSQL 的函数自定义

      2.1 函数定义

      2.2 函数注册

      2.3 示例

    三,spark的数据源读取

      3.1 JSON

      3.2 JDBC

      3.3 ParQuet

      3.4 CSV

    正文

    一,简介

      很多时候sql中的内置函数无法满足我们的日常开发需求,这就需要我们进行函数的自定义。同时Spark的数据源来源广泛,如JSON,MYSQL等都可以作为我们的数据源。

    二,SparkSQL 的函数自定义

      2.1 定义函数

    val fun1 = (arg: String) => {
        arg + "aaa"
    }

      2.2 注册函数

    spark.udf.register("addString", fun1)

      2.3 示例

    package cn.edu360.spark07
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    object AutoFun {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession.builder()
                .appName("sparkDateSet1")
                .master("local[2]")
                .getOrCreate()
            val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
            import spark.implicits._
            val words: Dataset[String] = lines.flatMap(_.split(" "))
            // 注册视图操作SQL形式
            words.createTempView("v_wc")
            // 定义函数
            val fun1 = (arg: String) => {
                arg + "aaa"
            }
            // 对函数进行注册
            spark.udf.register("addString", fun1)
            val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
            result.show()
            spark.stop()
        }
    }

    三,spark的数据源读取

      3.1 JSON

    object JsonDataSource {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("JdbcDataSource")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._
    
        //指定以后读取json类型的数据(有表头)
        val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
        val filtered: DataFrame = jsons.where($"age" <=500)
        filtered.printSchema()
        filtered.show()
        spark.stop()
        
      }
    }

      3.2 JDBC

    package cn.edu360.spark07
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object JDBCSource {
        def main(args: Array[String]): Unit = {
    
            val spark = SparkSession.builder().appName("JdbcDataSource")
                .master("local[*]")
                .getOrCreate()
            import spark.implicits._
            val log: DataFrame = spark.read.format("jdbc").options(
                Map("url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
                    "driver" -> "com.mysql.jdbc.Driver",
                    "dbtable" -> "log",
                    "user" -> "root",
                    "password" -> "qwe123"
                )
            ).load()
            val result: DataFrame = log.select($"id", $"name", $"age")
            result.show()
            spark.stop()
        }
    }

      3.3  ParQuet

    package cn.edu360.day7
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * Created by zx on 2017/9/18.
      */
    object ParquetDataSource {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("ParquetDataSource")
          .master("local[*]")
          .getOrCreate()
        //指定以后读取json类型的数据
        val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
        //val parquetLine: DataFrame = spark.read.format("parquet").load("/Users/zx/Desktop/pq")
        parquetLine.printSchema()
        //show是Action
        parquetLine.show()
        spark.stop()
      }
    }

      3.4 CSV

    package cn.edu360.day7
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * Created by zx on 2017/9/18.
      */
    object CsvDataSource {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("CsvDataSource")
          .master("local[*]")
          .getOrCreate()
        //指定以后读取json类型的数据
        val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
        csv.printSchema()
        val pdf: DataFrame = csv.toDF("id", "name", "age")
        pdf.show()
        spark.stop()
        
      }
    }
  • 相关阅读:
    ASP.NET 错误
    linux下使用蓝牙设备【转】
    AIDL Android中的远程接口 [转]
    Handler理解
    Hid Report Descriptor
    Android kernel x86 编译方法
    Android Init Language
    DBUS 资源
    Analysing Bluetooth Keyboard Traffic with hcidump
    DBUS基础知识
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10999613.html
Copyright © 2011-2022 走看看