Several Ways to Read and Write HBase (Part 2): Spark

    1. Overview of Ways to Read and Write HBase

    They mainly fall into:

    1. Reading and writing HBase with the pure Java API;
    2. Reading and writing HBase with Spark;
    3. Reading and writing HBase with Flink;
    4. Reading and writing HBase through Phoenix;

    The first is the relatively low-level but efficient way of operating that HBase itself provides; the second and third are how Spark and Flink, respectively, integrate with HBase; the last is the JDBC approach offered by the third-party Phoenix plugin, and that Phoenix JDBC approach can also be invoked from Spark and Flink.

    Note:

    Here we use HBase 2.1.2, Spark 2.4 and Scala 2.12; all the code below was developed against these versions.
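
    For reference, a minimal sbt build for this combination might look like the sketch below. It is not part of the original post: the artifact versions (2.4.0, 2.1.2, 2.12.8) are assumptions matched to the version numbers above, and in HBase 2.x the hbase-mapreduce module is what ships the TableInputFormat / TableOutputFormat / HFileOutputFormat2 classes used later.

    // build.sbt -- a minimal sketch; versions assumed to match HBase 2.1.2 / Spark 2.4 / Scala 2.12
    scalaVersion := "2.12.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % "2.4.0" % "provided",
      "org.apache.spark" %% "spark-sql"       % "2.4.0" % "provided",
      "org.apache.hbase" %  "hbase-client"    % "2.1.2",
      "org.apache.hbase" %  "hbase-mapreduce" % "2.1.2", // TableInputFormat / TableOutputFormat / HFileOutputFormat2
      "org.apache.hbase" %  "hbase-server"    % "2.1.2"  // some examples (e.g. LoadIncrementalHFiles) may also need this at runtime
      // plus a phoenix-client / phoenix-spark artifact matching your Phoenix distribution for the Phoenix examples
    )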

    2. Reading and Writing HBase from Spark

    Reading and writing HBase from Spark mainly comes in two flavors, the old and the new Hadoop APIs; in addition there is bulk insertion into HBase, and operating on HBase through Phoenix.

    2.1 The old and new APIs for reading and writing HBase from Spark

    2.1.1 Writing data from Spark to HBase

    Using the old-API saveAsHadoopDataset to save data to HBase.

    /**
     * saveAsHadoopDataset (old MapReduce API)
     */
    // Imports needed by this example (HBase 2.x / Spark 2.4 class names):
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession

    def writeToHBase(): Unit = {
      // Suppress unnecessary log output in the terminal
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

      /* Pre-Spark-2.0 style:
      val conf = new SparkConf().setAppName("SparkToHBase").setMaster("local")
      val sc = new SparkContext(conf)
      */
      val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
      val sc = sparkSession.sparkContext

      val tableName = "test"

      // Create the HBase configuration
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201") // ZooKeeper quorum; you can also put hbase-site.xml on the classpath, but setting it in code is recommended here
      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") // ZooKeeper client port, default 2181
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      // Initialize the job and set the output format; this TableOutputFormat is the one in the org.apache.hadoop.hbase.mapred package
      val jobConf = new JobConf(hbaseConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])

      val dataRDD = sc.makeRDD(Array("12,jack,16", "11,Lucy,15", "15,mike,17", "13,Lily,14"))

      val data = dataRDD.map{ item =>
          val Array(key, name, age) = item.split(",")
          val rowKey = key.reverse
          val put = new Put(Bytes.toBytes(rowKey))
          /* One Put object is one row; the row key is passed to its constructor.
           * Every value written must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
           * Put.addColumn takes three arguments: column family, column qualifier, value. */
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
          (new ImmutableBytesWritable(), put)
      }
      // Save to the HBase table
      data.saveAsHadoopDataset(jobConf)
      sparkSession.stop()
    }
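
    The write examples assume that the target table test with column family cf1 already exists in HBase. That step is not shown in the original post; a minimal sketch of creating the table with the HBase 2.x Admin API (reusing the same ZooKeeper quorum as above) could look like this:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

    def createTestTable(): Unit = {
      val conf = HBaseConfiguration.create()
      conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
      conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      val conn = ConnectionFactory.createConnection(conf)
      try {
        val admin = conn.getAdmin
        val tableName = TableName.valueOf("test")
        if (!admin.tableExists(tableName)) {
          // One column family 'cf1', matching the Put/KeyValue examples in this post
          val descriptor = TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
            .build()
          admin.createTable(descriptor)
        }
      } finally {
        conn.close()
      }
    }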

    Using the new-API saveAsNewAPIHadoopDataset to save data to HBase.

    The content of the input file a.txt is:

    100,hello,20
    101,nice,24
    102,beautiful,26

    /**
     * saveAsNewAPIHadoopDataset (new MapReduce API)
     */
    def writeToHBaseNewAPI(): Unit = {
      // Suppress unnecessary log output in the terminal
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
      val sc = sparkSession.sparkContext

      val tableName = "test"
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)

      // Set the job's output format; this time TableOutputFormat comes from the org.apache.hadoop.hbase.mapreduce package
      val jobConf = new JobConf(hbaseConf)
      val job = Job.getInstance(jobConf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put]) // the values we emit are Put objects
      job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])

      val input = sc.textFile("v2120/a.txt")

      val data = input.map{ item =>
        val Array(key, name, age) = item.split(",")
        val rowKey = key.reverse
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (new ImmutableBytesWritable, put)
      }
      // Save to the HBase table
      data.saveAsNewAPIHadoopDataset(job.getConfiguration)
      sparkSession.stop()
    }

    2.1.2 Reading data from HBase into Spark

    Use newAPIHadoopRDD to read data from HBase; the data can be filtered by configuring a Scan, as shown below and in the variation after the code.

    /**
     * newAPIHadoopRDD with a Scan
     */
    def readFromHBaseWithHBaseNewAPIScan(): Unit = {
      // Suppress unnecessary log output in the terminal
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local").getOrCreate()
      val sc = sparkSession.sparkContext

      val tableName = "test"
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)

      // Build a Scan limited to the cf1 column family and serialize it into the configuration
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("cf1"))
      val proto = ProtobufUtil.toScan(scan)
      val scanToString = new String(Base64.getEncoder.encode(proto.toByteArray))
      hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString)

      // Read the data and turn it into an RDD; TableInputFormat is the one in the org.apache.hadoop.hbase.mapreduce package
      val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

      hbaseRDD
        .map(x => x._2)
        .map{ result =>
          (result.getRow, result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
        }.map(row => (new String(row._1), new String(row._2), new String(row._3)))
        .collect()
        .foreach(r => println("rowKey:" + r._1 + ", name:" + r._2 + ", age:" + r._3))
    }
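
    The Scan above only restricts the column family. Because the Scan is serialized into the configuration before newAPIHadoopRDD is called, any of HBase's scan options and filters can be applied at that point. Below is a hedged variation (not from the original post) that narrows the row-key range and filters on cf1:age; the row-key bounds and the age threshold are made-up values for illustration, and the snippet is meant to replace the Scan construction inside the function above.

    // Assumed extra imports:
    // import org.apache.hadoop.hbase.CompareOperator
    // import org.apache.hadoop.hbase.filter.SingleColumnValueFilter

    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("cf1"))
    scan.withStartRow(Bytes.toBytes("11")) // inclusive start row (the writes above reversed the keys, e.g. "21", "11", ...)
    scan.withStopRow(Bytes.toBytes("51"))  // exclusive stop row
    // Keep only rows whose cf1:age value (stored as a string) is >= "15" in byte order
    scan.setFilter(new SingleColumnValueFilter(
      Bytes.toBytes("cf1"), Bytes.toBytes("age"),
      CompareOperator.GREATER_OR_EQUAL, Bytes.toBytes("15")))
    // Serialize it exactly as above:
    val scanToString = new String(Base64.getEncoder.encode(ProtobufUtil.toScan(scan).toByteArray))
    hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString)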

    2.2 Bulk-inserting data into HBase from Spark with BulkLoad

    BulkLoad works by first using MapReduce to generate the corresponding HFiles on HDFS and then loading those HFiles into HBase, which makes bulk insertion very efficient.

    /**
     * Bulk insert, multiple columns
     */
    def insertWithBulkLoadWithMulti(): Unit = {

      val sparkSession = SparkSession.builder().appName("insertWithBulkLoad").master("local[4]").getOrCreate()
      val sc = sparkSession.sparkContext

      val tableName = "test"
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      val conn = ConnectionFactory.createConnection(hbaseConf)
      val admin = conn.getAdmin
      val table = conn.getTable(TableName.valueOf(tableName))

      // Set the job's output format: HFileOutputFormat2 writes HFiles from (ImmutableBytesWritable, KeyValue) pairs
      val job = Job.getInstance(hbaseConf)
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])
      job.setOutputFormatClass(classOf[HFileOutputFormat2])
      HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(tableName)))

      val rdd = sc.textFile("v2120/a.txt")
        .map(_.split(","))
        // Salt the row key with the first three characters of its MD5 hash, then sort globally by row key
        .map(x => (DigestUtils.md5Hex(x(0)).substring(0, 3) + x(0), x(1), x(2)))
        .sortBy(_._1)
        .flatMap(x => {
          // Within a row, KeyValues for multiple columns must be emitted in alphabetical order
          // of the column qualifier, which is why "age" is appended before "name"
          val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
          val kv1: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(x._2 + ""))
          val kv2: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(x._3 + ""))
          listBuffer.append((new ImmutableBytesWritable(Bytes.toBytes(x._1)), kv2))
          listBuffer.append((new ImmutableBytesWritable(Bytes.toBytes(x._1)), kv1))
          listBuffer
        })

      // Delete the output directory if it already exists, then write the HFiles and bulk-load them
      isFileExist("hdfs://node1:9000/test", sc)

      rdd.saveAsNewAPIHadoopFile("hdfs://node1:9000/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      bulkLoader.doBulkLoad(new Path("hdfs://node1:9000/test"), admin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
    }

    /**
     * Check whether a path exists on HDFS and delete it if it does
     */
    def isFileExist(filePath: String, sc: SparkContext): Unit = {
      val output = new Path(filePath)
      val hdfs = FileSystem.get(new URI(filePath), new Configuration)
      if (hdfs.exists(output)) {
        hdfs.delete(output, true)
      }
    }

    2.3 Reading and writing HBase from Spark through Phoenix

    With Phoenix it looks just like working with a relational database such as MySQL: you go through JDBC.

    def readFromHBaseWithPhoenix(): Unit = {
      // Suppress unnecessary log output in the terminal
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

      val sparkSession = SparkSession.builder().appName("SparkHBaseDataFrame").master("local[4]").getOrCreate()

      // A lowercase table name must be wrapped in double quotes, otherwise Phoenix reports an error
      val dbTable = "\"test\""

      // First way for Spark to read from Phoenix into a DataFrame: the plain JDBC data source
      val rdf = sparkSession.read
        .format("jdbc")
        .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
        .option("url", "jdbc:phoenix:192.168.187.201:2181")
        .option("dbtable", dbTable)
        .load()

      val rdfList = rdf.collect()
      for (i <- rdfList) {
        println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
      }
      rdf.printSchema()

      // Second way for Spark to read from Phoenix into a DataFrame: the phoenix-spark connector
      val df = sparkSession.read
        .format("org.apache.phoenix.spark")
        .options(Map("table" -> dbTable, "zkUrl" -> "192.168.187.201:2181"))
        .load()
      df.printSchema()
      val dfList = df.collect()
      for (i <- dfList) {
        println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
      }

      // Writing a Spark DataFrame to Phoenix; the target table must be created first
      /*df.write
        .format("org.apache.phoenix.spark")
        .mode(SaveMode.Overwrite)
        .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> "192.168.187.201:2181"))
        .save()
      */
      sparkSession.stop()
    }
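
    Both reads assume the HBase table is visible to Phoenix. If the table was created directly in HBase (as in the earlier examples) rather than through Phoenix, one common way to expose it is to create a Phoenix view over it. The sketch below does that with plain Phoenix JDBC, which is also what the "go through JDBC" remark above refers to; it is not part of the original post, and the view's column names and VARCHAR types are assumptions matching the cf1:name / cf1:age columns written earlier.

    import java.sql.DriverManager

    def createPhoenixView(): Unit = {
      // The Phoenix JDBC URL points at the same ZooKeeper quorum used above
      val conn = DriverManager.getConnection("jdbc:phoenix:192.168.187.201:2181")
      try {
        val stmt = conn.createStatement()
        // Map the existing HBase table "test" into Phoenix; the PK column maps to the HBase row key
        stmt.execute(
          """CREATE VIEW IF NOT EXISTS "test" (
            |  pk VARCHAR PRIMARY KEY,
            |  "cf1"."name" VARCHAR,
            |  "cf1"."age" VARCHAR
            |)""".stripMargin)
        stmt.close()
      } finally {
        conn.close()
      }
    }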

    3. Summary

    See "Several Ways to Connect to HBase (Part 1): Java" for reading and writing HBase with the pure Java API.

    See "Several Ways to Read and Write HBase (Part 3): Flink" for reading and writing HBase from Flink.

    [GitHub repository]

    https://github.com/SwordfallYeung/HBaseDemo

    [References]

    https://my.oschina.net/uchihamadara/blog/2032481

    https://www.cnblogs.com/simple-focus/p/6879971.html

    https://www.cnblogs.com/MOBIN/p/5559575.html

    https://blog.csdn.net/Suubyy/article/details/80892023

    https://www.jianshu.com/p/b09283b14d84

    https://www.jianshu.com/p/8e3fdf70dc06

    https://www.cnblogs.com/wumingcong/p/6044038.html

    https://blog.csdn.net/zhuyu_deng/article/details/43192271

    https://www.jianshu.com/p/4c908e419b60

    https://blog.csdn.net/Colton_Null/article/details/83387995

    https://cloud.tencent.com/developer/article/1189464

    https://blog.bcmeng.com/post/hbase-bulkload.html (covers the case where the HDFS cluster used by the Hive data source and the HDFS cluster backing the HBase table are not the same cluster)
