zoukankan      html  css  js  c++  java
  • scala spark2.0 sparksql 连接mysql8.0 操作多表 使用 dataframe 及RDD进行数据处理

    1、配置文件

    package config
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    case object conf {
       private val master = "local[*]"
       val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")
    //   val confs: SparkConf = new SparkConf().setMaster("http://laptop-2up1s8pr:4040/").setAppName("jobs")
       val sc = new SparkContext(confs)
       sc.setLogLevel("ERROR")
       val spark_session: SparkSession = SparkSession.builder()
        .appName("jobs").config(confs).getOrCreate()
    
    //   设置支持笛卡尔积 对于spark2.0来说
       spark_session.conf.set("spark.sql.crossJoin.enabled",true)
    }
    

      

    2、连接mysql8.0 操作多表

    package operationMysql
    import config.conf.{sc, spark_session}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}
    import config.conf.spark_session.implicits._
    object readingMysqlOperation {
      def main(args: Array[String]): Unit = {
    
        /*
    
        val df: DataFrame = spark_session.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8")
          .option("dbtable", "订单")
          .option("user", "root")
          .option("password", "123456")
          .load()
        df.show()
        * */
    
        //以jdbc方式连接mysql
        val url="jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8" //&useSSL=true
        //设置用户名和密码信息
        val prop = new java.util.Properties
        prop.setProperty("user","root")
        prop.setProperty("password","123456")
        //创建sqlContext对象
    
        //读取dat_order_item表
        val df1: DataFrame = spark_session.read.jdbc(url,"订单明细","订单ID",0,5000000,4,prop)
        val df2: DataFrame = spark_session.read.jdbc(url, "订单", "订单ID", 0, 5000000,4,prop)
        //读取dat_order表
    
        //将dat_order_item和dat_order DF注册成spark临时表
        df1.createOrReplaceTempView("data1")
        df2.createOrReplaceTempView("data2")
        //使用sqlContext.sql("XXX")方式执行查询语句
    //    df2.show()
        val ywSQL:String= "SELECT dt1.`订单ID`,dt2.`客户ID`,dt1.`产品ID`,dt1.`单价`,dt1.`数量` " +
      "FROM data1 AS dt1 LEFT JOIN  data2 as dt2 ON dt1.`订单ID`=dt2.`订单ID`"
        val df: DataFrame = spark_session.sql(ywSQL)
        df.rdd.map(lines=>{(lines(0).toString,lines(2).toString.toDouble,lines(4).toString.toInt)})
          .toDF("订单ID","产品单价","订购数量").show()
    
    
    
      }
    }
    

      

  • 相关阅读:
    Win10家庭版无法远程桌面连接的解决方法
    分分钟用上C#中的委托和事件之窗体篇
    分分钟用上C#中的委托和事件
    Fiddler工具使用介绍三
    Fiddler工具使用介绍二
    Fiddler工具使用介绍一
    Hexo主题开发
    IDEA 编译 Jmeter 4.0 ( 二次开发_1 )
    Pinpoint 安装部署
    接口测试总结
  • 原文地址:https://www.cnblogs.com/wuzaipei/p/12624104.html
Copyright © 2011-2022 走看看