zoukankan      html  css  js  c++  java
  • [Spark SQL]Spark SQL读取Kudu,写入Hive

    SparkUnit
    Function:用于获取Spark Session

    package com.example.unitl

    import org.apache.spark.sql.SparkSession

    object SparkUnit {
    def getLocal(appName: String): SparkSession = {
    SparkSession.builder().appName(appName).master("local[*]").getOrCreate()
    }

    def getLocal(appName: String, supportHive: Boolean): SparkSession = {
    if (supportHive) getLocal(appName,"local[*]",true)
    else getLocal(appName)
    }

    def getLocal(appName:String,master:String,supportHive:Boolean): SparkSession = {
    if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
    else SparkSession.builder().appName(appName).master(master).getOrCreate()
    }

    def stopSs(ss:SparkSession): Unit ={
    if (ss != null) {
    ss.stop()
    }
    }
    }

     
    log4j.properties
    Function:设置控制台输出级别

    # Set everything to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    # Set the default spark-shell log level to WARN. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=WARN

    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=WARN
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
    log4j.logger.org.apache.parquet=ERROR
    log4j.logger.parquet=ERROR

    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
     
    KTV
    Function:读取kudu,写入hive。Kudu_To_Hive,简称KTV

    package com.example.dao

    import com.example.unitl.SparkUnit
    import org.apache.spark.sql.SparkSession

    object KTV {
    def getKuduTableDataFrame(ss: SparkSession): Unit = {
    // 读取kudu
    // 获取tb对象
    val kuduTb = ss.read.format("org.apache.kudu.spark.kudu")
    .option("kudu.master", "10.168.1.12:7051")
    .option("kudu.table", "impala::realtimedcs.wtr31") // Tips:注意指定库
    .load()

    // create view
    kuduTb.createTempView("wtr31")

    val kudu_unit1_df = ss.sql(
    """
    |SELECT * FROM `wtr31`
    |WHERE `splittime` = "2021-07-11"
    |""".stripMargin)

    // print
    kudu_unit1_df.printSchema()
    kudu_unit1_df.show()

    // load of memory
    kudu_unit1_df.createOrReplaceTempView("wtr31_bakup")
    }

    def insertHive(ss: SparkSession): Unit = {
    // create table
    ss.sql(
    """
    |USE `realtimebakup`
    |""".stripMargin)

    ss.sql(
    """
    | CREATE TABLE IF NOT EXISTS `dcs_wtr31_bakup`(
    | `id` int,
    | `packtimestr` string,
    | `dcs_name` string,
    | `dcs_type` string,
    | `dcs_value` string,
    | `dcs_as` string,
    | `dcs_as2` string)
    | PARTITIONED BY (
    | `splittime` string)
    |""".stripMargin)
    println("创建表成功!")

    // create view
    ss.sql(
    """
    |INSERT INTO `dcs_wtr31_bakup`
    |SELECT * FROM wtr31_bakup
    |""".stripMargin)
    println("保存成功!")
    }

    def main(args: Array[String]): Unit = {
    //get ss
    val ss = SparkUnit.getLocal("KTV", true)
    // 做动态分区, 所以要先设定partition参数
    // default是false, 需要额外下指令打开这个开关
    ss.sqlContext.setConf("hive.exec.dynamic.partition;","true");
    ss.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict");

    // 调用方法
    getKuduTableDataFrame(ss)
    insertHive(ss)

    // 关闭连接
    SparkUnit.stopSs(ss)
    }

    运行截图:
    注意到那个红框了吗?运行时请将hive的配置文件 hive-site.xml文件,复制到项目resource下,否则必报错。


    hue查看写入的数据:

     

    ————————————————
    版权声明:本文为CSDN博主「NBA首席形象大使阿坤」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_44491709/article/details/118656681

  • 相关阅读:
    jsp用equals判断两个字符串变量是否相等
    使用session在jsp页面之间传递多维数组,用于实现全局变量的效果
    C++实现对MySQL数据库的连接,以及增删改查
    VS2017项目中使用代码连接MySQL数据库,以及进行数据添加
    VS2017中遇到不存在从string到const char*的转换函数的解决方法
    windows系统转linux系统后磁盘的处理
    redis集群节点重启后恢复
    Jenkins 与Docker/Kubernetes的自动化CI流水(笔记)
    shell的运用 : jenkins 编译 打包前端发布 生产(tomcat)
    云服务器linux系统修改时间和时区
  • 原文地址:https://www.cnblogs.com/javalinux/p/15078647.html
Copyright © 2011-2022 走看看