方案一:
1 //overwrite JdbcDialect fitting for Oracle 2 val OracleDialect = new JdbcDialect { 3 override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle") 4 5 //getJDBCType is used when writing to a JDBC table 6 override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { 7 case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR)) 8 case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC)) 9 case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC)) 10 case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC)) 11 case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC)) 12 case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC)) 13 case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC)) 14 case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC)) 15 case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB)) 16 case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) 17 case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) 18 // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) 19 case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,2)", java.sql.Types.NUMERIC)) 20 case _ => None 21 } 22 } 23 //Registering the OracleDialect 24 JdbcDialects.registerDialect(OracleDialect) 25 26 val connectProperties = new java.util.Properties() 27 connectProperties.put("user", username) 28 connectProperties.put("password", password) 29 Class.forName(driver).newInstance() 30 31 //write back Oracle 32 //Note: When writing the results back orale, be sure that the target table existing 33 JdbcUtils.saveTable(mr_case_df, oracleDriverUrl, "MR", connectProperties)
方案二:
val test_df=hiveContext.sql("select * from test") test_df.foreachPartition(rows => { Class.forName(driver) val connection: Connection = DriverManager.getConnection(oracleDriverUrl, username, password) val prepareStatement: PreparedStatement = connection.prepareStatement("insert into RES_CELL(City,Latiude,longitude)values(?,?,?);") rows.foreach(row => { prepareStatement.setString(1, row.getAs[String]("city")) prepareStatement.setString(2, row.getAs[String]("latitude")) prepareStatement.setString(3, row.getAs[String]("longitude")) prepareStatement.addBatch() }) prepareStatement.executeBatch() prepareStatement.close() connection.close() })
方案三:
使用sqlloader从spark任务提交节点读取文件导入到oracle。
为什么操作,原因直接从spark中读取hive中的数据使用网络IO连接到集群外的oracle服务器是spark集群不乐意做的事情,对SPARK宝贵的网络IO来说开销特别大。