原先使用批次提交更新 但数据库无变化,不得不一条一条的插入 公司数据量不大 还未做数据量大的测试 但实时更新是可以的
关键sql :
insert into area_user_amt (date,country,provence,amt) values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}进行老的主键key的实时更新
areaStartAmt.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.isEmpty) {
println("This RDD is not null but partition is null")
} else {
// Class.forName("com.mysql.jdbc.Driver")
// var connection: Connection = null
// try {
// connection = DriverManager.getConnection(url, username, password)
// connection.setAutoCommit(false)
// val time = System.currentTimeMillis() / 1000
// // val sql = "insert into test (bc_person,amt) values(?,?) ON DUPLICATE KEY UPDATE `amt`= ?"
// val sql1 = "insert into area_user_amt (date,country,provence,amt) values(?,?,?,?) ON DUPLICATE KEY UPDATE `amt`= ?"
// // val sql3 = "insert into area_user_amt (date,country,provence,amt) values(?,?,?,?) "
// val pstmt = connection.prepareStatement(sql1)
// var count = 0
// partitionOfRecords.foreach(record => {
// // pstmt.setString(1, record._1)
// // pstmt.setInt(2, record._2.toInt)
// // pstmt.setInt(3, record._2.toInt)
// val info = record._1.split("_")
// // if(info.length==3){
// pstmt.setString(1, info(2))
// pstmt.setString(2, info(0))
// pstmt.setString(3, info(1))
// pstmt.setInt(4, record._2.toInt)
// pstmt.setInt(5, record._2.toInt)
// pstmt.addBatch()
// count += 1
// if (count % 500 == 0) {
// pstmt.executeBatch()
// connection.commit()
// }
// })
// pstmt.execute()
// connection.commit()
// } finally {
// if (connection != null)
// connection.close()
// }
val connection = DriverManager.getConnection(url, username, password)
partitionOfRecords.foreach(record => {
var datekey = record._1.split("_")(2)
var countrykey = record._1.split("_")(0)
var provencekey = record._1.split("_")(1)
var amt = record._2
val sql1 = s"insert into area_user_amt (date,country,provence,amt) values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'"
// val sql = s"select * from area_user_amt where date='${datekey}' and country='${countrykey}' and provence='${provencekey}'"
val stmt = connection.createStatement()
val code = stmt.executeUpdate(sql1)
//返回值
if (code < 0) {
println("更新失败")
}
else {
// println("更新成功")
}