zoukankan      html  css  js  c++  java
  • hadoop2-HBase的Java API操作

    HBase提供了丰富的Java API,以及表连接池(HTablePool,注意它是表句柄的连接池,而不是线程池)。下面我用HTablePool来展示一下如何使用Java API操作HBase。

    项目结构如下:

    我使用的Hbase的版本是

    hbase-0.98.9-hadoop2-bin.tar.gz

    大家下载并解压后,可以拿到其中lib目录下面的jar文件,即上图所示的hbase-lib资源。

    接口类:

    /hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java

     1 package com.b510.hbase.util.dao;
     2 
     3 import java.util.List;
     4 
     5 import org.apache.hadoop.hbase.client.HTableInterface;
     6 
     7 
     8 /**
     9  * @author Hongten
    10  * @created 7 Nov 2018
    11  */
    12 public interface HbaseDao {
    13 
    14     // initial table
    15     public HTableInterface getHTableFromPool(String tableName);
    16 
    17     // check if the table is exist
    18     public boolean isHTableExist(String tableName);
    19 
    20     // create table
    21     public void createHTable(String tableName, String[] columnFamilys);
    22 
    23     // insert new row
    24     public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
    25 
    26     // get row by row key
    27     public void getRow(String tableName, String rowKey);
    28 
    29     public void getAllRows(String tableName);
    30 
    31     // get rows by giving range
    32     public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
    33 
    34     //delete row
    35     public void delRow(String tableName, String rowKey);
    36     
    37     //delete rows by row keys
    38     public void delRowsByRowKeys(String tableName, List<String> rowKeys);
    39 
    40     // auto flush data when close
    41     public void closeAutoFlush(HTableInterface table);
    42 
    43     // close table
    44     public void closeTable(HTableInterface table);
    45 
    46     // close pool connection
    47     public void closePoolConnection();
    48 
    49     // delete table
    50     public void deleteHTable(String tableName);
    51 }

    实现类:

    /hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java

      1 package com.b510.hbase.util.dao.impl;
      2 
      3 import java.io.IOException;
      4 import java.util.List;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.hbase.Cell;
      8 import org.apache.hadoop.hbase.CellUtil;
      9 import org.apache.hadoop.hbase.HBaseConfiguration;
     10 import org.apache.hadoop.hbase.HColumnDescriptor;
     11 import org.apache.hadoop.hbase.HTableDescriptor;
     12 import org.apache.hadoop.hbase.MasterNotRunningException;
     13 import org.apache.hadoop.hbase.TableName;
     14 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
     15 import org.apache.hadoop.hbase.client.Delete;
     16 import org.apache.hadoop.hbase.client.Get;
     17 import org.apache.hadoop.hbase.client.HBaseAdmin;
     18 import org.apache.hadoop.hbase.client.HTableInterface;
     19 import org.apache.hadoop.hbase.client.HTablePool;
     20 import org.apache.hadoop.hbase.client.Put;
     21 import org.apache.hadoop.hbase.client.Result;
     22 import org.apache.hadoop.hbase.client.ResultScanner;
     23 import org.apache.hadoop.hbase.client.Scan;
     24 
     25 import com.b510.hbase.util.dao.HbaseDao;
     26 
     27 /**
     28  * @author Hongten
     29  * @created 7 Nov 2018
     30  */
     31 @SuppressWarnings("deprecation")
     32 public class HbaseDaoImpl implements HbaseDao {
     33 
     34     private static Configuration conf = null;
     35     private static HBaseAdmin hAdmin;
     36     private static HTablePool pool;
     37 
     38     private static int defaultPoolSize = 5;
     39 
     40     public HbaseDaoImpl(int poolSize) {
     41         conf = HBaseConfiguration.create();
     42         conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
     43         try {
     44             hAdmin = new HBaseAdmin(conf);
     45             // the default pool size is 5.
     46             pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
     47         } catch (MasterNotRunningException e) {
     48             e.printStackTrace();
     49         } catch (ZooKeeperConnectionException e) {
     50             e.printStackTrace();
     51         } catch (IOException e) {
     52             e.printStackTrace();
     53         }
     54     }
     55 
     56     @Override
     57     public HTableInterface getHTableFromPool(String tableName) {
     58         HTableInterface table = pool.getTable(tableName);
     59         return table;
     60     }
     61 
     62     @Override
     63     public boolean isHTableExist(String tableName) {
     64         try {
     65             return hAdmin.tableExists(tableName);
     66         } catch (IOException e) {
     67             e.printStackTrace();
     68         }
     69         return false;
     70     }
     71 
     72     @Override
     73     public void createHTable(String tableName, String[] columnFamilys) {
     74         if (!isHTableExist(tableName)) {
     75             HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
     76             // The Hbase suggested the number of column family should be less than 3.
     77             // Normally, there only have 1 column family.
     78             for (String cfName : columnFamilys) {
     79                 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
     80                 tableDescriptor.addFamily(hColumnDescriptor);
     81             }
     82             try {
     83                 hAdmin.createTable(tableDescriptor);
     84             } catch (IOException e) {
     85                 e.printStackTrace();
     86             }
     87             System.out.println("The table [" + tableName + "]  is created.");
     88         } else {
     89             System.out.println("The table [" + tableName + "]  is existing already.");
     90         }
     91 
     92     }
     93 
     94     @Override
     95     public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
     96         if (isHTableExist(tableName)) {
     97             HTableInterface table = getHTableFromPool(tableName);
     98             Put put = new Put(rowKey.getBytes());
     99             put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
    100             try {
    101                 table.put(put);
    102             } catch (IOException e) {
    103                 e.printStackTrace();
    104             }
    105             System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Vlaue=[" + value + "].");
    106             closeTable(table);
    107         } else {
    108             System.out.println("The table [" + tableName + "] does not exist.");
    109         }
    110     }
    111 
    112     @Override
    113     public void getRow(String tableName, String rowKey) {
    114         if (isHTableExist(tableName)) {
    115             HTableInterface table = getHTableFromPool(tableName);
    116             Get get = new Get(rowKey.getBytes());
    117             Result result;
    118             try {
    119                 result = table.get(get);
    120                 String columnName = "";
    121                 String timeStamp = "";
    122                 String columnFamily = "";
    123                 String value = "";
    124                 for (Cell cell : result.rawCells()) {
    125                     timeStamp = String.valueOf(cell.getTimestamp());
    126                     columnFamily = new String(CellUtil.cloneFamily(cell));
    127                     columnName = new String(CellUtil.cloneQualifier(cell));
    128                     value = new String(CellUtil.cloneValue(cell));
    129 
    130                     System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
    131                 }
    132             } catch (IOException e) {
    133                 e.printStackTrace();
    134             }
    135             closeTable(table);
    136         } else {
    137             System.out.println("The table [" + tableName + "] does not exist.");
    138         }
    139     }
    140 
    141     @Override
    142     public void getAllRows(String tableName) {
    143         if (isHTableExist(tableName)) {
    144             Scan scan = new Scan();
    145             scanHTable(tableName, scan);
    146         } else {
    147             System.out.println("The table [" + tableName + "] does not exist.");
    148         }
    149     }
    150 
    151     private void scanHTable(String tableName, Scan scan) {
    152         try {
    153             HTableInterface table = getHTableFromPool(tableName);
    154             ResultScanner results = table.getScanner(scan);
    155             for (Result result : results) {
    156                 String rowKey = "";
    157                 String columnName = "";
    158                 String timeStamp = "";
    159                 String columnFamily = "";
    160                 String value = "";
    161                 for (Cell cell : result.rawCells()) {
    162                     rowKey = new String(CellUtil.cloneRow(cell));
    163                     timeStamp = String.valueOf(cell.getTimestamp());
    164                     columnFamily = new String(CellUtil.cloneFamily(cell));
    165                     columnName = new String(CellUtil.cloneQualifier(cell));
    166                     value = new String(CellUtil.cloneValue(cell));
    167 
    168                     System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");
    169                 }
    170             }
    171             closeTable(table);
    172         } catch (IOException e) {
    173             e.printStackTrace();
    174         }
    175     }
    176 
    177     @Override
    178     public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
    179         if (isHTableExist(tableName)) {
    180             Scan scan = new Scan();
    181             scan.setStartRow(startRowKey.getBytes());
    182             // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).
    183             // the hbase version is 0.98.9
    184             scan.setStopRow(endRowKey.getBytes());
    185             scanHTable(tableName, scan);
    186         } else {
    187             System.out.println("The table [" + tableName + "] does not exist.");
    188         }
    189     }
    190 
    191     @Override
    192     public void delRow(String tableName, String rowKey) {
    193         if (isHTableExist(tableName)) {
    194             HTableInterface table = getHTableFromPool(tableName);
    195             deleteRow(table, rowKey);
    196         } else {
    197             System.out.println("The table [" + tableName + "] does not exist.");
    198         }
    199     }
    200 
    201     private void deleteRow(HTableInterface table, String rowKey) {
    202         Delete del = new Delete(rowKey.getBytes());
    203         try {
    204             table.delete(del);
    205             System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");
    206             closeTable(table);
    207         } catch (IOException e) {
    208             e.printStackTrace();
    209         }
    210     }
    211 
    212     @Override
    213     public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
    214         if (rowKeys != null && rowKeys.size() > 0) {
    215             for (String rowKey : rowKeys) {
    216                 delRow(tableName, rowKey);
    217             }
    218         }
    219     }
    220 
    221     @Override
    222     public void deleteHTable(String tableName) {
    223         if (isHTableExist(tableName)) {
    224             try {
    225                 hAdmin.disableTable(tableName.getBytes());
    226                 hAdmin.deleteTable(tableName.getBytes());
    227                 System.out.println("The table [" + tableName + "] is deleted.");
    228             } catch (IOException e) {
    229                 e.printStackTrace();
    230             }
    231         } else {
    232             System.out.println("The table [" + tableName + "] does not exist.");
    233         }
    234 
    235     }
    236 
    237     @Override
    238     public void closeAutoFlush(HTableInterface table) {
    239         table.setAutoFlush(false, false);
    240     }
    241 
    242     @Override
    243     public void closeTable(HTableInterface table) {
    244         try {
    245             table.close();
    246         } catch (IOException e) {
    247             e.printStackTrace();
    248         }
    249     }
    250 
    251     @Override
    252     public void closePoolConnection() {
    253         try {
    254             pool.close();
    255         } catch (IOException e) {
    256             e.printStackTrace();
    257         }
    258     }
    259 
    260 }

    测试类:

    /hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java

     1 package com.b510.hbase.util.dao.test;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 
     6 import org.junit.Test;
     7 
     8 import com.b510.hbase.util.dao.HbaseDao;
     9 import com.b510.hbase.util.dao.impl.HbaseDaoImpl;
    10 
    11 /**
    12  * @author Hongten
    13  * @created 7 Nov 2018
    14  */
    15 public class HbaseDaoTest {
    16 
    17     HbaseDao dao = new HbaseDaoImpl(4);
    18 
    19     public static final String tableName = "t_test";
    20     public static final String columnFamilyName = "cf1";
    21     public static final String[] CFs = { columnFamilyName };
    22 
    23     public static final String COLUMN_NAME_NAME = "name";
    24     public static final String COLUMN_NAME_AGE = "age";
    25 
    26     @Test
    27     public void main() {
    28         createTable();
    29         addRow();
    30         getRow();
    31         getAllRows();
    32         getRowsByRange();
    33         delRow();
    34         delRowsByRowKeys();
    35         deleteHTable();
    36     }
    37 
    38     public void createTable() {
    39         System.out.println("=== create table ====");
    40         dao.createHTable(tableName, CFs);
    41     }
    42 
    43     public void addRow() {
    44         System.out.println("=== insert record ====");
    45         dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
    46         dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
    47 
    48         dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
    49         dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
    50 
    51         dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
    52         dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
    53 
    54         dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
    55         dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
    56     }
    57 
    58     public void getRow() {
    59         System.out.println("=== get record ====");
    60         dao.getRow(tableName, "12345566");
    61     }
    62 
    63     public void getAllRows() {
    64         System.out.println("=== scan table ====");
    65         dao.getAllRows(tableName);
    66     }
    67 
    68     public void getRowsByRange() {
    69         System.out.println("=== scan record by giving range ====");
    70         // it will return the '12345567' and '12345568' rows.
    71         dao.getRowsByRange(tableName, "12345567", "12345569");
    72     }
    73 
    74     public void delRow() {
    75         System.out.println("=== delete record ====");
    76         dao.delRow(tableName, "12345568");
    77         // only '12345567' row.
    78         getRowsByRange();
    79     }
    80 
    81     public void delRowsByRowKeys() {
    82         System.out.println("=== delete batch records ====");
    83         List<String> rowKeys = new ArrayList<String>();
    84         rowKeys.add("12345566");
    85         rowKeys.add("12345569");
    86         dao.delRowsByRowKeys(tableName, rowKeys);
    87         // can not find the '12345566' and '12345569'
    88         getAllRows();
    89     }
    90 
    91     public void deleteHTable() {
    92         System.out.println("=== delete table ====");
    93         dao.deleteHTable(tableName);
    94     }
    95 }

    测试结果:

    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    === create table ====
    The table [t_test]  is created.
    === insert record ====
    Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten].
    Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22].
    Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom].
    Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25].
    Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone].
    Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30].
    Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs].
    Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24].
    === get record ====
    Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
    Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
    === scan table ====
    Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22].
    Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten].
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
    Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
    Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
    Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24].
    Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs].
    === scan record by giving range ====
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
    Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30].
    Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone].
    === delete record ====
    Delete from table [t_test], Rowkey=[12345568].
    === scan record by giving range ====
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
    === delete batch records ====
    Delete from table [t_test], Rowkey=[12345566].
    Delete from table [t_test], Rowkey=[12345569].
    === scan table ====
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25].
    Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom].
    === delete table ====
    The table [t_test] is deleted.

    源码下载:

    hbase-util.zip

    ========================================================

    More reading, and English is important.

    I'm Hongten

     

    大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。
    Hongten博客排名在100名以内。粉丝过千。
    Hongten出品,必是精品。

    E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

    ========================================================

  • 相关阅读:
    Visual Studio 常用快捷键 (二)
    Visual Studio 常用快捷键
    页游体验
    JSFL 工具
    GhostCat工具
    如何制作一个塔防游戏
    水墨
    给 想转2dx 却无从下手的aser
    两岸三地在线编程学习网站大全
    as3反射应用及简要代码
  • 原文地址:https://www.cnblogs.com/hongten/p/hongten_hadoop_hbase_java_api.html
Copyright © 2011-2022 走看看