zoukankan      html  css  js  c++  java
  • HBase1.2.0增删改查Scala代码实现

    增删改查工具类

    /**
      * Create/read/update/delete helpers for HBase tables.
      *
      * Every data-access method opens its own connection from the supplied
      * configuration and releases it (together with any table, scanner or
      * mutator it opened) before returning, whether or not the operation
      * succeeded.
      */
    class HbaseUtils {

      /**
        * Opens a new connection from `conf` and returns its admin handle.
        *
        * The connection created here is not closed by this method. A caller that
        * is done with the admin should close the admin and also the connection
        * reachable through `admin.getConnection`.
        * NOTE(review): closing the admin alone presumably leaves the connection
        * open -- confirm against the HBase version in use.
        *
        * @param conf HBase client configuration
        * @return the admin handle, cast to HBaseAdmin
        */
      def getAdmin(conf: Configuration): HBaseAdmin = {
        val conn = ConnectionFactory.createConnection(conf)
        conn.getAdmin().asInstanceOf[HBaseAdmin]
      }

      /**
        * Creates a table with the given column families.
        *
        * If a table with this name already exists it is dropped first, so any
        * data it holds is lost. Failures while creating the table are printed
        * and swallowed; failures while dropping the old table propagate.
        *
        * @param admin         admin handle used to issue the DDL operations
        * @param tName         name of the table to create
        * @param columnFamilys names of the column families to add
        */
      def createTable(admin: HBaseAdmin, tName: String, columnFamilys: List[String]): Unit = {
        val name = TableName.valueOf(tName)
        if (admin.tableExists(name)) {
          println("table already exists!")
          dropTable(admin, name)
        }
        try {
          val tableDesc = new HTableDescriptor(name)
          columnFamilys.foreach(family => tableDesc.addFamily(new HColumnDescriptor(family)))
          admin.createTable(tableDesc)
          println("create table success!")
        }
        catch {
          case e: Exception => e.printStackTrace()
        }
      }

      /**
        * Writes a single cell.
        *
        * @param conf         HBase client configuration
        * @param tableName    table to write to
        * @param rowKey       row key
        * @param columnFamily column family name
        * @param column       column qualifier
        * @param value        cell value
        */
      def insertData(conf: Configuration, tableName: String, rowKey: String, columnFamily: String, column: String, value: String): Unit = {
        withTable(conf, tableName) { table =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value))
          table.put(put)
        }
        print("数据插入成功")
      }

      /**
        * Writes a single cell through a BufferedMutator and flushes it.
        *
        * @param conf         HBase client configuration
        * @param tableName    table to write to
        * @param rowKey       row key
        * @param columnFamily column family name
        * @param column       column qualifier
        * @param value        cell value
        */
      def batchInsertData(conf: Configuration, tableName: String, rowKey: String, columnFamily: String, column: String, value: String): Unit = {
        withConnection(conf) { conn =>
          val mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
          // The mutator is closed before the connection it was created from.
          try {
            val put = new Put(Bytes.toBytes(rowKey))
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value))
            mutator.mutate(put)
            mutator.flush()
          } finally {
            mutator.close()
          }
        }
        print("数据插入成功")
      }

      /**
        * Drops the whole table (not individual rows) if it exists.
        *
        * Failures are printed and swallowed. The success message is printed only
        * when no exception occurred. The admin obtained for this call and its
        * connection are always closed.
        *
        * @param conf      HBase client configuration
        * @param tableName name of the table to drop
        */
      def deleteData(conf: Configuration, tableName: String): Unit = {
        val admin = getAdmin(conf)
        try {
          val name = TableName.valueOf(tableName)
          if (admin.tableExists(name)) {
            dropTable(admin, name)
          }
          print("删除数据成功")
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          // getAdmin opened a dedicated connection for this call, so release both.
          val conn = admin.getConnection
          try admin.close() finally conn.close()
        }
      }

      /**
        * Scans the whole table and collects the cells of every row.
        *
        * @param conf      HBase client configuration
        * @param tableName table to scan
        * @return one cell array per row, in scan order
        */
      def getByScan(conf: Configuration, tableName: String): ArrayBuffer[Array[Cell]] = {
        withTable(conf, tableName) { table =>
          val results = table.getScanner(new Scan())
          try {
            // `++=` drains the iterator eagerly, so all rows are read before the scanner closes.
            ArrayBuffer[Array[Cell]]() ++=
              Iterator.continually(results.next()).takeWhile(_ != null).map(_.rawCells())
          } finally {
            results.close()
          }
        }
      }

      /**
        * Fetches one row by key.
        *
        * @param conf      HBase client configuration
        * @param tableName table to read from
        * @param row       row key
        * @return the cells of the row; an empty array if the lookup produced none
        */
      def getRow(conf: Configuration, tableName: String, row: String): Array[Cell] = {
        withTable(conf, tableName) { table =>
          val res = table.get(new Get(Bytes.toBytes(row)))
          // Result.rawCells is documented as possibly null for an empty result.
          Option(res.rawCells()).getOrElse(Array.empty[Cell])
        }
      }

      /**
        * Deletes one row by key.
        *
        * @param conf      HBase client configuration
        * @param tableName table to delete from
        * @param row       row key
        */
      def delRow(conf: Configuration, tableName: String, row: String): Unit = {
        withTable(conf, tableName) { table =>
          table.delete(new Delete(Bytes.toBytes(row)))
        }
        println("删除数据成功")
      }

      /**
        * Closes the table and then the connection, skipping whichever is null.
        * The connection is closed even if closing the table throws.
        *
        * @param table table to close, may be null
        * @param con   connection to close, may be null
        */
      def close(table: Table, con: Connection): Unit = {
        try {
          if (table != null)
            table.close()
        } finally {
          if (con != null)
            con.close()
        }
      }

      /** Runs `body` with a fresh connection and closes the connection afterwards. */
      private def withConnection[A](conf: Configuration)(body: Connection => A): A = {
        val conn = ConnectionFactory.createConnection(conf)
        try body(conn) finally conn.close()
      }

      /** Runs `body` with a table handle on a fresh connection and closes both afterwards. */
      private def withTable[A](conf: Configuration, tableName: String)(body: Table => A): A =
        withConnection(conf) { conn =>
          val table = conn.getTable(TableName.valueOf(tableName))
          try body(table) finally table.close()
        }

      /** Disables the table if it is still enabled, then deletes it. */
      private def dropTable(admin: HBaseAdmin, name: TableName): Unit = {
        // Disabling an already disabled table fails, so check first.
        if (admin.isTableEnabled(name)) {
          admin.disableTable(name)
        }
        admin.deleteTable(name)
      }

    }

    测试用例

    /**
      * Manual integration tests for HbaseUtils.
      *
      * They run against the ZooKeeper quorum configured below and print their
      * results; none of them asserts anything.
      */
    class HbaseTest {

      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "master66")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val hBaseUtils = new HbaseUtils()
      // Lazy: only createTable needs an admin, so the other tests no longer open
      // an extra connection just by constructing this class.
      lazy val admin = hBaseUtils.getAdmin(conf)

      /**
        * Creates the table, then releases the admin and its connection.
        */
      @Test
      def createTable(): Unit = {
        //    val list = List("family1", "family2")
        val list = List("Stat2")
        // Force the lazy admin outside the try so a connection failure is not
        // re-triggered (and masked) by the cleanup code.
        val adminHandle = admin
        try {
          hBaseUtils.createTable(adminHandle, "PageViewStream2", list)
        } finally {
          val conn = adminHandle.getConnection
          try adminHandle.close() finally conn.close()
        }
      }

      /**
        * Inserts a single cell.
        */
      @Test
      def insertData(): Unit = {
        hBaseUtils.insertData(conf, "test2", "rowkey1", "family1", "李四", "lisi2")
      }

      /**
        * Inserts a single cell through the buffered-mutator path.
        */
      @Test
      def batchInsertData: Unit = {
        hBaseUtils.batchInsertData(conf, "test2", "rowkey2", "family2", "name", "lisi")
      }

      /**
        * Fetches one row and prints row key, timestamp, family and value of each cell.
        */
      @Test
      def getRow: Unit = {
        val row: Array[Cell] = hBaseUtils.getRow(conf, "test2", "rowkey2")
        row.foreach { cell =>
          val rowKey = utf8(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
          val family = utf8(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength)
          val value = utf8(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          println(s"$rowKey ${cell.getTimestamp} $family $value")
        }
      }

      /**
        * Deletes one row.
        */
      @Test
      def delRow: Unit = {
        hBaseUtils.delRow(conf, "test2", "rowkey1")
      }

      /**
        * Scans the whole table and prints every cell.
        */
      @Test
      def getByScan: Unit = {
        val all: ArrayBuffer[Array[Cell]] = hBaseUtils.getByScan(conf, "PageViewStream2")
        all.foreach(cells => cells.foreach(cell => {
          println(utf8(cell.getRowArray, cell.getRowOffset, cell.getRowLength) + "-->Row")
          println(cell.getTimestamp + "-->timpsstamp  ")
          println(utf8(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength) + "-->family  ")
          println(utf8(cell.getValueArray, cell.getValueOffset, cell.getValueLength) + "-->value  ")
          // This prints the column qualifier, so it is labelled as such.
          println(utf8(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength) + " -->qualifier")
        }))
      }

      /**
        * Decodes a slice of a cell's backing array as UTF-8, the encoding
        * Bytes.toBytes(String) writes, instead of the platform default charset.
        */
      private def utf8(bytes: Array[Byte], offset: Int, length: Int): String =
        new String(bytes, offset, length, java.nio.charset.StandardCharsets.UTF_8)
    }
  • 相关阅读:
    [转]CTO谈豆瓣网和校内网技术架构变迁
    Hashtable Dictionary[必看]
    DotFuscator 小记
    博客园随笔添加自己的版权信息 [转]
    [转]关于支付宝API开发的一点心得
    .NET下实现分布式缓存系统Memcached
    4.9 利用对应的泛型替换Hashtable[转]
    dllhost.exe 解释
    C#命名规范,SqlServer命名规范
    用XenoCode 2006 加密dll(.NET)
  • 原文地址:https://www.cnblogs.com/itboys/p/9156789.html
Copyright © 2011-2022 走看看