演示如何将 Oracle 与 SQL Server 中的数据通过 Spark 写入 Hive
在 IDEA 项目的 resources 文件夹中放入以下配置文件：core-site.xml、hdfs-site.xml、hive-site.xml（从集群中拷贝）。
hive-site.xml 内容：

```xml
<configuration>
  <property>
    <!-- 根据 Ambari 中 Hive 的 General 配置项里的实际属性值进行修改 -->
    <name>hive.metastore.uris</name>
    <value>thrift://dnode5:9083</value>
  </property>
  <property>
    <name>hive.execution.engine</name>
    <value>mr</value>
  </property>
</configuration>
```
代码
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Spark job that reads one table from Oracle and one from SQL Server over JDBC,
 * left-joins them with Spark SQL, and overwrites the Hive table `sjkm.view_record`
 * with the result.
 *
 * Hive connectivity is expected to come from core-site.xml, hdfs-site.xml and
 * hive-site.xml on the classpath (e.g. the IDE's resources folder).
 */
object WriteToHive {

  /** Fully qualified Hive target table (database `sjkm`). */
  private val TargetTable: String = "sjkm.view_record"

  /**
   * Joins the SQL Server records (`v_record_copy`) with the Oracle accounts
   * (`v_account`) on card id and derives year/month/day columns from the
   * occurrence time.
   *
   * Every column is qualified with its view alias, so the query cannot become
   * ambiguous if both sources expose a column with the same (case-insensitive) name.
   * There is no global ORDER BY. It would force a full shuffle/sort before the write,
   * and row order is not a property of a stored Hive table that readers can rely on.
   * Sort at query time instead.
   */
  private val JoinSql: String =
    """select
      |  vr.CardData  carddata,
      |  va.SNO       pcode,
      |  vr.PName     pname,
      |  vr.OccTime   occtime,
      |  vr.CodeIndex codeindex,
      |  vr.PortNum   portnum,
      |  vr.EquptID   equptid,
      |  vr.EquptName equptname,
      |  vr.LctnName  lctnname,
      |  date_format(vr.OccTime, 'yyyy') occyear,
      |  date_format(vr.OccTime, 'MM')   occmonth,
      |  date_format(vr.OccTime, 'dd')   occday
      |from v_record_copy vr
      |left join v_account va on vr.CardData = va.CARDID""".stripMargin

  /**
   * Entry point: builds a Hive-enabled SparkSession, registers both JDBC sources
   * as temp views, runs [[JoinSql]] and overwrites [[TargetTable]].
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Fall back to local mode only when no master was supplied (e.g. running from
    // the IDE). A master given via spark-submit (--master / spark.master) is respected.
    val sparkConf = new SparkConf()
      .setAppName("Oracle_SqlSever_Hive")
      .setIfMissing("spark.master", "local[*]")

    val spark = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport() // needed so saveAsTable writes through the Hive metastore
      .getOrCreate()

    try {
      // Register the JDBC-backed DataFrames under the names JoinSql refers to.
      ReadOracle(spark).createOrReplaceTempView("v_account")
      ReadSqlserver(spark).createOrReplaceTempView("v_record_copy")

      spark.sql(JoinSql)
        .write
        .mode("overwrite")
        .saveAsTable(TargetTable)
    } finally {
      // Release the session even when a JDBC read or the Hive write fails.
      spark.stop()
    }
  }

  /**
   * Reads the Oracle source table over JDBC.
   *
   * The URL, table and credentials below are placeholders (host, service name,
   * schema.table) and must be filled in.
   * NOTE(review): credentials are hard-coded; consider reading them from args,
   * environment variables or a config file instead.
   *
   * @param sparkSession active session used to create the reader
   * @return DataFrame backed by the Oracle table
   */
  def ReadOracle(sparkSession: SparkSession): DataFrame =
    readJdbc(
      sparkSession,
      url = "jdbc:oracle:thin:@ip:5521/服务名",
      table = "库名.表名",
      user = "xxx",
      password = "xxxx",
      driver = "oracle.jdbc.OracleDriver"
    )

  /**
   * Reads the SQL Server source table over JDBC.
   *
   * The URL, table and credentials below are placeholders (host, database name,
   * schema.table) and must be filled in.
   * NOTE(review): credentials are hard-coded; consider reading them from args,
   * environment variables or a config file instead.
   *
   * @param sparkSession active session used to create the reader
   * @return DataFrame backed by the SQL Server table
   */
  def ReadSqlserver(sparkSession: SparkSession): DataFrame =
    readJdbc(
      sparkSession,
      url = "jdbc:sqlserver://ip:5521;databaseName=数据库名",
      table = "模式名.表名",
      user = "xxxx",
      password = "xxxx",
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    )

  /**
   * Shared JDBC reader used by both sources.
   *
   * NOTE(review): without partitionColumn/lowerBound/upperBound/numPartitions the
   * Spark JDBC source reads the whole table through a single partition; consider
   * adding them for large tables.
   */
  private def readJdbc(
      sparkSession: SparkSession,
      url: String,
      table: String,
      user: String,
      password: String,
      driver: String
  ): DataFrame =
    sparkSession.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      .option("driver", driver)
      .load()
}