zoukankan      html  css  js  c++  java
  • spark_hive_to_hbase

    /**
     * Spark batch job that copies rows from the Hive table
     * `sospdm.tdm_super_cust_status_init` into the HBase table
     * `ns_sospdm:mds_offline_data`, column family `bean_json`.
     *
     * The ZooKeeper client port and quorum used to reach HBase are read from the
     * classpath resource `/hbase.properties` (keys `hbase.clientPort`, `hbase.quorum`).
     */
    object Hive_ODS_PaidMember {

      private val logger = LoggerFactory.getLogger(Hive_ODS_PaidMember.getClass)

      /** Settings loaded once from `/hbase.properties` on the classpath. */
      val prop: Properties = loadProperties("/hbase.properties")

      /** HBase client configuration pointing at the ZooKeeper ensemble named in [[prop]]. */
      val configuration = HBaseConfiguration.create()
      configuration.set("hbase.zookeeper.property.clientPort", requiredProperty("hbase.clientPort"))
      configuration.set("hbase.zookeeper.quorum", requiredProperty("hbase.quorum"))
      // configuration.set("hbase.master", "slave01-sit.cnsuning.com:60000")

      /**
       * Loads a properties file from the classpath and always closes the stream.
       *
       * @param resource absolute classpath location, e.g. `/hbase.properties`
       * @return the parsed properties
       * @throws IllegalStateException if the resource is not on the classpath
       */
      private def loadProperties(resource: String): Properties = {
        // getResourceAsStream returns null for a missing resource; fail with a clear message
        // instead of a NullPointerException from Properties.load.
        val stream: InputStream = Option(getClass.getResourceAsStream(resource))
          .getOrElse(throw new IllegalStateException(s"Classpath resource $resource not found"))
        try {
          val loaded = new Properties()
          loaded.load(stream)
          loaded
        } finally {
          stream.close()
        }
      }

      /**
       * Looks up a mandatory key in [[prop]].
       *
       * @throws IllegalStateException if the key is absent
       */
      private def requiredProperty(key: String): String =
        Option(prop.getProperty(key))
          .getOrElse(throw new IllegalStateException(s"Missing '$key' in /hbase.properties"))

      /**
       * Job entry point: reads the Hive table, converts every row with a non-empty
       * `cust_num` into an HBase `Put`, and writes the result through `TableOutputFormat`.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val targetTable = "ns_sospdm:mds_offline_data"
        val querySql = "select * from sospdm.tdm_super_cust_status_init"

        val jobConf = new JobConf(configuration)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, targetTable)

        // One SparkConf for the whole job. Creating a SparkContext up front from a different
        // conf would make getOrCreate() reuse it and silently ignore this application name.
        val sparkConf = new SparkConf()
          .setAppName("HiveToHbase")
          .set("spark.sql.hive.metastorePartitionPruning", "false")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        try {
          logger.info(s"Copying result of [$querySql] into HBase table $targetTable")

          // Local values only: the closure below must not reference members of this object,
          // otherwise executors would run the object initializer (properties + HBase config).
          val family = Bytes.toBytes("bean_json")
          // HBase qualifier -> Hive column it is filled from.
          val columns = List(
            "cust_num" -> "cust_num",
            "postPaidType" -> "paid_type",
            "postEffDate" -> "eff_date",
            "levelChgTs" -> "levl_change_time"
          )

          val puts = session.sql(querySql).rdd
            .filter(row => !CommonUtils.isEmpty(row.getAs[String]("cust_num")))
            .map { row =>
              val custNum = row.getAs[String]("cust_num")
              // Row key is the reversed customer number.
              // NOTE(review): presumably to spread sequential ids across regions — confirm.
              val put = new Put(Bytes.toBytes(custNum.reverse))
              columns.foreach { case (qualifier, column) =>
                // Hive columns may be NULL; Bytes.toBytes(null) would throw, so skip the cell.
                // NOTE(review): on HBase >= 1.0 `addColumn` is the non-deprecated equivalent of
                // `add`; kept as `add` because the HBase client version is not visible here.
                Option(row.getAs[String](column)).foreach { value =>
                  put.add(family, Bytes.toBytes(qualifier), Bytes.toBytes(value))
                }
              }
              (new ImmutableBytesWritable, put)
            }

          puts.saveAsHadoopDataset(jobConf)
        } finally {
          // Release cluster resources even when the write fails.
          session.stop()
        }
      }
    }

  • 相关阅读:
    ScheduledExecutorService改为一次性延时任务
    layer弹框倒计时结束后执行
    pom.xml如何使用本地库的jar-jar包上传到远程库-jar包安装到本地库
    Windows+WinRAR 压缩后备份文件夹
    java DES加密
    JAVA RSA加密公私钥
    Microsoft 语音服务异常 java.lang.UnsatisfiedLinkError: com.microsoft.cognitiveservices.speech.internal.carbon_javaJNI.swig_module_init()
    Java 线程池
    jsp页面导入excel文件的步骤及配置
    正则表达式校验时间格式(2018-01-02)
  • 原文地址:https://www.cnblogs.com/yin-fei/p/12055077.html
Copyright © 2011-2022 走看看