1. Copy $HIVE_HOME/conf/hive-site.xml into $SPARK_HOME/conf
cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf
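The copied hive-site.xml must point Spark at the same metastore Hive uses. A minimal sketch, assuming a MySQL-backed metastore on this host (the connection URL, user, and password are placeholders to adapt):

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://192.168.104.94:3306/hive?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>your_password</value>
  </property>
</configuration>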
2. With that in place, simply starting spark-shell connects to Hive automatically
./spark-shell --master local[2] --jars /usr/local/jar/mysql-connector-java-5.1.47.jar   # --jars: ships the listed jar(s), here the MySQL driver used by the metastore
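Once the shell is up, the session already has Hive support, so the metastore is directly visible. A quick sanity check you can type at the prompt (default.pk is just the example table used further down; substitute your own):

spark.sql("show databases").show()
spark.table("default.pk").show()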
3. Starting spark-sql also connects automatically (note the MySQL driver is additionally put on the driver classpath here)
./spark-sql --master local[2] --jars /usr/local/jar/mysql-connector-java-5.1.47.jar --driver-class-path /usr/local/jar/mysql-connector-java-5.1.47.jar
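Inside the spark-sql prompt you work in plain SQL; for example (table names are the ones used elsewhere in this note):

spark-sql> show tables;
spark-sql> select * from default.pk limit 10;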
4. We can also start a Thrift JDBC/ODBC server that stays running 24/7
cd $SPARK_HOME/sbin
./start-thriftserver.sh --master local --jars /usr/local/jar/mysql-connector-java-5.1.47.jar   # listens on port 10000 by default
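If port 10000 is already taken, the server can be moved to another port via --hiveconf; 10001 below is only an example value:

./start-thriftserver.sh --master local --jars /usr/local/jar/mysql-connector-java-5.1.47.jar --hiveconf hive.server2.thrift.port=10001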
5. Connect to it with the bundled beeline client
cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://192.168.104.94:10000
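After connecting, beeline behaves like any SQL client; a typical session might look like this (in non-secure mode any account the server accepts will do for the login prompt):

0: jdbc:hive2://192.168.104.94:10000> show tables;
0: jdbc:hive2://192.168.104.94:10000> select * from default.pk limit 10;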
6. Or connect from code through the Hive JDBC driver
package com.imooc.bigdata.chapter06

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object JDBCClientApp {

  def main(args: Array[String]): Unit = {
    // Load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn: Connection = DriverManager.getConnection("jdbc:hive2://192.168.104.94:10000")
    val pstmt: PreparedStatement = conn.prepareStatement("show tables")
    val rs: ResultSet = pstmt.executeQuery()

    // The Spark Thrift Server returns (database, tableName, isTemporary) for "show tables"
    while (rs.next()) {
      println(rs.getObject(1) + " : " + rs.getObject(2))
    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}
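For this client to compile and resolve org.apache.hive.jdbc.HiveDriver, the Hive JDBC driver has to be on the classpath. An sbt sketch; the version is an assumption, match it to your Hive distribution:

libraryDependencies += "org.apache.hive" % "hive-jdbc" % "1.2.1"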
package com.imooc.bigdata.chapter06

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveSourceApp {

  def main(args: Array[String]): Unit = {
    // To access Hive from Spark, Hive support must be enabled on the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("HiveSourceApp")
      .enableHiveSupport()  // do not forget this, otherwise Hive tables are not visible
      .getOrCreate()

    // This reads the pk table in the default database; tables in other databases follow the same pattern
    // spark.table("default.pk").show()

    // input (Hive/MySQL/JSON...) ==> processing ==> output (Hive)
    import spark.implicits._

    // Read the connection settings from application.conf via Typesafe Config
    val config = ConfigFactory.load()
    val url = config.getString("db.default.url")
    val user = config.getString("db.default.user")
    val password = config.getString("db.default.password")
    val driver = config.getString("db.default.driver")
    val database = config.getString("db.default.database")
    val table = config.getString("db.default.table")
    val sinkTable = config.getString("db.default.sink.table")

    val connectionProperties = new Properties()
    connectionProperties.put("user", user)
    connectionProperties.put("password", password)

    // Read from MySQL over JDBC and keep only rows with cnt > 100
    val jdbcDF: DataFrame = spark.read
      .jdbc(url, s"$database.$table", connectionProperties)
      .filter($"cnt" > 100)
    // jdbcDF.show()

    // Write the result into Hive: saveAsTable creates the table, insertInto expects it to exist already
    jdbcDF.write.saveAsTable("browser_stat_hive")
    jdbcDF.write.insertInto("browser_stat_hive_1")

    spark.stop()
  }
}
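HiveSourceApp expects the db.default.* keys above in an application.conf on the classpath (Typesafe Config). A sketch with placeholder values; the key names come straight from the code, while the values are assumptions to adapt to your environment:

db.default.url = "jdbc:mysql://192.168.104.94:3306"
db.default.user = "root"
db.default.password = "your_password"
db.default.driver = "com.mysql.jdbc.Driver"
db.default.database = "spark"
db.default.table = "browser_stat"
db.default.sink.table = "browser_stat_hive"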