  • Writing to HBase 1.x/2.x from Spark 2.x

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}


    /**
      * Writing to HBase from Spark.
      */
    object SparkWriteHbase {

      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
        val sc = new SparkContext(conf)
        val tableName = "student"

        // Tell TableOutputFormat which table to write to.
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)

        // Job.getInstance replaces the deprecated `new Job(conf)` constructor.
        val job = Job.getInstance(sc.hadoopConfiguration)

        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        // The output value must be a Mutation such as Put; Result is a read-side type.
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        // Sample rows in "rowkey,name,gender,age" format.
        val inDataRDD = sc.makeRDD(Array("3,Rongcheng,M,26", "4,Guanhua,M,27"))

        // Convert each CSV line into a (rowkey, Put) pair for TableOutputFormat.
        val rdd = inDataRDD.map(_.split(",")).map(arr => {
          val put = new Put(Bytes.toBytes(arr(0)))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3)))
          (new ImmutableBytesWritable(Bytes.toBytes(arr(0))), put)
        })

        rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

        sc.stop()
      }

    }
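
    Note that TableOutputFormat does not create the table, so the student table with
    the info column family must already exist (it can be created from the HBase shell
    with create 'student','info') before the job runs.

    To check that the rows landed, they can be read back with newAPIHadoopRDD and
    TableInputFormat. A minimal sketch, assuming the same local HBase instance and
    the student table written above:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object SparkReadHbase {

      def main(args: Array[String]): Unit = {

        val sc = new SparkContext(new SparkConf().setAppName("SparkReadHBase").setMaster("local"))

        // Point TableInputFormat at the table to scan.
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, "student")

        // Each element is a (rowkey, Result) pair, mirroring the (key, Put) pairs on the write side.
        val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])

        hbaseRDD.foreach { case (_, result) =>
          val key = Bytes.toString(result.getRow)
          val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
          val gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")))
          val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
          println(s"row=$key name=$name gender=$gender age=$age")
        }

        sc.stop()
      }

    }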
  • Original article: https://www.cnblogs.com/zxbdboke/p/12749534.html