zoukankan      html  css  js  c++  java
  • spark hive 小结

    依赖:

     

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.1.1</version>
                <scope>provided</scope>
            </dependency>

    集成Hive

    Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的 一点是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。

    ps:一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了

    1.使用内置hive

    ps:版本为1.2.1

    ps:需要注意内置hive是非常容易出现问题的
    1.先启动集群/opt/software/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh
    2.进入到spark-shell模式/opt/software/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://hadoop01:7077
    3.在spark-shell下操作hive
    spark.sql("show tables").show 查询所有hive的表
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)") 创建表
    spark.sql("LOAD DATA LOCAL INPATH '/opt/software/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src") 添加数据
    spark.sql("SELECT * FROM src").show 查询表中数据
    会出现一个问题FileNotFoundException 没有找到文件
    通过在主节点和从节点查询可以发现,主节点存在spark-warehouse目录 目录中是存在数据的
    但是在从节点中没有这个文件夹,所以此时将文件夹分发到从节点
    scp -r ./spark-warehouse/ root@hadoop02:$PWD
    再次执行查询
    ps:这样表面看查询是没什么问题了,但是实际问题在于是讲master节点上的数据分发到从节点上的,那么不可能说每次操作有了数据都执行拷贝操作,所以此时就需要使用HDFS来进行存储数据了
    所以先将所有从节点上的spark-warehouse删除掉
    删除后将主节点上的spark-warehouse和metastor_db删除掉
    然后在启动集群的时候添加一个命令
    --conf spark.sql.warehouse.dir=hdfs://hadoop01:8020/spark_warehouse
    此方法只做一次启动即可 后续再启动集群的时候就无需添加这个命了 因为记录在metastore_db中了
    ps:spark-sql中可以直接使用SQL语句操作

    2.集成外部hive(重要)

    1.将Hive中的hive-site.xml软连接到Spark安装目录下的conf目录下。[主节点有即可]
    ln -s /opt/software/apache-hive-1.2.1-bin/conf/hive-site.xml /opt/software/spark-2.2.0-bin-hadoop2.7/conf/hive-site.xml
    2.打开spark shell,注意带上访问Hive元数据库的JDBC客户端
    将mysql驱动jar包拷贝到spark的bin目录下
    ./spark-shell --master spark://hadoop01:7077 --jars mysql-connector-java-5.1.36.jar

    ps:做完外部hive链接需要注意,因为hive-site.xml文件是在Spark的conf目录下,若直接启动spark-shell无论是单机版还是集群版都会出现报错 Error creating transactional connection factory 原因在于,启动时会加载hive-site.xml文件,所以必须添加jar路径, 为了以后使用建议删除软连接,需要的时候在做外部hive的连接
    删除软连接方式:
    rm -rf 软连接方式

     

    总结:

    若要把Spark SQL连接到一个部署好的Hive上,你必须把hive-site.xml复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

    3.通过代码操作(重要)

    ps:需要有Hadoop本地环境

    import org.apache.spark.sql.{Row, SparkSession}
    object HiveCode {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("HiveCode")
          .config("spark.sql.warehouse.dir", "D:\spark-warehouse")
          .master("local[*]")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._
        import spark.sql
       // sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
        sql("LOAD DATA LOCAL INPATH  'dir/kv1.txt' INTO TABLE src_1")
        sql("SELECT * FROM src_1").show()
        sql("SELECT COUNT(*) FROM src_1").show()
        val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
        sqlDF.as("mixing").show()
        val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
        recordsDF.createOrReplaceTempView("records")
        sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
      }
    }
    case class Record(key: Int, value: String)
     

    ps:本地若出现"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder和权限异常问题

    在cmd命令下进入到D:/hadoop2.7.1/bin目录下执行命令即可

    winutils.exe chmod 777 /tmp/hive

    连接服务器hive

    ps:需要添加hive-site.xml hdfs-site.xml core-site.xml

    这里出现了一个异常 Could not find the uri with key [dfs.encryption.key.provider.uri] to create a KeyProvider !! 这个异常并没有找到解决问题的办法,结果自己能执行了!!!!! 谁能解决麻烦备注说明!

    我的Hadoop集群是高可用所以我在windows下配置了C:WindowsSystem32driversetc路径下的

    hosts文件 只要应对我的 mycluster (这一个不配置应该是可以的)

    192.168.223.111 mycluster 192.168.223.112 mycluster

    import java.io.File
    
    import org.apache.spark.sql.{Row, SparkSession}
    
    object HiveCode {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("HiveCode")
          .config("spark.sql.warehouse.dir", "hdfs://hadoop01:9000/spark_warehouse")
          .master("spark://hadoop01:7077")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._
        import spark.sql
        //sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
       // sql("LOAD DATA LOCAL INPATH 'dir/kv1.txt' INTO TABLE src_1")
        sql("SELECT * FROM src_1").show()
      sql("SELECT COUNT(*) FROM src_1").show()
       val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
       sqlDF.as("mixing").show()
        val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
        recordsDF.createOrReplaceTempView("records")
        sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
      }
    }
    case class Record(key: Int, value: String)

  • 相关阅读:
    hdoj 2586 How far away?(最近公共祖先)
    poj 1330 A-Nearest Common Ancestors
    心形图
    B1928 日期差值
    B1022 D进制的A+B
    B1009 说反话
    hihocoder 1498 签到
    51Nod 1082 与7无关的数
    51Nod 1015 水仙花数
    51Nod 1283 最小周长
  • 原文地址:https://www.cnblogs.com/lshan/p/13964168.html
Copyright © 2011-2022 走看看