依赖:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.1</version> <scope>provided</scope> </dependency>
集成Hive
Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的 一点是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。
ps:一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了
1.使用内置hive
ps:版本为1.2.1
ps:需要注意内置hive是非常容易出现问题的
1.先启动集群/opt/software/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh
2.进入到spark-shell模式/opt/software/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://hadoop01:7077
3.在spark-shell下操作hive
spark.sql("show tables").show 查询所有hive的表
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)") 创建表
spark.sql("LOAD DATA LOCAL INPATH '/opt/software/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src") 添加数据
spark.sql("SELECT * FROM src").show 查询表中数据
会出现一个问题FileNotFoundException 没有找到文件
通过在主节点和从节点查询可以发现,主节点存在spark-warehouse目录 目录中是存在数据的
但是在从节点中没有这个文件夹,所以此时将文件夹分发到从节点
scp -r ./spark-warehouse/ root@hadoop02:$PWD
再次执行查询
ps:这样表面看查询是没什么问题了,但是实际问题在于是讲master节点上的数据分发到从节点上的,那么不可能说每次操作有了数据都执行拷贝操作,所以此时就需要使用HDFS来进行存储数据了
所以先将所有从节点上的spark-warehouse删除掉
删除后将主节点上的spark-warehouse和metastor_db删除掉
然后在启动集群的时候添加一个命令
--conf spark.sql.warehouse.dir=hdfs://hadoop01:8020/spark_warehouse
此方法只做一次启动即可 后续再启动集群的时候就无需添加这个命了 因为记录在metastore_db中了
ps:spark-sql中可以直接使用SQL语句操作
2.集成外部hive(重要)
1.将Hive中的hive-site.xml软连接到Spark安装目录下的conf目录下。[主节点有即可]
ln -s /opt/software/apache-hive-1.2.1-bin/conf/hive-site.xml /opt/software/spark-2.2.0-bin-hadoop2.7/conf/hive-site.xml
2.打开spark shell,注意带上访问Hive元数据库的JDBC客户端
将mysql驱动jar包拷贝到spark的bin目录下
./spark-shell --master spark://hadoop01:7077 --jars mysql-connector-java-5.1.36.jar
ps:做完外部hive链接需要注意,因为hive-site.xml文件是在Spark的conf目录下,若直接启动spark-shell无论是单机版还是集群版都会出现报错 Error creating transactional connection factory 原因在于,启动时会加载hive-site.xml文件,所以必须添加jar路径, 为了以后使用建议删除软连接,需要的时候在做外部hive的连接
删除软连接方式:
rm -rf 软连接方式
总结:
若要把Spark SQL连接到一个部署好的Hive上,你必须把hive-site.xml复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
3.通过代码操作(重要)
ps:需要有Hadoop本地环境
import org.apache.spark.sql.{Row, SparkSession} object HiveCode { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("HiveCode") .config("spark.sql.warehouse.dir", "D:\spark-warehouse") .master("local[*]") .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql // sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'dir/kv1.txt' INTO TABLE src_1") sql("SELECT * FROM src_1").show() sql("SELECT COUNT(*) FROM src_1").show() val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key") sqlDF.as("mixing").show() val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show() } } case class Record(key: Int, value: String)
ps:本地若出现"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder和权限异常问题
在cmd命令下进入到D:/hadoop2.7.1/bin目录下执行命令即可
winutils.exe chmod 777 /tmp/hive
连接服务器hive
这里出现了一个异常 Could not find the uri with key [dfs.encryption.key.provider.uri] to create a KeyProvider !! 这个异常并没有找到解决问题的办法,结果自己能执行了!!!!! 谁能解决麻烦备注说明!
我的Hadoop集群是高可用所以我在windows下配置了C:WindowsSystem32driversetc路径下的
hosts文件 只要应对我的 mycluster (这一个不配置应该是可以的)
192.168.223.111 mycluster 192.168.223.112 mycluster
import java.io.File import org.apache.spark.sql.{Row, SparkSession} object HiveCode { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("HiveCode") .config("spark.sql.warehouse.dir", "hdfs://hadoop01:9000/spark_warehouse") .master("spark://hadoop01:7077") .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql //sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)") // sql("LOAD DATA LOCAL INPATH 'dir/kv1.txt' INTO TABLE src_1") sql("SELECT * FROM src_1").show() sql("SELECT COUNT(*) FROM src_1").show() val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key") sqlDF.as("mixing").show() val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show() } } case class Record(key: Int, value: String)