zoukankan      html  css  js  c++  java
  • Spark 之 读取配置连接 MySQL 和上传到 HDFS

    一、读取配置

    配置文件 driver.properties（放在 resources 目录下，即类路径根目录）：

    # MySQL JDBC settings; ReadPropertiesFileTool.readProperties("mysql") returns these four keys.
    # NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J 5.x class name; Connector/J 8+
    # uses com.mysql.cj.jdbc.Driver - confirm against the driver jar actually on the classpath.
    driver=com.mysql.jdbc.Driver
    url=jdbc:mysql://192.168.56.111:3306/myshops2
    user=root
    password=root
    
    # Hadoop settings; readProperties with any flag other than "mysql" returns only hadoop_url,
    # which HDFSConnection prepends to every HDFS path it reads or writes.
    hadoop_url=hdfs://192.168.56.111:9000
    package com.njbdqn.util
    
    import java.io.FileInputStream
    import java.util.Properties
    
    /**
     * Loads connection settings from the `driver.properties` classpath resource.
     */
    object ReadPropertiesFileTool {

      /** Name of the classpath resource holding the connection settings. */
      private val ConfigResource = "driver.properties"

      /** Keys returned when the flag is "mysql" (case-insensitive). */
      private val MysqlKeys = Seq("driver", "url", "user", "password")

      /** Keys returned for any other flag. */
      private val HadoopKeys = Seq("hadoop_url")

      /**
       * Reads `driver.properties` from the classpath and returns the settings for one back end.
       *
       * The resource is opened as a stream (not via a file-system path), so it is found even
       * when packaged inside a jar, and the stream is always closed.
       *
       * @param flag "mysql" (case-insensitive) for the JDBC settings; any other value selects
       *             the Hadoop settings
       * @return property key -> value; a key missing from the file maps to `null`
       *         (the value `Properties.getProperty` returns for it)
       * @throws java.io.FileNotFoundException if `driver.properties` is not on the classpath
       */
      def readProperties(flag: String): Map[String, String] =
        selectProperties(loadProperties(), flag)

      /**
       * Picks the keys relevant to `flag` out of already-loaded properties.
       *
       * @param prop loaded properties
       * @param flag "mysql" (case-insensitive) selects driver/url/user/password; anything else
       *             (including null) selects hadoop_url
       * @return selected key -> value pairs; missing keys map to `null`
       */
      def selectProperties(prop: Properties, flag: String): Map[String, String] = {
        // Literal-first comparison so a null flag falls through to the Hadoop branch.
        val keys = if ("mysql".equalsIgnoreCase(flag)) MysqlKeys else HadoopKeys
        keys.map(key => key -> prop.getProperty(key)).toMap
      }

      /** Loads the config resource, closing the stream even if parsing fails. */
      private def loadProperties(): Properties = {
        val in = Option(
          ReadPropertiesFileTool.getClass.getClassLoader.getResourceAsStream(ConfigResource)
        ).getOrElse(
          throw new java.io.FileNotFoundException(s"$ConfigResource not found on the classpath")
        )
        try {
          val prop = new Properties()
          prop.load(in)
          prop
        } finally {
          in.close()
        }
      }
    }

    二、读取 resources 中的配置，操作 MySQL

    package com.njbdqn.util
    import java.util.Properties
    
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    /**
     * Reads and writes Spark DataFrames over JDBC, using the MySQL settings
     * from `driver.properties` for both directions.
     */
    object MYSQLConnection {

      /** JDBC settings (driver, url, user, password), loaded once when this object initializes. */
      val paramMap: Map[String, String] = ReadPropertiesFileTool.readProperties("mysql")

      /**
       * Reads the given table from the configured database.
       *
       * @param spark     active Spark session
       * @param tableName value passed as the JDBC `dbtable` option
       * @return a DataFrame backed by the table
       */
      def readMySql(spark: SparkSession, tableName: String): DataFrame = {
        val options = Map(
          "driver" -> paramMap("driver"),
          "url" -> paramMap("url"),
          "user" -> paramMap("user"),
          "password" -> paramMap("password"),
          "dbtable" -> tableName
        )
        spark.read.format("jdbc").options(options).load()
      }

      /**
       * Writes `df` to `tableName`, replacing its existing contents (SaveMode.Overwrite; by
       * default Spark drops and recreates the table unless the `truncate` option is set).
       *
       * Connection url, credentials and driver come from `paramMap`, the same source readMySql
       * uses, so one config change applies to both reads and writes.
       *
       * @param spark     not used; kept for source compatibility with existing callers
       * @param df        data to write
       * @param tableName destination table
       */
      def writeTable(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
        val prop = new Properties()
        // Properties is a Hashtable and rejects null values, so skip keys absent from the file.
        Seq("driver", "user", "password").foreach { key =>
          Option(paramMap(key)).foreach(value => prop.setProperty(key, value))
        }
        df.write.mode(SaveMode.Overwrite).jdbc(paramMap("url"), tableName, prop)
      }
    }

    三、上传/下载 HDFS（DataFrame 与 LR 模型）

    package com.njbdqn.util
    
    import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    /**
     * HDFS operations: reads/writes DataFrames as Parquet and loads/saves
     * LogisticRegressionModel instances under the `hadoop_url` prefix from `driver.properties`.
     */
    object HDFSConnection {

      /** Hadoop settings (only `hadoop_url` is used), loaded once when this object initializes. */
      val paramMap: Map[String, String] = ReadPropertiesFileTool.readProperties("hadoop")

      /**
       * Joins `hadoop_url` and `path` with exactly one "/" between them, so "data/x" and
       * "/data/x" resolve to the same location (plain concatenation would yield
       * "hdfs://host:9000data/x" for the former).
       */
      private def fullPath(path: String): String = {
        val base = paramMap("hadoop_url").stripSuffix("/")
        if (path.startsWith("/")) base + path else s"$base/$path"
      }

      /**
       * Writes `df` to HDFS as Parquet, replacing anything already at `path`.
       *
       * The format is explicit (rather than the session default used by `save`) so it always
       * matches what readDataToHDFS reads back.
       *
       * @param path location relative to `hadoop_url`
       * @param df   data to write
       */
      def writeDataToHDFS(path: String, df: DataFrame): Unit =
        df.write.mode(SaveMode.Overwrite).parquet(fullPath(path))

      /**
       * Reads the Parquet data at `path` on HDFS (despite the name, this reads *from* HDFS).
       *
       * @param spark active Spark session
       * @param path  location relative to `hadoop_url`
       * @return the loaded DataFrame
       */
      def readDataToHDFS(spark: SparkSession, path: String): DataFrame =
        spark.read.parquet(fullPath(path))

      /**
       * Loads a LogisticRegressionModel previously saved at `path` on HDFS.
       *
       * @param path location relative to `hadoop_url`
       * @return the loaded model
       */
      def readLRModelToHDFS(path: String): LogisticRegressionModel =
        LogisticRegressionModel.read.load(fullPath(path))

      /**
       * Saves `lr` to `path` on HDFS.
       *
       * Unlike writeDataToHDFS this does not overwrite: `save` fails if `path` already exists.
       *
       * @param lr   model to save
       * @param path location relative to `hadoop_url`
       */
      def writeLRModelToHDFS(lr: LogisticRegressionModel, path: String): Unit =
        lr.save(fullPath(path))
    }
  • 相关阅读:
    Sqoop架构(四)
    为什么选择Sqoop?(三)
    Sqoop 是什么?(二)
    Sqoop 产生背景(一)
    Ambari是啥?主要是干啥的?
    Ambari架构及安装
    Hadoop Hive概念学习系列之HDFS、Hive、MySQL、Sqoop之间的数据导入导出(强烈建议去看)(十八)
    Effective C++ -- 构造析构赋值运算
    pcduino v2安装opencv2.4.8
    Struts2 拦截器具体配置过程
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13874061.html
Copyright © 2011-2022 走看看