zoukankan      html  css  js  c++  java
  • Spark:导入数据到oracle

    方案一:

     1     //overwrite JdbcDialect fitting for Oracle
     2     val OracleDialect = new JdbcDialect {
     3       override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")
     4 
     5       //getJDBCType is used when writing to a JDBC table
     6       override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     7         case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
     8         case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
     9         case IntegerType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    10         case LongType => Some(JdbcType("NUMBER(16)", java.sql.Types.NUMERIC))
    11         case DoubleType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    12         case FloatType => Some(JdbcType("NUMBER(16,4)", java.sql.Types.NUMERIC))
    13         case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    14         case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    15         case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    16         case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    17         case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    18         //        case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    19         case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,2)", java.sql.Types.NUMERIC))
    20         case _ => None
    21       }
    22     }
    23     //Registering the OracleDialect
    24     JdbcDialects.registerDialect(OracleDialect)
    25 
    26     val connectProperties = new java.util.Properties()
    27     connectProperties.put("user", username)
    28     connectProperties.put("password", password)
    29     Class.forName(driver).newInstance()
    30 
    31     //write back Oracle
    32     //Note: When writing the results back orale, be sure that the target table existing
    33     JdbcUtils.saveTable(mr_case_df, oracleDriverUrl, "MR", connectProperties)

    方案二:

    val test_df=hiveContext.sql("select * from test")
    
    test_df.foreachPartition(rows => {
      Class.forName(driver)
    
      val connection: Connection = DriverManager.getConnection(oracleDriverUrl, username, password)
      val prepareStatement: PreparedStatement = connection.prepareStatement("insert into RES_CELL(City,Latiude,longitude)values(?,?,?);")
    
      rows.foreach(row => {
        prepareStatement.setString(1, row.getAs[String]("city"))      
        prepareStatement.setString(2, row.getAs[String]("latitude"))
        prepareStatement.setString(3, row.getAs[String]("longitude"))
        prepareStatement.addBatch()
      })
      prepareStatement.executeBatch()
    
      prepareStatement.close()
      connection.close()
    })

    方案三:

    使用sqlloader从spark任务提交节点读取文件导入到oracle。

    为什么操作,原因直接从spark中读取hive中的数据使用网络IO连接到集群外的oracle服务器是spark集群不乐意做的事情,对SPARK宝贵的网络IO来说开销特别大。

  • 相关阅读:
    mhWaveEdit 1.4.8
    FFmpeg — 屏幕录制器材
    GNOME 主题: Troll
    cGmail — 主动反省邮件
    最小化布置 Ubuntu
    GNOME Do — 疾速翻开法式和文件
    PyTone 一个控制台音乐播放器
    高恪守编辑器 VIM-把持篇(2)
    Cankiri:玲珑实用的屏幕录像机
    LiVES 0.9.6pre4
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/7017960.html
Copyright © 2011-2022 走看看