Option 1:
import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}

// Override JdbcDialect to fit Oracle
val OracleDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:oracle") || url.contains("oracle")

  // getJDBCType is used when writing to a JDBC table
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,2)", java.sql.Types.NUMERIC))
    case _ => None
  }
}

// Register the OracleDialect
JdbcDialects.registerDialect(OracleDialect)

val connectProperties = new java.util.Properties()
connectProperties.put("user", username)
connectProperties.put("password", password)
Class.forName(driver).newInstance()

// Write back to Oracle
// Note: when writing the results back to Oracle, make sure the target table already exists.
// JdbcUtils is a Spark-internal class; its package depends on the Spark version.
JdbcUtils.saveTable(mr_case_df, oracleDriverUrl, "MR", connectProperties)
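JdbcUtils.saveTable is a Spark-internal API whose signature has shifted between releases. As a hedged alternative on Spark 1.4 or later, the same write can go through the public DataFrameWriter, reusing mr_case_df, oracleDriverUrl and connectProperties from the snippet above; the registered OracleDialect is consulted automatically when column types are mapped:

// Sketch: public-API equivalent of the saveTable call above (Spark 1.4+)
mr_case_df.write
  .mode("append")    // append rows into the MR table
  .jdbc(oracleDriverUrl, "MR", connectProperties)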
Option 2:
import java.sql.{Connection, DriverManager, PreparedStatement}

val test_df = hiveContext.sql("select * from test")
test_df.foreachPartition(rows => {
  // Open one connection per partition rather than per row
  Class.forName(driver)
  val connection: Connection = DriverManager.getConnection(oracleDriverUrl, username, password)
  // Note: no trailing ";" inside the SQL string; Oracle's JDBC driver rejects it
  val prepareStatement: PreparedStatement = connection.prepareStatement(
    "insert into RES_CELL(CITY, LATITUDE, LONGITUDE) values (?, ?, ?)")
  rows.foreach(row => {
    prepareStatement.setString(1, row.getAs[String]("city"))
    prepareStatement.setString(2, row.getAs[String]("latitude"))
    prepareStatement.setString(3, row.getAs[String]("longitude"))
    prepareStatement.addBatch()
  })
  prepareStatement.executeBatch()
  prepareStatement.close()
  connection.close()
})
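If partitions are large, a variant of the same loop that flushes the batch every few thousand rows and releases the JDBC resources in a finally block is safer. This is only a sketch, reusing the imports and variables above; batchSize is an assumed tuning knob, not part of the original code.

// Variant sketch: periodic executeBatch() bounds the JDBC batch size, and
// try/finally closes the connection even if an insert fails.
val batchSize = 1000  // assumed tuning value
test_df.foreachPartition(rows => {
  val connection = DriverManager.getConnection(oracleDriverUrl, username, password)
  val stmt = connection.prepareStatement(
    "insert into RES_CELL(CITY, LATITUDE, LONGITUDE) values (?, ?, ?)")
  try {
    var count = 0
    rows.foreach(row => {
      stmt.setString(1, row.getAs[String]("city"))
      stmt.setString(2, row.getAs[String]("latitude"))
      stmt.setString(3, row.getAs[String]("longitude"))
      stmt.addBatch()
      count += 1
      if (count % batchSize == 0) stmt.executeBatch()  // flush periodically
    })
    stmt.executeBatch()  // flush the remainder
  } finally {
    stmt.close()
    connection.close()
  }
})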
Option 3:
Export the data to a file on the Spark submit node and load that file into Oracle with SQL*Loader (sqlldr).
Why do it this way? Having Spark read the Hive data and then push it over the network to an Oracle server outside the cluster is work the Spark cluster is reluctant to do: the overhead on Spark's scarce network I/O is particularly heavy. A sketch of the export step follows.
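A minimal sketch of that pipeline, assuming the RES_CELL table and column names from Option 2; the HDFS path, control file name and connect string are placeholders, and the sqlldr step runs on the submit node rather than inside Spark.

// Dump the result as comma-separated text; a single partition keeps it in one file
val exportDf = hiveContext.sql("select city, latitude, longitude from test")
exportDf.rdd
  .map(row => Seq(row.getAs[String]("city"),
                  row.getAs[String]("latitude"),
                  row.getAs[String]("longitude")).mkString(","))
  .coalesce(1)
  .saveAsTextFile("/tmp/res_cell")

// Pull the file to the submit node, e.g.:
//   hdfs dfs -getmerge /tmp/res_cell res_cell.csv
// Then load it with SQL*Loader using a control file such as res_cell.ctl:
//   LOAD DATA
//   INFILE 'res_cell.csv'
//   APPEND INTO TABLE RES_CELL
//   FIELDS TERMINATED BY ','
//   (CITY, LATITUDE, LONGITUDE)
// invoked from the submit node with:
//   sqlldr userid=username/password@ORCL control=res_cell.ctl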