zoukankan      html  css  js  c++  java
  • 2、通过HBase API进行开发

    一、将HBase的jar包及hbase-site.xml添加到IDE

    1、到安装HBase集群的任意一台机器上找到HBase的安装目录,到lib目录下下载HBase需要的jar包,然后再到conf目录下下载hbase-site.xml

    2、在ide中新建一个java项目,然后再右击"项目名",新建2个文件夹,分别是"lib""conf"

    3、将1步骤中下载的jar包放到2步骤中的lib目录下,并且将hbase-site.xml放到conf目录下,并将2个文件夹添加到classpath下。

    二、使用HBase的基本API操作HBase

    通过编码(java)的形式对HBase进行一系列的管理涉及到对表的管理、数据的操作等。

    下面这段是公共代码片段:

    private static Configuration conf = null;
    static {
      Configuration HBASE_CONFIG = new Configuration();
      // 与hbase/conf/hbase-site.xml 中 hbase.zookeeper.quorum 配置的值相同
      HBASE_CONFIG.set("hbase.zookeeper.quorum", "c7004,c7003,c7005");
      // 与hbase/conf/hbase-site.xml 中 hbase.zookeeper.property.clientPort 配置的值相同
      HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
      conf = HBaseConfiguration.create(HBASE_CONFIG);
    }

    1、对表的创建、删除、显示以及修改等,可以用Admin, 一旦创建了表,那么可以通过HTable的 实例来访问表,每次可以往表里增加数据。

    /**
    * 创建表
    * @param tableName 表名称
    * @param familys   列族
    * @param force     是否强制建表
    * @throws Exception
    */
    public static void creatTable(String tableName, String[] familys,boolean force) throws Exception {
      //建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      //表管理类
      Admin admin = conn.getAdmin();
      if (admin.tableExists(TableName.valueOf(tableName))) {
        if(force){
          //禁用表
          admin.disableTable(TableName.valueOf(tableName));
          //删除表
          admin.deleteTable(TableName.valueOf(tableName));
          System.out.println("开始创建表!");
        }else{
          System.out.println("表已存在!");
        }
      } else {
        //定义表名
        HTableDescriptor tblDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for (int i = 0; i < familys.length; i++) {
          //定义列族
          HColumnDescriptor clmDesc = new HColumnDescriptor(familys[i]);
          //将列族添加到表中
          tblDesc.addFamily(clmDesc);
        }
        //执行建表
        admin.createTable(tblDesc);
        System.out.println("创建表" + tableName + "成功!");
      }
      //关闭表管理
      admin.close();
      //关闭连接
      conn.close();
    }

    2、插入数据

    创建一个Put对象,在这个Put对象里可以指定要给哪个列增加数据,以及当前的时间戳等值,然后通过调用HTable.put(Put)来 提交操作,子猴在这里提请注意的是:在创建Put对象的时候,你必须指定一个行(Row)值,在构造Put对象的时候作为参数传入。

    /**
    * 向表中插入数据
    * @param tableName
    * @param rowKey
    * @param family
    * @param qualifier
    * @param value
    * @throws IOException 
    */
    public static void insertData2Tbl(String tableName,String rowKey,String family,String qualifier,String value) throws IOException{
      //建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      //建立表连接
      Table tbl=conn.getTable(TableName.valueOf(tableName));
      //用行键实例化Put
      Put put=new Put(rowKey.getBytes());
      //指定列族名、列名和值
      put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes());
      //执行put
      tbl.put(put);
      //关闭表
      tbl.close();
      //关闭连接
      conn.close();
    }

    3、获取数据

    要获取数据,使用Get对 象,Get对象同Put对象一样有好几个构造函数,通常在构造的时候传入行值,表示取第几行的数据,通过HTable.get(Get)来 调用。

    /**
    * 从表中取值
    * @param tableName  表名
    * @param rowKey     行键
    * @param family     列族
    * @param qualifier  列
    * @return
    * @throws IOException
    */
    public static String readData4Tbl(String tableName,String rowKey,String family,String qualifier) throws IOException{
      //建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      //建立表连接
      Table tbl=conn.getTable(TableName.valueOf(tableName));
      //用行键实例化Get
      Get get=new Get(rowKey.getBytes());
      //增加列族名和列名条件
      get.addColumn(family.getBytes(),qualifier.getBytes() );
      //执行,返回结果
      Result result=tbl.get(get);
      //取出结果
      String valStr=Bytes.toString(result.getValue(family.getBytes(), qualifier.getBytes()));
      //关闭表
      tbl.close();
      //关闭连接
      conn.close();
      return valStr;
    }

    4、扫描

    通过Scan可以对表中的行键范围进行浏览,得到每一行的信息,比如列名,时间戳等,Scan 相当于一个游标,通过next()来浏览下一个,通过调用HTable.getScanner(Scan) 来返回一个ResultScanner对象。HTable.get(Get)HTable.getScanner(Scan) 都是返回一个Result Result是一个KeyValue的链表。

    /**
    * 扫描行键范围取值
    * @param tableName   表名
    * @param startRow    起始行键
    * @param stopRow     结束行键
    * @param family      列族
    * @param qualifier   列
    * @throws IOException
    */
    public static void scanRowRange4Tbl(String tableName,String startRow,String stopRow,String family,String qualifier) throws IOException{
      //建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      //建立表连接
      Table tbl=conn.getTable(TableName.valueOf(tableName));
      //初始化Scan实例
      Scan scan=new Scan();
      //指定开始行键
      scan.setStartRow(startRow.getBytes());
      //指定结束行键
      scan.setStopRow(stopRow.getBytes());
      //增加过滤条件
      scan.addColumn(family.getBytes(), qualifier.getBytes());
      //返回结果 
      ResultScanner rss=tbl.getScanner(scan);
      //迭代并取出结果
      for(Result rs:rss){
        String valStr=Bytes.toString(rs.getValue(family.getBytes(), qualifier.getBytes()));
        System.out.println(valStr);
      }
      //关闭表
      tbl.close();
      //关闭连接
      conn.close();
    }

    5、删除

    使用Delete来 删除记录,通过调用HTable.delete(Delete)来 执行删除操作。(注:删除这里有些特别,也就是删除并不是马上将数据从表中删除。)

    /**
    * 删除行键
    * @param tableName
    * @param rowKey
    * @throws IOException
    */
    public static void delRowKey4Tbl(String tableName, String rowKey) throws IOException {
      // 建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      // 建立表连接
      Table tbl = conn.getTable(TableName.valueOf(tableName));
      // 用行键来实例化Delete实例
      Delete del = new Delete(rowKey.getBytes());
      // 执行删除
      tbl.delete(del);
      // 关闭表
      tbl.close();
      // 关闭连接
      conn.close();
    }

    /**
    * 删除单元格(即列)
    * @param tableName
    * @param rowKey
    * @param family
    * @param qualifier
    * @throws IOException
    */
    public static void delCell4Tbl(String tableName, String rowKey, String family, String qualifier) throws IOException {
      // 建立连接
      Connection conn = ConnectionFactory.createConnection(conf);
      // 建立表连接
      Table tbl = conn.getTable(TableName.valueOf(tableName));
      // 用行键来实例化Delete实例
      Delete del = new Delete(rowKey.getBytes());
      // 增加过滤条件
      del.addColumn(family.getBytes(), qualifier.getBytes());
      // 执行删除
      tbl.delete(del);
      // 关闭表
      tbl.close();
      // 关闭连接
      conn.close();
    }

    6、锁

    7、新增、获取、删除在操作过程中会对所操作的行加一个锁,而浏览却不会。

    8、簇(cluster)的访问

    客户端代码通过ZooKeeper来访问找到簇,也就是说ZooKeeper quorum将被使用,那么相关的类(包)应该在客户端的类(classes)目录下,即客户端一定要找到文件hbase-site.xml

    三、Hbase的高级API

    HBase高级API主要分为三类:过滤器、计数器和协处理器。

    1、过滤器

    在设置Scal、Get的时候有一个setFilter(filter),可以在查询时添加更多的限制条件,如正则匹配、根据列值进行匹配等。HBase内置了一些常用过滤器,用户也可以通过实现Filter接口,编写自己的过滤器。

    过滤器是服务器端的操作。它在客户端被创建,通过RPC传送到服务器端,然后在服务器端进行过滤操作。

    HBase内置常用过滤器如下:

    • 行过滤器(RowFilter):基于行键过滤数据。

    Filter filter=new RowFilter(CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("row-100")));

    • 前缀过滤器(PrefixFilter):所有与前缀匹配的行都会返回给客户端。

    Filter filter=new PrefixFilter(Bytes.toBytes("1990"));

    • 首次行键过滤器(FirstKeyOnlyFilter):在找到所有行第一列的值时,就会返回数据。

    Filter filter=new FirstKeyOnlyFilter();

    • 单列值过滤器(SingleColumnValueFilter):根据某列的值进行过滤。

    Filter filter=new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("qual"),CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("value-100")));

    2、计数器

    由于HBase没有二级索引,故统计功能相对较弱。为应对此种情况,HBase使用计数器(counter)用于实时统计的业务场景下。

    计数器实际时HBase表中某一列的值,当进行写操作时,使用Table类的API对该列加1即可。计数器有两种:单计数据器和多计数器。

    • 单计数据器:用户自己设定计数器的行、列族和列。

    // 建立表连接
    Table tbl = conn.getTable(TableName.valueOf(tableName));
    // 计数器增加1
    long a=tbl.incrementColumnValue(
      Bytes.toBytes("row-count"), 
      Bytes.toBytes("info"), 
      Bytes.toBytes("q1"), 1);//比0大,则按给定值增加计数器的数值,若比0小,则按值减少计数器中的值
    // 返回计数器当前值
    long b=tbl.incrementColumnValue(
      Bytes.toBytes("row-count"), 
      Bytes.toBytes("info"), 
      Bytes.toBytes("q1"), 0);//0表示返回当前计数器的值

    • 多计数器:允许用户同时更新多个计数器的值,但这些计数器的值都必须处于同一行。

    //用行键初始化计数器
    Increment increment=new Increment(Bytes.toBytes("row-count"));
    //添加多个列
    increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q2"), 1);
    increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q3"), 1);
    Result result=tbl.increment(increment);
    // 打印计数器返回的结果
    for(KeyValue kv:result.raw()){
      System.out.println("KV:"+kv+" Value:"+Bytes.toLong(kv.getValue()));
    }

    3、协处理器

    协处理器(coprocessor)允许用户在RegionServer上运行自己的代码。协处理器框架主要有Observer和Endpoint两大类,用户可以继承这些类实现自己的逻辑。

    Observer:类似于RDMS的触发器,可以重写一些在特定事件发生时执行的回调函数。RegionObserverMasterObserverWALObserver三种。RegionObserver可以被用来处理数据修改事件,它发生的地点是Region;MasterObserver可以被用来管理表,如新定义表;WALObserver提供了控制WAL的回调函数。

    示例:Observer实现二级索引

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TestCoprocessor extends BaseRegionObserver {

      @Override
      public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)throws IOException {
        Configuration conf=new Configuration();
        Connection conn=ConnectionFactory.createConnection(conf);
        //索引表
        Table tbl=conn.getTable(TableName.valueOf("idx_tbl"));
        //取出要插入的数据
        List<Cell> cells=put.get("cf".getBytes(), "info".getBytes());
        Iterator<Cell> kvIt=cells.iterator();
        while(kvIt.hasNext()){
          Cell tmp=kvIt.next();
          //用值作为行键
          Put idxPut=new Put(tmp.getValue());
          idxPut.add("cf".getBytes(),tmp.getRow(),Bytes.toBytes(System.currentTimeMillis()));
          //插入索引表
          tbl.put(idxPut);
        }
        tbl.close();
        conn.close();
      }

    }

    Endpoint:类似于RDMS的存储过程,允许用户将自定义操作添加到服务端。

  • 相关阅读:
    js中undefined,null,NaN的区别
    js中数字计算精度
    BestCoder Round #32
    POJ 2299 求逆序对(归并排序或树状数组)
    POJ 2603
    CodeForces 515C
    POJ 1853 背包问题
    UVA 10115 子符串替换
    POJ 1155 树状dp
    HDU 2196 树状dp 求树中节点之间的最长距离
  • 原文地址:https://www.cnblogs.com/netbloomy/p/6683509.html
Copyright © 2011-2022 走看看