zoukankan      html  css  js  c++  java
  • sparkStreaming插入mysql 必须考虑到实时更新老的key

    原先使用批次提交更新 但数据库无变化,不得不一条一条的插入 公司数据量不大  还未做数据量大的测试 但实时更新是可以的

    关键sql :

    insert into area_user_amt (date,country,provence,amt)  values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}进行老的主键key的实时更新
    areaStartAmt.foreachRDD(rdd => {
          rdd.foreachPartition(partitionOfRecords => {
            if (partitionOfRecords.isEmpty) {
              println("This RDD is not null but partition is null")
            } else {
    
              //          Class.forName("com.mysql.jdbc.Driver")
              //          var connection: Connection = null
              //          try {
              //            connection = DriverManager.getConnection(url, username, password)
              //            connection.setAutoCommit(false)
              //            val time = System.currentTimeMillis() / 1000
              //            //  val sql = "insert into test (bc_person,amt)  values(?,?) ON DUPLICATE KEY UPDATE `amt`= ?"
              //            val sql1 = "insert into area_user_amt (date,country,provence,amt)  values(?,?,?,?) ON DUPLICATE KEY UPDATE `amt`= ?"
              //            //  val sql3 = "insert into area_user_amt (date,country,provence,amt)  values(?,?,?,?) "
              //            val pstmt = connection.prepareStatement(sql1)
              //            var count = 0
              //            partitionOfRecords.foreach(record => {
              //              //              pstmt.setString(1, record._1)
              //              //              pstmt.setInt(2, record._2.toInt)
              //              //              pstmt.setInt(3,  record._2.toInt)
              //              val info = record._1.split("_")
              //              //  if(info.length==3){
              //              pstmt.setString(1, info(2))
              //              pstmt.setString(2, info(0))
              //              pstmt.setString(3, info(1))
              //              pstmt.setInt(4, record._2.toInt)
              //              pstmt.setInt(5, record._2.toInt)
              //              pstmt.addBatch()
              //              count += 1
              //              if (count % 500 == 0) {
              //                pstmt.executeBatch()
              //                connection.commit()
              //              }
              //            })
              //            pstmt.execute()
              //            connection.commit()
              //          } finally {
              //            if (connection != null)
              //              connection.close()
              //          }
    
              val connection = DriverManager.getConnection(url, username, password)
              partitionOfRecords.foreach(record => {
                var datekey = record._1.split("_")(2)
                var countrykey = record._1.split("_")(0)
                var provencekey = record._1.split("_")(1)
                var amt = record._2
                val sql1 = s"insert into area_user_amt (date,country,provence,amt)  values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'"
                // val sql = s"select * from area_user_amt where date='${datekey}' and country='${countrykey}' and provence='${provencekey}'"
                val stmt = connection.createStatement()
                val code = stmt.executeUpdate(sql1)
                //返回值
                if (code < 0) {
                  println("更新失败")
                }
                else {
    //              println("更新成功")
             }
    

      

  • 相关阅读:
    设计模式 享元模式(池化技术)
    设计模式 混合模式(整体部分模式)
    设计模式 适配器模式
    Flex3示例、 安装 、注册码
    VS2010错误
    转载:glut.h 与 stdlib.h中 的exit()重定义问题的解决
    宿迁软件QQ群(109233721)
    百度地图 开发API接口啦
    Sublime Text 插件个人使用总结&推荐
    sublime text2 使用安装插件中文乱码问题解决
  • 原文地址:https://www.cnblogs.com/hejunhong/p/10283922.html
Copyright © 2011-2022 走看看