zoukankan      html  css  js  c++  java
  • Hbase-1.1.1-java API

    1.工具类

      1 package com.lixin.stuty.hbase;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.commons.configuration.ConfigurationUtils;
      6 import org.apache.commons.lang.StringUtils;
      7 import org.apache.hadoop.conf.Configuration;
      8 import org.apache.hadoop.hbase.Cell;
      9 import org.apache.hadoop.hbase.HBaseConfiguration;
     10 import org.apache.hadoop.hbase.HColumnDescriptor;
     11 import org.apache.hadoop.hbase.HTableDescriptor;
     12 import org.apache.hadoop.hbase.TableName;
     13 import org.apache.hadoop.hbase.client.Admin;
     14 import org.apache.hadoop.hbase.client.Connection;
     15 import org.apache.hadoop.hbase.client.ConnectionFactory;
     16 import org.apache.hadoop.hbase.client.Get;
     17 import org.apache.hadoop.hbase.client.Put;
     18 import org.apache.hadoop.hbase.client.Result;
     19 import org.apache.hadoop.hbase.client.ResultScanner;
     20 import org.apache.hadoop.hbase.client.Scan;
     21 import org.apache.hadoop.hbase.client.Table;
     22 /**
     23  * hbase for version 1.1.1
     24  * @author Administrator
     25  *
     26  */
     27 public class HBaseUtil {
     28     public static final String ZK_QUORUM  = "hbase.zookeeper.quorum";
     29     public static final String ZK_CLIENTPORT  = "hbase.zookeeper.property.clientPort";
     30     private Configuration conf = HBaseConfiguration.create();
     31     private Connection connection ;
     32     private Admin admin;
     33     
     34     public HBaseUtil(String zk_quorum) {
     35         conf.set(ZK_QUORUM, zk_quorum);
     36         init();
     37     }
     38     
     39     public HBaseUtil(String zk_quorum,String zk_clientPort) {
     40         conf.set(ZK_QUORUM, zk_quorum);
     41         conf.set(ZK_CLIENTPORT, zk_clientPort);
     42         init();
     43     }
     44 
     45     private void init(){
     46         try {
     47             //Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
     48             connection = ConnectionFactory.createConnection(conf);
     49             admin  = connection.getAdmin();
     50         } catch (IOException e) {
     51             e.printStackTrace();
     52         }
     53     }
     54     public void close(){
     55         try {
     56             if(admin != null) admin.close();
     57             if(connection!=null) connection.close();
     58         } catch (IOException e) {
     59             e.printStackTrace();
     60         }
     61     }
     62     /**
     63      * 创建一个表
     64      * @param table_name 表名称
     65      * @param family_names 列族名称集合
     66      * @throws IOException 
     67      */
     68     public void create(String table_name,String... family_names) throws IOException{
     69         //获取TableName
     70         TableName tableName = TableName.valueOf(table_name);
     71         //table 描述
     72         HTableDescriptor htabledes =  new HTableDescriptor(tableName);
     73         for(String family_name : family_names){
     74             //column 描述
     75             HColumnDescriptor family = new HColumnDescriptor(family_name);
     76             htabledes.addFamily(family);
     77         }
     78         admin.createTable(htabledes);
     79     }
     80     /**
     81      * 增加一条记录
     82      * @param table_name 表名称
     83      * @param row    rowkey
     84      * @param family 列族名称
     85      * @param qualifier 列族限定符(可以为null)
     86      * @param value 值
     87      * @throws IOException
     88      */
     89     public void addColumn(String table_name,String row, String family,String qualifier,String value) throws IOException{
     90         //表名对象
     91         TableName tableName = TableName.valueOf(table_name);
     92         //表对象
     93         Table table = connection.getTable(tableName);
     94         // put对象 负责录入数据
     95         Put put = new Put(row.getBytes());
     96         put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes());
     97         table.put(put);
     98     }
     99     /**
    100      * 判断表是否存在
    101      */
    102     public boolean tableExist(String table_name) throws IOException{
    103         return admin.tableExists(TableName.valueOf(table_name));
    104     }
    105     /**删除表*/
    106     public void deleteTable(String table_name) throws IOException{
    107         TableName tableName = TableName.valueOf(table_name);
    108         if(admin.tableExists(tableName)){
    109             admin.disableTable(tableName);
    110             admin.deleteTable(tableName);
    111         }
    112     }
    113     /**
    114      * 查询单个row的记录
    115      * @param table_name 表明
    116      * @param row  行键
    117      * @param family  列族
    118      * @param qualifier  列族成员
    119      * @return
    120      * @throws IOException
    121      */
    122     public Cell[] getRow(String table_name,String row,String family,String qualifier) throws IOException{
    123         Cell[] cells = null;
    124         //check
    125         if(StringUtils.isEmpty(table_name)||StringUtils.isEmpty(row)){
    126             return null;
    127         }
    128         //Table
    129         Table table = connection.getTable(TableName.valueOf(table_name));
    130         Get get = new Get(row.getBytes());
    131         //判断在查询记录时,是否限定列族和子列(qualifier).
    132         if(StringUtils.isNotEmpty(family)&&StringUtils.isNotEmpty(qualifier)){
    133             get.addColumn(family.getBytes(), qualifier.getBytes());
    134         }
    135         if(StringUtils.isNotEmpty(family)&&StringUtils.isEmpty(qualifier)){
    136             get.addFamily(family.getBytes());
    137         }
    138         Result result = table.get(get);
    139         cells = result.rawCells();
    140         return cells;
    141     }
    142     /**
    143      * 获取表中的所有记录,可以指定列族,列族成员,开始行键,结束行键.
    144      * @param table_name
    145      * @param family
    146      * @param qualifier
    147      * @param startRow
    148      * @param stopRow
    149      * @return
    150      * @throws IOException
    151      */
    152     public ResultScanner getScan(String table_name,String family,String qualifier,String startRow,String stopRow) throws IOException{
    153         ResultScanner resultScanner = null;
    154         
    155         //Table
    156         Table table = connection.getTable(TableName.valueOf(table_name));
    157         Scan scan = new Scan();
    158         if(StringUtils.isNotBlank(family)&& StringUtils.isNotEmpty(qualifier)){
    159             scan.addColumn(family.getBytes(), qualifier.getBytes());
    160         }
    161         if(StringUtils.isNotEmpty(family)&& StringUtils.isEmpty(qualifier)){
    162             scan.addFamily(family.getBytes());
    163         }
    164         if(StringUtils.isNotEmpty(startRow)){
    165             scan.setStartRow(startRow.getBytes());
    166         }
    167         if(StringUtils.isNotEmpty(stopRow)){
    168             scan.setStopRow(stopRow.getBytes());
    169         }
    170         resultScanner = table.getScanner(scan);
    171         
    172         return resultScanner;
    173     }
    174 }

    2.测试:

     1 package com.lixin.stuty.hbase;
     2 
     3 import static org.junit.Assert.*;
     4 
     5 import java.io.IOException;
     6 import java.util.Iterator;
     7 
     8 import org.apache.hadoop.hbase.Cell;
     9 import org.apache.hadoop.hbase.CellUtil;
    10 import org.apache.hadoop.hbase.client.Result;
    11 import org.apache.hadoop.hbase.client.ResultScanner;
    12 import org.apache.hadoop.hbase.util.Bytes;
    13 import org.junit.Before;
    14 import org.junit.Test;
    15 
    16 public class HBaseUtilTest {
    17     private HBaseUtil hu = null;
    18     @Before
    19     public void init(){
    20         String zk_quorum  = "172.21.135.148";
    21         String zk_clientPort = "2181";
    22         hu = new HBaseUtil(zk_quorum, zk_clientPort);
    23     }
    24     @Test
    25     public void testCreate() throws IOException {
    26         String table_name = "users";
    27         String[] fanily_names = new String[]{"user_id","address","info"}; 
    28         hu.create(table_name, fanily_names);
    29         hu.close();
    30     }
    31     @Test
    32     public void testIsExist() throws IOException{
    33         String table_name = "sitech";
    34         System.out.println(hu.tableExist(table_name));
    35         hu.close();
    36     }
    37     @Test
    38     public void testDelete() throws IOException{
    39         String table_name = "person1";
    40         hu.deleteTable(table_name);
    41         hu.close();
    42     }
    43     @Test
    44     public void testGetRow() throws IOException{
    45         String table_name = "users";
    46         String row = "xiaoming";
    47         String family = "address";
    48         String qualifier = "";
    49         Cell[] cells = hu.getRow(table_name, row, family, qualifier);
    50         for(Cell cell : cells){
    51             String recode_row = Bytes.toString(CellUtil.cloneRow(cell));
    52             String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
    53             String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell));
    54             String value = Bytes.toString(CellUtil.cloneValue(cell));
    55             System.out.println(recode_row+"	"+family1+"	"+qualifier1+"	"+value);
    56         }
    57         hu.close();
    58     }
    59     @Test
    60     public void testGetScanner() throws IOException{
    61         String table_name = "users";
    62         String family = "address";
    63         String qualifier = "city";
    64         String startRow = "xiaoming";
    65         String stopRow = "xiaoming";
    66         
    67         ResultScanner resultScanner  = hu.getScan(table_name, family, qualifier, startRow, stopRow);
    68         Iterator<Result> iterator = resultScanner.iterator();
    69         while(iterator.hasNext()){
    70             Result result = iterator.next();
    71             Cell[] rawCells = result.rawCells();
    72             for(Cell cell : rawCells){
    73                 String recode_row = Bytes.toString(CellUtil.cloneRow(cell));
    74                 String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
    75                 String qualifier1 = Bytes.toString(CellUtil.cloneQualifier(cell));
    76                 String value = Bytes.toString(CellUtil.cloneValue(cell));
    77                 System.out.println(recode_row+"	"+family1+"	"+qualifier1+"	"+value);
    78             }
    79         }
    80         hu.close();
    81     }
    82 }
  • 相关阅读:
    eclipse pom文件报错 org.apache.maven.archiver.MavenArchiver.getManifest(org.apache.maven.project.Mav (Click for 1 more)
    严重: Compilation error org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException
    powercfg -duplicatescheme 设置电源方案
    测试3
    测试2
    markdonwn 测试1
    Java线程池-线程工厂ThreadFactory
    Java线程池-拒绝策略
    一文读懂Base64编码
    ThreadLocal
  • 原文地址:https://www.cnblogs.com/a198720/p/4664642.html
Copyright © 2011-2022 走看看