zoukankan      html  css  js  c++  java
  • spark mysql读写

    val data2Mysql2 = (iterator: Iterator[(String, Int)]) => {
        var conn: Connection = null;
        var ps: PreparedStatement = null
        val sql = "Insert into location_info(location,counts,accesse_date) values(?,?,?)"
    
        try {
          conn = DriverManager.getConnection("jdbc://localhist:3306/bigdata","root","root")
          //整个分区的数据用了一个conn
          iterator.foreach(line =>{
            ps = conn.prepareStatement(sql)
            ps.setString(1,line._1)
            ps.setInt(2,line._2)
            ps.setDate(3,new Date(System.currentTimeMillis()))
            ps.executeUpdate()
          })
    
        } catch {
          case e: Exception => println("Mysql Exception")
        } finally {
          if (ps != null) ps.close()
          if (conn != null) conn.close()
        }
    rddres2.foreachPartition(data2MySQL)
      def mysql2Spark(){
        val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val connection = () => {
          Class.forName("com.mysql.jdbc.Driver").newInstance()
          DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
        }
        val jdbcRDD = new JdbcRDD(
          sc,
          connection,
          //location_info(location,counts
          "SELECT id, location FROM location_info where id >= ? AND id <= ?",
          1, 4, 2,
          r => {
            val id = r.getInt(1)
            val code = r.getString(2)
            (id, code)
          }
        )
        val jrdd = jdbcRDD.collect()
        println(jdbcRDD.collect().toBuffer)
        sc.stop()
      }
  • 相关阅读:
    寒假学习第九天
    寒假学习第八天
    寒假学习第七天
    寒假学习第六天
    寒假学习第五天
    寒假学习第四天
    寒假学习第三天
    寒假学习第二天
    寒假学习第一天
    阅读笔记
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7297781.html
Copyright © 2011-2022 走看看