原先使用批次提交更新 但数据库无变化,不得不一条一条的插入 公司数据量不大 还未做数据量大的测试 但实时更新是可以的
关键sql :
insert into area_user_amt (date,country,provence,amt) values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}进行老的主键key的实时更新
areaStartAmt.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { if (partitionOfRecords.isEmpty) { println("This RDD is not null but partition is null") } else { // Class.forName("com.mysql.jdbc.Driver") // var connection: Connection = null // try { // connection = DriverManager.getConnection(url, username, password) // connection.setAutoCommit(false) // val time = System.currentTimeMillis() / 1000 // // val sql = "insert into test (bc_person,amt) values(?,?) ON DUPLICATE KEY UPDATE `amt`= ?" // val sql1 = "insert into area_user_amt (date,country,provence,amt) values(?,?,?,?) ON DUPLICATE KEY UPDATE `amt`= ?" // // val sql3 = "insert into area_user_amt (date,country,provence,amt) values(?,?,?,?) " // val pstmt = connection.prepareStatement(sql1) // var count = 0 // partitionOfRecords.foreach(record => { // // pstmt.setString(1, record._1) // // pstmt.setInt(2, record._2.toInt) // // pstmt.setInt(3, record._2.toInt) // val info = record._1.split("_") // // if(info.length==3){ // pstmt.setString(1, info(2)) // pstmt.setString(2, info(0)) // pstmt.setString(3, info(1)) // pstmt.setInt(4, record._2.toInt) // pstmt.setInt(5, record._2.toInt) // pstmt.addBatch() // count += 1 // if (count % 500 == 0) { // pstmt.executeBatch() // connection.commit() // } // }) // pstmt.execute() // connection.commit() // } finally { // if (connection != null) // connection.close() // } val connection = DriverManager.getConnection(url, username, password) partitionOfRecords.foreach(record => { var datekey = record._1.split("_")(2) var countrykey = record._1.split("_")(0) var provencekey = record._1.split("_")(1) var amt = record._2 val sql1 = s"insert into area_user_amt (date,country,provence,amt) values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'" // val sql = s"select * from area_user_amt where date='${datekey}' and country='${countrykey}' and provence='${provencekey}'" val stmt = connection.createStatement() val code = stmt.executeUpdate(sql1) //返回值 if (code < 0) { println("更新失败") } else { // println("更新成功") }