  • Spark SQL with a JDBC Data Source

    I. Environment Preparation

    After installing Oracle, create a test table and some test data:

    
    
    create table test (
      username varchar2(32) primary key,
      password varchar2(32)
    );
    insert into test values('John','1234');
    insert into test values('Mike','1234');
    insert into test values('Jim','1234');
    insert into test values('Ana','1234');
    insert into test values('Ennerson','1234');
    commit;
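
    Before wiring Spark to the database, it can be worth confirming that the table is reachable over plain JDBC. A minimal sketch, assuming the Oracle driver jar is on the classpath and the same host and credentials used later in this post:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class JdbcSmokeTest {
        public static void main(String[] args) throws Exception {
            // Explicit registration; may be needed for older driver jars such as ojdbc5
            Class.forName("oracle.jdbc.OracleDriver");
            // Same connection details as the Spark examples below (assumed here)
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:oracle:thin:@192.168.168.100:1521/orcl", "flume", "1234");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("select username, password from test")) {
                while (rs.next()) {
                    System.out.println(rs.getString("USERNAME") + " / " + rs.getString("PASSWORD"));
                }
            }
        }
    }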

     

    II. Implementation

    1. Establish a JDBC connection and read the data

    
    
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[6]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(jsc);

    Map<String, String> options = new HashMap<String, String>();
    options.put("url", "jdbc:oracle:thin:@192.168.168.100:1521/orcl");
    options.put("user", "flume");
    options.put("password", "1234");
    // read the test table
    options.put("dbtable", "test");
    Dataset<Row> df = sqlContext.read().format("jdbc").options(options).load();
    df.show();
    /*+--------+--------+
      |USERNAME|PASSWORD|
      +--------+--------+
      |    John|    1234|
      |    Mike|    1234|
      |     Jim|    1234|
      |     Ana|    1234|
      |Ennerson|    1234|
      +--------+--------+*/
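
    In Spark 2.x the same read can also be expressed through the SparkSession entry point, which supersedes SQLContext. A sketch using the same connection settings:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .appName("JavaSparkSQL")
            .master("local[6]")
            .getOrCreate();

    Dataset<Row> df = spark.read().format("jdbc")
            .option("url", "jdbc:oracle:thin:@192.168.168.100:1521/orcl")
            .option("user", "flume")
            .option("password", "1234")
            .option("dbtable", "test")
            .load();
    df.show();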

    2. Iterate over the Dataset&lt;Row&gt; rows

    
    
    // Collect the Dataset<Row> to the driver and iterate over it
    List<Row> list = df.collectAsList();
    // Read the USERNAME column of the test table
    for (int i = 0; i < list.size(); i++) {
        System.out.println(list.get(i).<String>getAs("USERNAME"));
    }
    /*John
      Mike
      Jim
      Ana
      Ennerson*/
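
    Note that collectAsList() pulls the whole table into the driver. For anything beyond small test data, processing the rows on the executors avoids that; a sketch (in local mode the println output appears in the same console):

    import org.apache.spark.api.java.function.ForeachFunction;
    import org.apache.spark.sql.Row;

    // The cast disambiguates the Java lambda from the Scala overload
    df.select("USERNAME").foreach(
            (ForeachFunction<Row>) row -> System.out.println(row.<String>getAs("USERNAME")));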

    3. Execute SQL statements

    
    
    // Execute a SQL statement.
    // df.createOrReplaceTempView("test") is required; without it you get:
    // "Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: test; line 1 pos 0"
    df.createOrReplaceTempView("test");
    sqlContext.sql("insert into test values('Obama','6666')");
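
    The INSERT above goes through the temp view. An alternative that writes back to Oracle without SQL is the DataFrameWriter JDBC sink; a sketch, assuming the sqlContext and connection settings from step 1:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    Properties props = new Properties();
    props.put("user", "flume");
    props.put("password", "1234");

    // Build a one-row DataFrame matching the test table's schema
    StructType schema = new StructType()
            .add("USERNAME", DataTypes.StringType)
            .add("PASSWORD", DataTypes.StringType);
    Dataset<Row> newRow = sqlContext.createDataFrame(
            Collections.singletonList(RowFactory.create("Obama", "6666")), schema);

    // Append the new row to the existing Oracle table
    newRow.write().mode(SaveMode.Append)
            .jdbc("jdbc:oracle:thin:@192.168.168.100:1521/orcl", "test", props);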

    4. Add the spark-sql dependency

    Add the spark-sql dependency to the pom.xml file:

    
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    III. Resolving the "No suitable driver" error

    1. Running in Eclipse fails with 'Exception in thread "main" java.sql.SQLException: No suitable driver':

    
    
    Exception in thread "main" java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:83)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at com.spark.test.JavaSparkSQL.main(JavaSparkSQL.java:26)

    The cause is that the Oracle JDBC driver is missing from the project. Add it to the pom.xml file as follows:

    
    
    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc5</artifactId>
        <version>11.2.0.1.0</version>
        <scope>runtime</scope>
    </dependency>

    Because of Oracle's licensing, Maven Central does not distribute the Oracle JDBC driver; to use it in a Maven project, you must install it into your local repository manually.

    For details, see: adding the Oracle JDBC dependency to Maven.
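
    For reference, a driver jar downloaded from Oracle can be installed into the local repository with mvn install:install-file, using coordinates matching the dependency above (the jar file name here is an assumption):

    mvn install:install-file -Dfile=ojdbc5.jar \
        -DgroupId=com.oracle -DartifactId=ojdbc5 \
        -Dversion=11.2.0.1.0 -Dpackaging=jar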

    2. Running on a Spark cluster fails with 'Exception in thread "main" java.sql.SQLException: No suitable driver':

    
    
    Exception in thread "main" java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:83)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber.main(JavaLocalDirectKafkaSparkSQLCarNumber.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    
    
    The job was submitted without pointing spark-submit at the Oracle driver:

    cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7
    bin/spark-submit \
    --master spark://master:7077 \
    --class "com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber" \
    myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar

    On the Spark cluster you need to specify the JDBC driver jar explicitly with --driver-class-path myApp/ojdbc5.jar:

    
    
    cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7
    bin/spark-submit \
    --driver-class-path myApp/ojdbc5.jar \
    --master spark://master:7077 \
    --class "com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber" \
    myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar
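
    Note that --driver-class-path only exposes the jar to the driver JVM. If JDBC connections are also opened on the executors (as they are for distributed reads), additionally passing --jars myApp/ojdbc5.jar may be necessary.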

     

    Run it again, and it succeeds!

     

  • Original article: https://www.cnblogs.com/yangcx666/p/8723875.html