zoukankan      html  css  js  c++  java
  • spark_hive_to_hbase

    /**
      * Spark batch job that copies rows of the Hive table
      * `sospdm.tdm_super_cust_status_init` into the HBase table
      * `ns_sospdm:mds_offline_data`.
      *
      * Every row with a non-empty `cust_num` becomes one `Put` whose row key is the
      * reversed `cust_num`; the selected columns are written to the `bean_json`
      * column family. Columns whose Hive value is null are left out of the `Put`.
      *
      * The ZooKeeper client port and quorum are read from the keys `hbase.clientPort`
      * and `hbase.quorum` of the classpath resource `/hbase.properties`.
      */
    object Hive_ODS_PaidMember {

      private val logger = LoggerFactory.getLogger(Hive_ODS_PaidMember.getClass)

      val prop = new Properties()
      val is: InputStream = this.getClass().getResourceAsStream("/hbase.properties")
      // getResourceAsStream returns null for a missing resource; fail with a readable
      // message instead of the NullPointerException Properties.load(null) would raise.
      require(is != null, "/hbase.properties was not found on the classpath")
      // The stream is fully consumed by load(), so it is closed right away.
      try prop.load(is) finally is.close()

      // NOTE(review): this context is created eagerly when the object is initialized,
      // before main() builds its SparkSession. getOrCreate() presumably reuses it, in
      // which case the "HiveToHbase" app name set in main() would not take effect.
      // Confirm, and consider dropping `conf`/`sc` if nothing else uses them.
      val conf = new SparkConf().setAppName("ReadHiveToHbase")
      val sc = new SparkContext(conf)

      val configuration = HBaseConfiguration.create()
      configuration.set("hbase.zookeeper.property.clientPort", prop.getProperty("hbase.clientPort"))
      configuration.set("hbase.zookeeper.quorum", prop.getProperty("hbase.quorum"))
      // configuration.set("hbase.master", "slave01-sit.cnsuning.com:60000")

      /**
        * Runs the export: queries Hive, converts each row to an HBase `Put`, and
        * saves the result through `TableOutputFormat`.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        // Output side: write through the HBase TableOutputFormat into the target table.
        val jobConf = new JobConf(configuration)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "ns_sospdm:mds_offline_data")

        val sparkConf = new SparkConf().setAppName("HiveToHbase")
        sparkConf.set("spark.sql.hive.metastorePartitionPruning", "false")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        val querySql = "select * from sospdm.tdm_super_cust_status_init"

        // Encoded once instead of once per cell. Kept as a local value (not an object
        // member) so the closure shipped to executors does not reference this object,
        // whose initializer creates a SparkContext.
        val columnFamily = Bytes.toBytes("bean_json")

        val puts = session.sql(querySql).rdd
          .filter(row => !CommonUtils.isEmpty(row.getAs[String]("cust_num")))
          .map { row =>
            val custNum = row.getAs[String]("cust_num")

            // Row key is the reversed customer number.
            val put = new Put(Bytes.toBytes(custNum.reverse))

            // HBase qualifier -> value read from the Hive row.
            Seq(
              "cust_num" -> custNum,
              "postPaidType" -> row.getAs[String]("paid_type"),
              "postEffDate" -> row.getAs[String]("eff_date"),
              "levelChgTs" -> row.getAs[String]("levl_change_time")
            ).foreach { case (qualifier, value) =>
              // Bytes.toBytes(null: String) throws; skip columns that are null in Hive
              // rather than failing the whole task.
              Option(value).foreach { v =>
                put.add(columnFamily, Bytes.toBytes(qualifier), Bytes.toBytes(v))
              }
            }

            (new ImmutableBytesWritable, put)
          }

        puts.saveAsHadoopDataset(jobConf)
      }
    }

  • 相关阅读:
    练习二十七:递归函数应用
    mysql8.0数据库忘记密码时进行修改方法
    格式化字符串两种方式
    练习二十六:阶乘计算(递归)
    练习二十五:阶乘之和计算
    Dapper批量添加
    c# FTP操作类(转)
    c# 依赖注入之---反射(转)
    c# 依赖注入之---setterInjection(转)
    php遍历数组赋值
  • 原文地址:https://www.cnblogs.com/yin-fei/p/12055077.html
Copyright © 2011-2022 走看看