zoukankan      html  css  js  c++  java
  • Hbase(四) 过滤器查询

    引言:过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
    过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;

    一、hbase过滤器的分类

       1、比较过滤器

          行键过滤器 RowFilter

    Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));

    scan.setFilter(filter1);

          列族过滤器 FamilyFilter

    Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
    scan.setFilter(filter1);

         列过滤器 QualifierFilter

    Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new  BinaryComparator(Bytes.toBytes("col-2")));
    scan.setFilter(filter1);

        值过滤器 ValueFilter

    Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
    scan.setFilter(filter1);

       2、专用过滤器

    单列值过滤器 SingleColumnValueFilter ----会返回满足条件的整行

    SingleColumnValueFilter filter = new SingleColumnValueFilter(
    Bytes.toBytes("colfam1"),
    Bytes.toBytes("col-5"),
    CompareFilter.CompareOp.NOT_EQUAL,
    new SubstringComparator("val-5"));
    filter.setFilterIfMissing(true); //如果不设置为 true,则那些不包含指定 column 的行也会返回
    scan.setFilter(filter1);

    单列值排除器 SingleColumnValueExcludeFilter -----返回排除了该列的结果 与上面的结果相反

    前缀过滤器 PrefixFilter----针对行键
    Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
    scan.setFilter(filter1);

    列前缀过滤器 ColumnPrefixFilter
    Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
    scan.setFilter(filter1);

    分页过滤器 PageFilter

    代码实现:

    package com.ghgj.hbase;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
    import org.apache.hadoop.hbase.filter.ByteArrayComparable;
    import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FamilyFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.PageFilter;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.filter.QualifierFilter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.filter.SubstringComparator;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Test;
    public class HbasePageDemo {
    // 声明静态配置
    static Configuration conf = null;
    private static final String ZK_CONNECT_STR =
    "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181";
    static {
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
    }
    public static void main(String[] args) throws Exception {
    String tableName = "testfilter";
    String cfName = "f1";
    final byte[] POSTFIX = new byte[] { 0x00 };
    HTable table = new HTable(conf, tableName);
    Filter filter = new PageFilter(3);
    byte[] lastRow = null;
    int totalRows = 0;
    while (true) {
    Scan scan = new Scan();
    scan.setFilter(filter);
    if(lastRow != null){
    //注意这里添加了 POSTFIX 操作,用来重置扫描边界
    byte[] startRow = Bytes.add(lastRow,POSTFIX);
    scan.setStartRow(startRow);
    }
    ResultScanner scanner = table.getScanner(scan);
    int localRows = 0;
    Result result;
    while((result = scanner.next()) != null){
    System.out.println(localRows++ + ":" + result);
    totalRows ++;
    lastRow = result.getRow();
    }
    scanner.close();
    if(localRows == 0) break;
    }
    System.out.println("total rows:" + totalRows);
    } /
    **
    * 多种过滤条件的使用方法
    * @throws Exception
    */
    @Test
    public void testScan() throws Exception{
    HTable table = new HTable(conf, "person".getBytes());
    Scan scan = new Scan(Bytes.toBytes("person_zhang_000001"),
    Bytes.toBytes("person_zhang_000002"));
    //前缀过滤器----针对行键
    Filter filter = new PrefixFilter(Bytes.toBytes("person"));
    //行过滤器 ---针对行键
    ByteArrayComparable rowComparator = new
    BinaryComparator(Bytes.toBytes("person_zhang_000001"));
    RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);
    rf = new RowFilter(CompareOp.EQUAL , new
    SubstringComparator("_2016-12-31_"));
    //单值过滤器 1 完整匹配字节数组
    new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(),
    CompareOp.EQUAL, "zhangsan".getBytes());
    //单值过滤器 2 匹配正则表达式
    ByteArrayComparable comparator = new RegexStringComparator("zhang.");
    new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(),
    CompareOp.EQUAL, comparator);
    //单值过滤器 3 匹配是否包含子串,大小写不敏感
    comparator = new SubstringComparator("wu");
    new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(),
    CompareOp.EQUAL, comparator);
    //键值对元数据过滤-----family 过滤----字节数组完整匹配
    FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL ,
    new BinaryComparator(Bytes.toBytes("base_info")) //表中不存
    在 inf 列族,过滤结果为空
    );
    //键值对元数据过滤-----family 过滤----字节数组前缀匹配
    ff = new FamilyFilter(
    CompareOp.EQUAL ,
    new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以
    inf 打头的列族 info,过滤结果为该列族所有行
    );
    //键值对元数据过滤-----qualifier 过滤----字节数组完整匹配
    filter = new QualifierFilter(
    CompareOp.EQUAL ,
    new BinaryComparator(Bytes.toBytes("na")) //表中不存在 na
    列,过滤结果为空
    );
    filter = new QualifierFilter(
    CompareOp.EQUAL ,
    new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以
    na 打头的列 name,过滤结果为所有行的该列数据
    );
    //基于列名(即 Qualifier)前缀过滤数据的 ColumnPrefixFilter
    filter = new ColumnPrefixFilter("na".getBytes());
    //基于列名(即 Qualifier)多个前缀过滤数据的 MultipleColumnPrefixFilter
    byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")};
    filter = new MultipleColumnPrefixFilter(prefixes);
    //为查询设置过滤条件
    scan.setFilter(filter);
    scan.addFamily(Bytes.toBytes("base_info"));
    //一行
    // Result result = table.get(get);
    //多行的数据
    ResultScanner scanner = table.getScanner(scan);
    for(Result r : scanner){
    /**
    for(KeyValue kv : r.list()){
    String family = new String(kv.getFamily());
    System.out.println(family);
    String qualifier = new String(kv.getQualifier());
    System.out.println(qualifier);
    System.out.println(new String(kv.getValue()));
    }
    */
    //直接从 result 中取到某个特定的 value
    byte[] value = r.getValue(Bytes.toBytes("base_info"),
    Bytes.toBytes("name"));
    System.out.println(new String(value));
    }
    table.close();
    }
    }
    

      

    分页过滤器  代码实现:

    package com.ghgj.hbase.test1610;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.PageFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    
    /**
     * 501条
     * 
     * 每页100条,求第四页 : 301 - 400
     * 
     * pageIndex:第几页  
     * pageNumber:每页几条
     *
     *
     * 在hbase当中取一部分数据的取法:
     * scan 'user_info',{COLUMNS => 'base_info:name', 
     * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'}
     * 
     * mysqL:从第几条开始,取多少条
     * 
     * 从mysql的分页规则引申到hbase的分页:把startRow转换成mysql的第几条
     */
    public class HBasePageFilterDemo {
    
    	private static final String ZK_CONNECT_STR = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    
    	private static final String TABLE_NAME = "user_info";
    	private static final String FAMILY_BASIC = "base_info";
    	private static final String FAMILY_EXTRA = "extra_info";
    	private static final String COLUMN_NAME = "name";
    	private static final String COLUMN_AGE = "age";
    	private static final String ROW_KEY = "rk0001";
    
    	private static Configuration config = null;
    	private static HTable table = null;
    	static {
    		config = HBaseConfiguration.create();
    		config.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
    		try {
    			table = new HTable(config, TABLE_NAME);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String[] args) throws Exception {
    		
    //		ResultScanner pageData = getPageData("zhangsan_20150701_0001", 4);
    		ResultScanner pageData = getPageData(2, 4);
    		HBasePrintUtil.printResultScanner(pageData);
    		
    //		String lastRowkey = getLastRowkey(pageData);
    //		System.out.println(lastRowkey);
    		
    	}
    	
    	public static ResultScanner getPageData(int pageIndex, int pageNumber) throws Exception{
    		// 怎么把pageIndex 转换成 startRow
    		String startRow = null;
    		
    		if(pageIndex == 1){	// 当客户方法只取第一页的分页数据时,
    			ResultScanner pageData = getPageData(startRow, pageNumber);
    			return pageData;
    			
    		}else{
    			ResultScanner newPageData = null;
    			for(int i=0; i<pageIndex - 1; i++){	// 总共循环次数是比你取的页数少1
    				newPageData = getPageData(startRow, pageNumber);
    				startRow = getLastRowkey(newPageData);
    				byte[] add = Bytes.add(Bytes.toBytes(startRow), new byte[]{ 0X00 });
    				startRow = Bytes.toString(add);
    			}
    			newPageData = getPageData(startRow, pageNumber);
    			return newPageData;
    		}
    	}
    	
    	/**
    	 * @param startRow
    	 * @param pageNumber
    	 * @return
    	 *  * scan 'user_info',{COLUMNS => 'base_info:name', 
         * LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'}
    	 * @throws Exception 
    	 */
    	public static ResultScanner getPageData(String startRow, int pageNumber) throws Exception{
    		Scan scan  = new Scan();
    		scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
    		// 設置當前查询的其实位置
    		if(!StringUtils.isBlank(startRow)){
    			scan.setStartRow(Bytes.toBytes(startRow));
    		}
    		// 第二个参数
    		Filter pageFilter = new PageFilter(pageNumber);
    		scan.setFilter(pageFilter);
    		
    		ResultScanner rs = table.getScanner(scan);
    		return rs;
    	}
    	
    	public static String getLastRowkey(ResultScanner rs){
    		String lastRowkey = null;
    		for(Result result : rs){
    //			System.out.println(result.getRow());
    			lastRowkey = Bytes.toString(result.getRow());
    		}
    		return lastRowkey;
    //		return null;
    	}
    }
    

      

    多条件过滤时,可以使用FilterList

    List<Filter> filters = new ArrayList<Filter>();
    		SingleColumnValueFilter filter =new  SingleColumnValueFilter(
    				Bytes.toBytes("info"),
    				Bytes.toBytes("age"),
    				CompareOp.LESS_OR_EQUAL,
    				new BinaryComparator(Bytes.toBytes("20")));
    		filters.add(filter);
    		
    		SingleColumnValueFilter filter1 =new  SingleColumnValueFilter(
    				Bytes.toBytes("info"),
    				Bytes.toBytes("age"),
    				CompareOp.GREATER,
    				new BinaryComparator(Bytes.toBytes("18")));
    		
    		filters.add(filter1);
    		
    		Filter filter2 = new ValueFilter(CompareOp.EQUAL, new SubstringComparator("lisi") );
    		filters.add(filter2);
    		
    		
    		FilterList f=new FilterList(filters);
                    scan.setFilter(f);
    

      

  • 相关阅读:
    Spring的事务处理机制
    英特尔诺基亚联手研发Symbian系统的智能手机
    国际最新LOGO设计趋势总结
    Java学习笔记_身份验证机制
    用Validator检查你的表单
    软件企业:细节造就竞争力
    优化软件性能的方法
    【开发经验】Struts常见错误及原因分析
    CF547E Mike and Friends
    [ZJOI2015]诸神眷顾的幻想乡
  • 原文地址:https://www.cnblogs.com/liuwei6/p/6840300.html
Copyright © 2011-2022 走看看