zoukankan      html  css  js  c++  java
  • Spark SQL 从 DB 读取数据方法和方式 scala

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import java.util.HashMap
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.DataTypes
    import java.util.ArrayList
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.DataFrame
    import com.mysql.jdbc.Connection
    import com.mysql.jdbc.Statement
    import java.sql.DriverManager
    
    /**
     * Reads two MySQL tables through the Spark SQL JDBC data source, joins
     * student info with student scores by name, keeps students with a score
     * above 80, and writes the result back into MySQL.
     *
     * Fixes over the original version:
     *  - SQL is executed through a PreparedStatement (no string-built SQL,
     *    no injection risk, statement parsed once per partition).
     *  - One JDBC connection per partition via foreachPartition instead of
     *    one connection per row.
     *  - Connections/statements are closed in finally blocks, so they are
     *    released even when executeUpdate throws.
     *
     * @author three
     */
    object JDBCDataSource {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        // Shared JDBC options; "dbtable" is overwritten before each load.
        val options = new HashMap[String, String]()
        options.put("url", "jdbc:mysql://192.168.5.111:3306/testdb")
        options.put("user", "spark")
        options.put("password", "spark2016")

        options.put("dbtable", "student_info")
        val studentInfosDF = sqlContext.read.format("jdbc").options(options).load()

        options.put("dbtable", "student_score")
        val studentScoresDF = sqlContext.read.format("jdbc").options(options).load()

        // Convert both DataFrames into pair RDDs keyed by student name,
        // then join them (join requires a common key).
        val infoPairs = studentInfosDF.map(row => (row.getString(0), row.getInt(1)))
        val scorePairs = studentScoresDF.map(row => (row.getString(0), row.getInt(1)))
        val joined = infoPairs.join(scorePairs)

        // Keep only students with score > 80 and rebuild Row objects.
        val goodStudentRows = joined
          .map { case (name, (age, score)) => (name, age, score) }
          .filter { case (_, _, score) => score > 80 }
          .map { case (name, age, score) => Row(name, age, score) }

        // Rebuild a DataFrame with an explicit schema.
        val structFields = new ArrayList[StructField]()
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true))
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true))
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true))
        val structType = DataTypes.createStructType(structFields)
        val studentsDF = sqlContext.createDataFrame(goodStudentRows, structType)

        // Print the filtered result on the driver.
        studentsDF.collect().foreach(println)

        // Persist the DataFrame to MySQL. foreachPartition opens a single
        // connection per partition (the original opened one per row), and a
        // parameterized insert avoids building SQL from row values.
        studentsDF.foreachPartition { rows =>
          Class.forName("com.mysql.jdbc.Driver")
          val conn = DriverManager.getConnection(
            "jdbc:mysql://192.168.5.111:3306/testdb", "spark", "spark2016")
          try {
            val stmt = conn.prepareStatement(
              "insert into good_student_info values(?, ?, ?)")
            try {
              for (row <- rows) {
                stmt.setString(1, row.getString(0))
                stmt.setInt(2, row.getInt(1))
                stmt.setInt(3, row.getInt(2))
                stmt.executeUpdate()
              }
            } finally {
              stmt.close()
            }
          } finally {
            conn.close()
          }
        }

        // Release Spark resources.
        sc.stop()
      }
    }

  • 相关阅读:
    环境配置文件 ① /etc/profile、② ~/.bash_profile、③ ~/.bashrc、④ /etc/bashrc
    RHEL 7.0已发布CentOS 7即将到来
    《上海交通大学饮水思源paper(论文)板实用手册(第二版)》出炉
    SCI论文投稿Cover Letter的写作
    grub.cfg —— Window、Fedora、CentOS
    SCI新手成长策略
    计算机类SCI前三区期刊
    SCI期刊——导航
    SCI收录的外文期刊(计算机类)
    《嵌入式开发》——三次作业
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501296.html
Copyright © 2011-2022 走看看