zoukankan      html  css  js  c++  java
  • 使用 Spark SQL 将不同数据库的数据写入 Hive

    本文展示如何将 Oracle 与 SQL Server 中的数据通过 Spark SQL 写入 Hive。

    在 IDEA 项目的 resources 文件夹中放入配置文件:core-site.xml、hdfs-site.xml、hive-site.xml(Spark 运行时会从 classpath 读取这些文件,以连接 HDFS 和 Hive metastore)。

    hive-site.xml内容 
    <configuration>
        <property>
            <!--根据ambari中General内容的属性值进行修改 -->
            <name>hive.metastore.uris</name>
            <value>thrift://dnode5:9083</value>
        </property>
     
        <property>
            <name>hive.execution.engine</name>
            <value>mr</value>
        </property>
    </configuration>

    代码

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     * Copies access records from SQL Server, enriched with account data from
     * Oracle, into the Hive table `sjkm.view_record` (overwriting it).
     */
    object WriteToHive {

      /** Fully qualified Hive target; qualifying it avoids a separate `use sjkm`. */
      private val TargetTable = "sjkm.view_record"

      /**
       * Left-joins SQL Server access records (`v_record_copy`) with Oracle
       * accounts (`v_account`) on the card number. It also derives year, month
       * and day string columns from the occurrence time.
       *
       * `OccTime` is qualified with the `vr.` alias inside `date_format`. An
       * unqualified `occtime` would become ambiguous if `v_account` also had
       * such a column.
       */
      private val RecordQuery: String =
        """select
          |  vr.CardData carddata,
          |  va.SNO pcode,
          |  vr.PName pname,
          |  vr.OccTime occtime,
          |  vr.CodeIndex codeindex,
          |  vr.PortNum portnum,
          |  vr.EquptID equptid,
          |  vr.EquptName equptname,
          |  vr.LctnName lctnname,
          |  date_format(vr.OccTime, 'yyyy') occyear,
          |  date_format(vr.OccTime, 'MM') occmonth,
          |  date_format(vr.OccTime, 'dd') occday
          |from v_record_copy vr
          |left join v_account va on vr.CardData = va.CARDID
          |order by vr.OccTime desc""".stripMargin

      /**
       * Entry point.
       *
       * Reads both JDBC sources, runs [[RecordQuery]] and overwrites
       * [[TargetTable]].
       *
       * @param args unused
       */
      def main(args: Array[String]): Unit = {
        // setIfMissing (instead of setMaster) lets `spark-submit --master` take effect.
        // local[*] is only the fallback when running from the IDE.
        val sparkConf = new SparkConf()
          .setAppName("Oracle_SqlSever_Hive")
          .setIfMissing("spark.master", "local[*]")

        val spark = SparkSession.builder()
          .config(sparkConf)
          .enableHiveSupport() // enable Hive metastore access (hive-site.xml on the classpath)
          .getOrCreate()

        try {
          // Expose the JDBC sources as temp views so RecordQuery can join them by name.
          ReadOracle(spark).createOrReplaceTempView("v_account")
          ReadSqlserver(spark).createOrReplaceTempView("v_record_copy")

          spark.sql(RecordQuery)
            .write
            .mode("overwrite")
            .saveAsTable(TargetTable)
        } finally {
          // Release the session even when a JDBC read or the Hive write throws.
          spark.stop()
        }
      }

      /**
       * Loads the Oracle account table over JDBC.
       *
       * The URL, table and credentials below are placeholders to be filled in.
       *
       * @param sparkSession active session used to build the reader
       * @return a DataFrame over the configured Oracle table
       */
      def ReadOracle(sparkSession: SparkSession): DataFrame = {
        sparkSession.read.format("jdbc")
          .option("url", "jdbc:oracle:thin:@ip:5521/服务名")
          .option("dbtable", "库名.表名")
          .option("user", "xxx")
          .option("password", "xxxx")
          .option("driver", "oracle.jdbc.OracleDriver")
          // The Oracle driver fetches only 10 rows per round trip by default.
          // A larger fetch size greatly reduces round trips on big tables.
          .option("fetchsize", "1000")
          .load()
      }

      /**
       * Loads the SQL Server access-record table over JDBC.
       *
       * The URL, table and credentials below are placeholders to be filled in.
       *
       * @param sparkSession active session used to build the reader
       * @return a DataFrame over the configured SQL Server table
       */
      def ReadSqlserver(sparkSession: SparkSession): DataFrame = {
        sparkSession.read.format("jdbc")
          .option("url", "jdbc:sqlserver://ip:5521;databaseName=数据库名")
          .option("dbtable", "模式名.表名")
          .option("user", "xxxx")
          .option("password", "xxxx")
          .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
          .load()
      }

    }
      
    

      

  • 相关阅读:
    Java 编程规范
    Java常考面试题
    SQL 实战
    快速排序
    剑指Offer(51-67)
    剑指Offer(41-50)
    移动端图片编辑器
    css隐藏和显示table的第一列
    sweetAlert1 设置弹窗宽度,及使用自定义样式
    js获取yyyy-mm-dd hh:mm:ss格式的当前系统时间
  • 原文地址:https://www.cnblogs.com/mergy/p/12793008.html
Copyright © 2011-2022 走看看