zoukankan      html  css  js  c++  java
  • sparksql系列(四) sparksql 操作数据库

    一:SparkSql操作mysql

    老规矩:先抽出来公共的方法:

    import java.util.Arrays

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
    import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
    import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
    import org.apache.spark.sql.functions.concat
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.SaveMode
    import java.util.ArrayList


    object WordCount {

      def dataAndJdbcoption() = {  

        val sparkSession= SparkSession.builder().master("local").getOrCreate()
        val javasc = new JavaSparkContext(sparkSession.sparkContext)

        val nameRDD1 = javasc.parallelize(Arrays.asList("{'id':'7'}","{'id':'8'}","{'id':'9'}"));
        val nameRDD1df = sparkSession.read.json(nameRDD1)

        val prop = new java.util.Properties
        prop.setProperty("user","root")
        prop.setProperty("password","123456")
        prop.setProperty("driver","com.mysql.jdbc.Driver")
        prop.setProperty("dbtable","blog")
        prop.setProperty("url","jdbc:mysql://127.0.0.1:3306/test")

        (nameRDD1df,prop)

      }

    }

    读mysql

        val df = dataAndJdbcoption()._1
        val prop = dataAndJdbcoption()._2

        val sparkSession= SparkSession.builder().master("local").getOrCreate()
        val data = sparkSession.read.format("jdbc").option("user","root").option("password","123456")
          .option("driver","com.mysql.jdbc.Driver")
    .      option("url","jdbc:mysql://127.0.0.1:3306/test").option("dbtable", "blog")
          .load()
        data.show(100)

    写mysql

      val df = dataAndJdbcoption()._1
      val prop = dataAndJdbcoption()._2
      df.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)

    二:SparkSql操作Hive

    公司读Hive数据

                         其实是读Hive表的location的文件,生成最终的文件。

    公司写Hive数据

                         生成文件后将数据load进Hive

    直接使用Sql操作Hive的数据       

        val conf = new SparkConf().setAppName("WordCount")
        //合并小文件,sparksql默认有200个task执行文件,会生成很多小文件。其实有很多参数可以优化详见sparkSession.sql("SET -v")

        conf.set("mapreduce.input.fileinputformat.split.minsize","1024000000")
        conf.set("mapreduce.input.fileinputformat.split.maxsize","1024000000")
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.node","1024000000")
        conf.set("mapreduce.input.fileinputformat.split.maxsize.per.node","1024000000")
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack","1024000000")
        conf.set("mapreduce.input.fileinputformat.split.maxsize.per.rack","1024000000")
        val sparkSession= SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

          sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

        除了上述方法可以合并文件之外,还有一种方法可以合并文件:

        val dataFrame = sparkSession.sql("select aa from table ").coalesce(3);//日志看task数量3
        dataFrame.createOrReplaceTempView("sparksqlTempTable")
        sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

        但是这种方法并不实用,因为大部分操作的Sql操作是需要insert。

        网上说还有第三种方法:

        即在Sql中插入一个REPARTITION(4),但是我在实验过程中并没有作用,可能这种语法只是针对于HiveSql本身,使用SparkSql并没有作用。

        栗子:select /*+ REPARTITION(4) */ aa from table 

  • 相关阅读:
    SQL 数据库中将某表中的一列数据拆分作为查询条件
    SQL数据库导入数据时提示未在本地计算机上注册“Microsoft.ACE.OLEDB.12.0”提供程序。 (System.Data)
    SQL常用内置函数
    SQL常用语句
    关于网页中鼠标双击文字选中设置
    SQL数据库查询列的类型及长度
    ASP. NET MVC项目 使用iTextSharp将网页代码生成PDF文件
    eslint-config-airbnb vs prettier vs standard
    windows批处理(bat脚本)
    python日志库loguru
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/11707341.html
Copyright © 2011-2022 走看看