zoukankan      html  css  js  c++  java
  • Spark SQL External Data Sources JDBC简易实现

    在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作。External Data Sources API代码存放于org.apache.spark.sql包中。


    具体的分析可参见OopsOutOfMemory的两篇精彩博文:

    http://blog.csdn.net/oopsoom/article/details/42061077

    http://blog.csdn.net/oopsoom/article/details/42064075

    自己尝试实现了一个简易的读取关系型数据库的外部数据源,代码参见:https://github.com/luogankun/spark-jdbc

    支持MySQL/Oracle/DB2,以及几种简单的数据类型,暂时还不支持PrunedScan、PrunedFilteredScan,仅支持TableScan,后续在接着完善。

    使用步骤:

    1、编译spark-jdbc代码

    sbt package

    2、添加jar包到spark-env.sh

    export SPARK_CLASSPATH=/home/spark/software/source/spark_package/spark-jdbc/target/scala-2.10/spark-jdbc_2.10-0.1.jar:$SPARK_CLASSPATH
    export SPARK_CLASSPATH=/home/spark/lib/ojdbc6.jar:$SPARK_CLASSPATH
    export SPARK_CLASSPATH=/home/spark/lib/db2jcc4.jar:$SPARK_CLASSPATH
    export SPARK_CLASSPATH=/home/spark/lib/mysql-connector-java-3.0.10.jar:$SPARK_CLASSPATH

    3、SQL使用:启动spark-sql

    参数说明:

    url :关系型数据库url

    user :关系型数据库用户名

    password: 关系型数据库密码

    sql:关系型数据库sql查询语句

    MySQL: 

    CREATE TEMPORARY TABLE jdbc_table
    USING com.luogankun.spark.jdbc
    OPTIONS (
    url    'jdbc:mysql://hadoop000:3306/hive',
    user    'root',
    password    'root',
    sql 'select TBL_ID,TBL_NAME,TBL_TYPE FROM TBLS WHERE TBL_ID < 100'
    );
    
    SELECT * FROM jdbc_table;

    Oracle:

    CREATE TEMPORARY TABLE jdbc_table
    USING com.luogankun.spark.jdbc
    OPTIONS (
    url    'jdbc:oracle:thin:@hadoop000:1521/ora11g',
    user    'coc',
    password    'coc',
    sql 'select HISTORY_ID, APPROVE_ROLE_ID, APPROVE_OPINION from CI_APPROVE_HISTORY'
    );
    
    SELECT * FROM jdbc_table;

    DB2:

    CREATE TEMPORARY TABLE jdbc_table
    USING com.luogankun.spark.jdbc
    OPTIONS (
    url    'jdbc:db2://hadoop000:60000/CI',
    user    'ci',
    password    'ci',
    sql 'select LABEL_ID from coc.CI_APPROVE_STATUS'
    );
    
    SELECT * FROM jdbc_table;

    在测试过程中遇到的问题:

    如上的代码在连接MySQL数据库操作时没有问题,但是在操作Oracle或者DB2数据库时,报错如下:

    09:56:48,302 [Executor task launch worker-0] ERROR Logging$class : Error in TaskCompletionListener
    java.lang.AbstractMethodError: oracle.jdbc.driver.OracleResultSetImpl.isClosed()Z
        at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:99)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
        at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
        at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:85)
        at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:110)
        at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:108)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:108)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
    09:56:48,302 [Executor task launch worker-1] ERROR Logging$class : Error in TaskCompletionListener

    跟了下JdbcRDD源代码发现,问题在于:

    我在本案例中使用的oracle的驱动是ojdbc14-10.2.0.3.jar,查阅了些资料说是Oracle的实现类没有该方法;

    该issues详见: https://issues.apache.org/jira/browse/SPARK-5239

    解决办法:

    1)升级驱动包;

    2)暂时屏蔽掉这两个isClosed的判断方法(https://github.com/apache/spark/pull/4033)

    4、Scala API使用方式

    import  com.luogankun.spark.jdbc._
    val sqlContext = new HiveContext(sc)
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root","root","select id, name from city")
    cities.collect

    后续将会继续完善,现在的实现确实很“丑陋”,凑合着先能使用吧。

  • 相关阅读:
    RPC之总体架构
    Netty总结
    数据结构(2)
    数据结构(1)
    java初探(1)之秒杀项目总结
    java初探(1)之秒杀的安全
    java初探(1)之秒杀中的rabbitMQ
    java初探(1)之防止库存为负以及防超买
    java初探(1)之静态页面化——客户端缓存
    java初探(1)之缓存技术
  • 原文地址:https://www.cnblogs.com/luogankun/p/4235912.html
Copyright © 2011-2022 走看看