zoukankan      html  css  js  c++  java
  • Spark之 使用SparkSql操作mysql和DataFrame的Scala实现

    通过读取文件转换成DataFrame数据写入到mysql中

    package com.zy.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    
    /**
      * 通过读取文件转换成DataFrame数据写入到mysql中
      */
    object SparkSqlToMysql {
      def main(args: Array[String]): Unit = {
        //创建sparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local").getOrCreate()
        //读取数据
        val sc: SparkContext = sparkSession.sparkContext
        val fileRDD: RDD[String] = sc.textFile("D:\person.txt")
        //切分
        val lineRDD: RDD[Array[String]] = fileRDD.map(_.split(","))
    
        //关联  通过StructType指定schema将rdd转换成DataFrame
        val rowRDD: RDD[Row] = lineRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
        val schema = (new StructType).add("id", IntegerType, true).add("name", StringType, true).add("age", IntegerType, true)
        //根据rdd和schema创建DataFrame
        val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    
        //将df注册成表
        personDF.createOrReplaceTempView("person")
    
        //操作表
        val resultDF: DataFrame = sparkSession.sql("select * from person order by age desc")
    
        //将数据存到mysql中
        //创建properties对象 设置连接mysql的信息
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "root")
    
        /** mode方法可以指定数据插入模式
          * overwrite:覆盖,覆盖表中已经存在的数据,如果表不存在它会事先帮你创建
          * append:追加,向表中追加数据,如果表不存在它会事先帮你创建
          * ignore:忽略,表示如果表事先存在,就不进行任何操作
          * error :如果表存在就报错,它是默认选项
          */
        resultDF.write.mode("error").jdbc("jdbc:mysql://192.168.44.31:3306/spark", "person", prop)
    
        sparkSession.stop()
      }
    }

    从mysql中读取数据到DataFrame中

    package com.zy.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 从mysql中读取数据到DataFrame中
      */
    object DataFromMysql {
      def main(args: Array[String]): Unit = {
        //创建sparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local").getOrCreate()
        //创建properties对象 设置连接mysql的信息
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "root")
    
        //读取mysql数据
        val mysqlDF: DataFrame = sparkSession.read.jdbc("jdbc:mysql://192.168.44.31:3306/spark", "person", prop)
        mysqlDF.show()
    
        sparkSession.stop()
      }
    }
  • 相关阅读:
    20170417列表的count计数、index、reverse、sort函数
    (一)grpc-创建一个简单的grpc 客户端和服务器
    通用装饰器
    Git学习(一):Git介绍、仓库和分支等基本概念解释
    APP测试
    接口测试用例设计
    笔记整理
    接口测试
    gzip -压缩与解压缩
    declare 命令 -声明shell 变量
  • 原文地址:https://www.cnblogs.com/blazeZzz/p/9851154.html
Copyright © 2011-2022 走看看