背景
sparkJDBC在写入时提供了overwrite模式。当写入数据之前,会将之前的表drop掉,然后根据DataFrame类型推断生成Create语句新建一张表。
在某些小众的数据库,spark内部没有提供对应的方言。这是spark会使用一个NoopCommon的默认方言,这时候很容易推断错误。
案例
以vertica为例,在DataFrame中包含String类型时,如下:
root |-- time: timestamp (nullable = true) |-- AMP: double (nullable = true) |-- NOZP: integer (nullable = true) |-- value: integer (nullable = true) |-- reason: string (nullable = true)
这时在直接用jdbc写入时会报
java.sql.SQLSyntaxErrorException: [Vertica][VJDBC](5108) ERROR: Type "TEXT" does not exist
因此需要注册一个vertica方言,将String类型转为一个合适varchar类型。如下
object VerticaDialect extends JdbcDialect { override def canHandle(url: String): Boolean = { url.toLowerCase(Locale.ROOT).startsWith("jdbc:vertica") } override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("LONG VARCHAR", Types.LONGVARCHAR)) case _ => None } }
因为String是无界的,所以转为long varchar最为保险。这里只需要转String类型即可,因为在JdbcUtils里面
private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = { dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}")) }
sparkJDBC会优先找注册的方言,然后还会在CommonJDBCType中找一次。所以其他通用的类型不需要在方言中重复指定。
最后在driver中注册方言
JdbcDialects.registerDialect(VerticaDialect)