  • SparkSQL Access to Hive and MySQL Sources

    Author: 黑暗行动

    I. SparkSQL Access to a Hive Source

    Software environment

    hadoop2.7.6
    spark-2.3.0
    scala-2.11.12
    hive-2.1.1

    SparkSQL's command-line mode can connect to Hive directly:

    Copy the hive-site.xml file from the Hive directory (D:\Soft\apache-hive-2.1.1-bin\conf\hive-site.xml) into the Spark conf directory (D:\Soft\spark\conf).

    Place the mysql-connector-java-5.1.30.jar package in the D:\soft\spark\jars directory.

    Connecting to Hive from a Java SparkSQL program

    1) Copy the hive-site.xml file from the Hive directory (D:\Soft\apache-hive-2.1.1-bin\conf\hive-site.xml) into the src\main\resources resource directory.
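    enableHiveSupport() will only pick up the metastore settings if hive-site.xml actually ends up on the runtime classpath after the build. A quick way to verify the copy worked is a plain-Java classpath probe (no Spark required; the class name here is illustrative, not part of the original project):

    ```java
    // Plain-Java probe: reports whether a named resource (e.g. the hive-site.xml
    // copied into src\main\resources) is visible on the runtime classpath.
    // The class name is illustrative, not part of the original project.
    public class ClasspathCheck {
        public static boolean onClasspath(String resource) {
            return ClasspathCheck.class.getClassLoader().getResource(resource) != null;
        }

        public static void main(String[] args) {
            System.out.println("hive-site.xml found: " + onClasspath("hive-site.xml"));
        }
    }
    ```

    If this prints false, the resource directory was not included in the build, and a Hive-enabled SparkSession will fall back to a local embedded Derby metastore instead of your configured one.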

    2) Add dependencies

     <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-hive_2.11</artifactId>
         <!-- keep in line with the spark-2.3.0 environment above -->
         <version>2.3.0</version>
         <scope>provided</scope>
     </dependency>

     <dependency>
         <groupId>mysql</groupId>
         <artifactId>mysql-connector-java</artifactId>
         <version>5.1.30</version>
     </dependency>
    

    3) Create a SparkSession

     /**
         * SparkSession
         * Supported data source: hive
         * @return
         */
        public static SparkSession getSparkSessionForHive() {
            return SparkSession
                    .builder()
                    .appName("SparkSQLForHive")
                    .master("local[*]")
                    .enableHiveSupport()
                    .getOrCreate();
        }
    
    4) Test code
     public static void main(String[] args) {
            SparkSession spark = SparkUtil.getSparkSessionForHive();
            spark.sql("show tables").show();
            spark.sql("select * from test1").show();
        }
    
    5) Run output
    18/11/18 22:36:44 INFO CodeGenerator: Code generated in 234.231366 ms
    18/11/18 22:36:44 INFO CodeGenerator: Code generated in 11.285122 ms
    +--------+--------------+-----------+
    |database|     tableName|isTemporary|
    +--------+--------------+-----------+
    | default|bucket_persion|      false|
    | default|   bucket_temp|      false|
    | default|         hdfs1|      false|
    | default|         hdfs2|      false|
    | default|           pt1|      false|
    | default|        tbcsv1|      false|
    | default|        tbcsv2|      false|
    | default|         test1|      false|
    | default|  test_table_2|      false|
    +--------+--------------+-----------+
     
    .........
     
    18/11/18 22:36:46 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1346 bytes result sent to driver
    18/11/18 22:36:46 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 237 ms on localhost (executor driver) (1/1)
    18/11/18 22:36:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    18/11/18 22:36:46 INFO DAGScheduler: ResultStage 0 (show at redHive.java:14) finished in 0.313 s
    18/11/18 22:36:46 INFO DAGScheduler: Job 0 finished: show at redHive.java:14, took 0.352593 s
    +-------+---+-------+------+
    |   name|age|address|school|
    +-------+---+-------+------+
    |    chy|  1|     芜湖|    示范|
    |    zjj|  2|     南京|    南开|
    |gaoxing|  3|    马鞍山|   安工大|
    +-------+---+-------+------+
     
    18/11/18 22:36:46 INFO SparkContext: Invoking stop() from shutdown hook
    

    II. SparkSQL Access to a MySQL Source

    Spark environment

    spark-2.3.0

    Add dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.22</version>
    </dependency>
    

    Create a SparkSession

    /**
         * SparkSession
         * Supported data sources: textFile, load, csv, json, text, format, jdbc
         * @return
         */
        public static SparkSession getSparkSession() {
            return SparkSession
                    .builder()
                    .appName("SparkSQL")
                    .master("local[*]")
                    .getOrCreate();
        }
    

    Accessing MySQL, method 1:

    public static void test(){
            String url = "jdbc:mysql://localhost:3306/sparksql?user=root&password=123456";
            String tableName = "users";
            SparkSession spark = SparkUtil.getSparkSession();
            Map<String, String> map = new HashMap<>();
            map.put("driver", "com.mysql.jdbc.Driver");
            map.put("url", url);
            map.put("dbtable", tableName);
            map.put("fetchSize", "100");

            // read the users table
            Dataset<Row> jdbcDF = spark.read()
                    .format("jdbc")
                    .options(map)
                    .load();

            // write the rows out to the users_copy table
            // (the default SaveMode is ErrorIfExists, so this fails if users_copy already exists)
            jdbcDF.write()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", "users_copy")
                    .save();
    }
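    Since method 1 drives everything through a String-to-String option map, the map itself can be built and checked without a Spark session. A minimal sketch (the helper name and its parameters are illustrative, not part of the original code):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Builds the option map consumed by spark.read().format("jdbc").options(...).
    // The helper name is illustrative; "com.mysql.jdbc.Driver" is the legacy
    // driver class matching the mysql-connector-java 5.1.x dependency above.
    public class JdbcOptions {
        public static Map<String, String> mysqlOptions(String url, String table, int fetchSize) {
            Map<String, String> map = new HashMap<>();
            map.put("driver", "com.mysql.jdbc.Driver");
            map.put("url", url);
            map.put("dbtable", table);
            map.put("fetchSize", String.valueOf(fetchSize));
            return map;
        }

        public static void main(String[] args) {
            Map<String, String> opts = mysqlOptions(
                    "jdbc:mysql://localhost:3306/sparksql?user=root&password=123456",
                    "users", 100);
            System.out.println(opts.get("dbtable")); // prints "users"
        }
    }
    ```

    Keeping the options in one helper makes it easy to reuse the same connection settings for both the read and the write side of the job.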
    

    Accessing MySQL, method 2:

    public static void test2(){
            String url = "jdbc:mysql://localhost:3306/sparksql";
            // a parenthesized, aliased subquery can stand in for a table name
            String tempTableName = "(select id,name from users) as u";
            SparkSession spark = SparkUtil.getSparkSession();
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "root");
            connectionProperties.put("password", "123456");
            // transaction isolation level applied on the write path
            // (Spark's default is READ_UNCOMMITTED)
            connectionProperties.put("isolationLevel", "REPEATABLE_READ");
            // read id and name from the users table
            Dataset<Row> jdbcDF2 = spark.read()
                    .jdbc(url, tempTableName, connectionProperties);
            // save the result to the users1 table
            jdbcDF2.write()
                    .jdbc(url, "users1", connectionProperties);
    }
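    The key trick in method 2 is that the "table name" argument of read().jdbc(...) accepts any parenthesized subquery with an alias, which pushes the column pruning down to MySQL instead of fetching whole rows. A small plain-Java sketch of that string construction and the connection properties (helper names are illustrative, not part of the original code):

    ```java
    import java.util.Properties;

    // Sketch of method 2's pushdown trick: wrap a SELECT in parentheses and
    // give it an alias so it can stand in for a table name in read().jdbc(...).
    // Helper names are illustrative, not part of the original code.
    public class JdbcSubquery {
        public static String asTable(String select, String alias) {
            return "(" + select + ") as " + alias;
        }

        public static Properties mysqlProps(String user, String password) {
            Properties p = new Properties();
            p.put("user", user);
            p.put("password", password);
            p.put("isolationLevel", "REPEATABLE_READ"); // applied when writing
            return p;
        }

        public static void main(String[] args) {
            System.out.println(asTable("select id,name from users", "u"));
            // prints "(select id,name from users) as u"
        }
    }
    ```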
    
  • Original article: https://www.cnblogs.com/aixing/p/13327403.html