zoukankan      html  css  js  c++  java
  • Spark 学习(九) SparkSQL 函数自定义和数据源

    一,简介

    二,SparkSQL 的函数自定义

      2.1 函数定义

      2.2 函数注册

      2.3 示例

    三,spark的数据源读取

      3.1 JSON

      3.2 JDBC

      3.3 ParQuet

      3.4 CSV

    正文

    一,简介

      很多时候sql中的内置函数无法满足我们的日常开发需求,这就需要我们进行函数的自定义。同时Spark的数据源来源广泛,如JSON,MYSQL等都可以作为我们的数据源。

    二,SparkSQL 的函数自定义

      2.1 定义函数

    val fun1 = (arg: String) => {
        arg + "aaa"
    }

      2.2 注册函数

    spark.udf.register("addString", fun1)

      2.3 示例

    package cn.edu360.spark07
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    object AutoFun {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession.builder()
                .appName("sparkDateSet1")
                .master("local[2]")
                .getOrCreate()
            val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
            import spark.implicits._
            val words: Dataset[String] = lines.flatMap(_.split(" "))
            // 注册视图操作SQL形式
            words.createTempView("v_wc")
            // 定义函数
            val fun1 = (arg: String) => {
                arg + "aaa"
            }
            // 对函数进行注册
            spark.udf.register("addString", fun1)
            val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
            result.show()
            spark.stop()
        }
    }

    三,spark的数据源读取

      3.1 JSON

    object JsonDataSource {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("JdbcDataSource")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._
    
        //指定以后读取json类型的数据(有表头)
        val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
        val filtered: DataFrame = jsons.where($"age" <=500)
        filtered.printSchema()
        filtered.show()
        spark.stop()
        
      }
    }

      3.2 JDBC

    package cn.edu360.spark07
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object JDBCSource {
        def main(args: Array[String]): Unit = {
    
            val spark = SparkSession.builder().appName("JdbcDataSource")
                .master("local[*]")
                .getOrCreate()
            import spark.implicits._
            val log: DataFrame = spark.read.format("jdbc").options(
                Map("url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
                    "driver" -> "com.mysql.jdbc.Driver",
                    "dbtable" -> "log",
                    "user" -> "root",
                    "password" -> "qwe123"
                )
            ).load()
            val result: DataFrame = log.select($"id", $"name", $"age")
            result.show()
            spark.stop()
        }
    }

      3.3  ParQuet

    package cn.edu360.day7
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * Created by zx on 2017/9/18.
      */
    object ParquetDataSource {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("ParquetDataSource")
          .master("local[*]")
          .getOrCreate()
        //指定以后读取json类型的数据
        val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
        //val parquetLine: DataFrame = spark.read.format("parquet").load("/Users/zx/Desktop/pq")
        parquetLine.printSchema()
        //show是Action
        parquetLine.show()
        spark.stop()
      }
    }

      3.4 CSV

    package cn.edu360.day7
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * Created by zx on 2017/9/18.
      */
    object CsvDataSource {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("CsvDataSource")
          .master("local[*]")
          .getOrCreate()
        //指定以后读取json类型的数据
        val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
        csv.printSchema()
        val pdf: DataFrame = csv.toDF("id", "name", "age")
        pdf.show()
        spark.stop()
        
      }
    }
  • 相关阅读:
    C++STL中的unique函数解析
    STL中erase()的用法
    刷题技巧——简易哈希表的实现
    经典面试题目——找到第n个丑数(参考《剑指offer(第二版)》面试题49)
    C++中sort函数小结
    谈谈交叉验证法(个人小结)
    数字序列中某一位数字(《剑指offer》面试题44)
    求1~n整数中1出现的次数(《剑指offer》面试题43)
    2018年美团春招(第二批)题解
    C/C++中字符串和数字互转小结
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10999613.html
Copyright © 2011-2022 走看看