zoukankan      html  css  js  c++  java
  • hbase+springboot+redis实现分页

    实现原理:

      1、读取hbase数据每页的数据时多取一条数据。如:分页是10条一页,第一次查询hbase时, 取10+1条数据,然后把第一条和最后一条rowkey数据保存在redis中,redis中的key为用户的token+URL。即token.set(token+url:list<String>);

      2、前台点击下页时,查询当前页(currentPagae)在redis的list是否存在list.get(currentPage)的rowkey。如果存在,则以之前为startRowKey,取10+1条,并把最后一条保存在redis中。不存在则查询出错,提示重新查询,理论上不会出现,除非redis挂了。

      3、如果查询的数据的startRowKey和stopRowKey在token中都能找到。则只需要查询这个范围数据即可。

      3、什么时候清除这些redis数据呢?第一、设置redis有效期。第二,这条很重要,是保证前三条数据准确性的前提。在用户点击非下一页上一页按钮的操作时,都清除redis中的当前用户的数据。

      4、即然有分页,那当然有数据量统计count,这个我使用hbase的协处理器coprocessor。当然每次查询count也费时间。当第一次查询时,把count保存在用户的redis中。redis的清除还是第(3)步的过程。

      5、能这么做还有一个很重要的前提:前端界面的分页,只提供了两个按钮:上一页和下一页。这是保证这个方案可行性的基础。

    下面上代码:

    controller中方法的代码,这里的基本框架使用的是renren快速开发套件

    @ResponseBody
    	@RequestMapping("/list")
    	public R list(@RequestParam Map<String, Object> params,HttpServletRequest httpRequest){
    
    		long time = System.currentTimeMillis();
    		HBasePage page= null;
    		try{
    			String token = httpRequest.getHeader("token");
    
    			//如需要额外的过滤器,请自己定义并使用.buildFilter(filter)方法添加到query中
    			//请注意顺序:如果有RowKey的过滤,先添加buildRowKeyFilter,再加buildRowKeyPage。否则无效
    			HbaseQuery query = new HbaseQuery(params,token,true);
    
    			Object cnOrderCodeObject = params.get("cnOrderCode");
    			if(cnOrderCodeObject!=null&&!((String)cnOrderCodeObject).equals("")){
    				String cnOrderCode = (String)cnOrderCodeObject;
    				query.buildRowKeyFilter(cnOrderCode,cnOrderCode);
    			}
    			query.buildRowKeyPage().finish();
    			page = service.query(query).buildRedisRowKey(token);
    
    		}catch (Exception e){
    			e.printStackTrace();
    		}
    		long time2 = System.currentTimeMillis();
    		System.out.println("time2-time==list="+(time2-time));
    		return R.ok().put("page",page);
    
    	}
    

      

      

    处理查询参数的类HBaseQuery,因为业务原因,所有的hbase查询有两个必须的条件:开始日期和结束日期,所以我在HBaseQuery中把这两个参数直接封装了。

    @Data
    public class HbaseQuery  {
    	private static final long serialVersionUID = 1L;
    	//当前页码
        private int page;
        //每页条数
        private long limit;
    
        private Map<String, Object> params;
    
        private List<String> pageStartRowKeys;
    
        private Date startDate;
    
        private Date endDate;
    
        private StringBuffer startRowKey = new StringBuffer();
        private StringBuffer stopRowKey = new StringBuffer();
    
        private RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils");
    
        private Scan countScan;
    
        private Scan dataScan= new Scan();
    
        private int cache = 10;
    
        private DateFormat sf =new SimpleDateFormat("yyyy-MM-dd");
    
        private FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        private FilterList countfl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    
        private boolean isOpenCount;
        private String token;
    
        public HbaseQuery(Map<String, Object> params,String token,boolean isOpenCount)throws  Exception{
            this.isOpenCount = isOpenCount;
            this.token = token;
            this.params = params;
            String temp2 = (String)params.get("startDate");
            startDate = sf.parse(temp2);
            String temp = (String)params.get("endDate");
            endDate = sf.parse(temp);
            endDate = DateUtils.addDays(endDate,1);
            this.page = Integer.parseInt(params.get("page").toString());
            this.limit = Integer.parseInt(params.get("limit").toString());
            cache = limit>5000?5000:((int)limit+1);//加1,因为每次都会取limit+1条数据
            params.remove("startDate");
            params.remove("endDate");
            params.remove("page");
            params.remove("limit");
    
            initRowKeyFilter();
        }
    
        public void initRowKeyFilter(){
            Long endLong = Long.MAX_VALUE-startDate.getTime();
            Long startLong = Long.MAX_VALUE-(DateUtils.addDays(endDate,1).getTime()-1);
            stopRowKey.append(endLong);
            startRowKey.append(startLong);
        }
    
        //请注意顺序:如果有RowKey的过滤,先添加buildRowKeyFilter,再加buildRowKeyPage。否则无效
        public HbaseQuery buildRowKeyFilter(String startRowKey,String stopRowKey){
            if (this.startRowKey.equals("")) {
                this.stopRowKey.append(startRowKey);
            } else {
                this.stopRowKey.append("-").append(startRowKey);
            }
            if (this.stopRowKey.equals("")) {
                this.stopRowKey.append(stopRowKey);
            } else {
                this.stopRowKey.append("-").append(stopRowKey);
            }
            return this;
        }
    
    
        //请注意顺序:如果有RowKey的过滤,先添加buildRowKeyFilter,再加buildRowKeyPage。否则无效
        public HbaseQuery buildRowKeyPage()throws Exception{
    
    
            List<String> pageStartRowKeys = redisUtils.get(token,List.class);
    
    
            //点击上一页或下一页
            Object temp =params.get("pageicon");
            if(temp!=null&&!((String)temp).equals("")&&!((String)temp).equals("records")){
                //且redis中的startRowKeys不为空
                String pageicon =  (String)temp;
                if(pageStartRowKeys!=null){
                    String startRowKey = pageStartRowKeys.get(this.page-1);
                    if(pageicon.equals("next")&&pageStartRowKeys.size()==this.page){
                        this.startRowKey = new StringBuffer(startRowKey);
                        Filter pageFilter=new PageFilter(cache);
                        fl.addFilter(pageFilter);
                    }else if((pageicon.equals("next")&&pageStartRowKeys.size()>this.page)
                            ||pageicon.equals("prev")){
                        String stopRowKey = pageStartRowKeys.get(this.page);
                        this.startRowKey = new StringBuffer(startRowKey);
                        this.stopRowKey = new StringBuffer(stopRowKey);
                        Filter pageFilter=new PageFilter(this.getLimit());
                        fl.addFilter(pageFilter);
                    }
    
                }else{
                    throw new Exception("点的是分页,但是redis中没有数据,这程序肯定有问题");
                }
            }else{//点击的非分页按钮,则删除redis中分页信息
                redisUtils.delete(token);
                redisUtils.delete(RedisKeys.getPageCountKey(token));
                Filter pageFilter=new PageFilter(this.getLimit()+1);
                fl.addFilter(pageFilter);
            }
            dataScan.setCaching(cache);
            return this;
        }
    
        public HbaseQuery addFilter(Filter filter){
            fl.addFilter(filter);
            countfl.addFilter(filter);
            return this;
        }
    
        public HbaseQuery finish(){
            String count = redisUtils.get(RedisKeys.getPageCountKey(token));
            if(isOpenCount&&(count==null||count.equals(""))){
                countScan= new Scan();
                countScan.setMaxVersions();
                countScan.setCaching(5);
                countScan.setStartRow((startRowKey.toString()).getBytes());
                countScan.setStopRow((stopRowKey.toString()).getBytes());
                countScan.setFilter(countfl);
            }
            dataScan.setMaxVersions();
            dataScan.setStartRow((startRowKey.toString()).getBytes());
            dataScan.setStopRow((stopRowKey.toString()).getBytes());
            dataScan.setFilter(fl);
    
            return this;
        }
    
    }
    

      

      

    查询hbase的方法。注意:在使用hbase的协处理器前,请先确保表开通了此功能。

      hbase表开通协处理功能方法(shell命令):

      (1)disable指定表。hbase> disable 'mytable'
      (2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
      (3)重启指定表 hbase> enable 'mytable'

    public HBasePage query(HbaseQuery query) {
            long time = System.currentTimeMillis();
            Map<String,String> communtiyKeysMap = new HashMap<>();
            HBasePage page = new HBasePage(query.getLimit(),query.getPage());
            final String tableName = this.getTableName();
            if(query.getCountScan()!=null){
                AggregationClient ac = new AggregationClient(hbaseTemplate.getConfiguration());
                try{
                    long count = ac.rowCount(TableName.valueOf(tableName),new LongColumnInterpreter(),query.getCountScan());
                    page.setTotalCount(count);
                }catch (Throwable  e){
                    e.printStackTrace();
                }
            }
            long time2 = System.currentTimeMillis();
            List rows = hbaseTemplate.find(tableName, query.getDataScan(), new RowMapper<Object>() {
    
                @Override
                public Object mapRow(Result result, int i) throws Exception {
                    Class clazz = ReflectMap.get(tableName);//这里做了表名和实体Bean的映射。
                    if(i==0){
                        communtiyKeysMap.put("curPageStart", new String(result.getRow()));
                    }
                    if(i==query.getLimit()){
                        communtiyKeysMap.put("nextPageStart", new String(result.getRow()));
                    }
                    HBaseResultBuilder hrb = new HBaseResultBuilder<Object>("sf", result, clazz);
                    return hrb.buildAll().fetch();
                }
            });
            //
            if(rows.size()>0&&page.getPageSize()<rows.size()){
                rows.remove(rows.size()-1);
            }
            page.setList(rows);
            page.setNextPageRow(communtiyKeysMap.get("nextPageStart"));
            page.setCurPageRow(communtiyKeysMap.get("curPageStart"));
            long time3 = System.currentTimeMillis();
            System.out.println("time2-time==getCount="+(time2-time));
            System.out.println("time3-time2==getData="+(time3-time2));
            return page;
        }

    /分页类的代码HbasePage

    public class HBasePage  implements Serializable {
    	private static final long serialVersionUID = 1L;
    	//总记录数
    	protected long totalCount;
    	//每页记录数
    	protected long pageSize;
    	//总页数
    	protected int totalPage;
    	//当前页数
    	protected int currPage;
    	//列表数据
    	protected List<Object> list;
    
    	private String nextPageRow;//下一页的ROWKEY
    
    	private String curPageRow;//当前页的开始ROWKEY
    
    	/**
    	 * 分页
    	 * @param pageSize    每页记录数
    	 * @param currPage    当前页数
    	 */
    	public HBasePage(long pageSize, int currPage) {
    		this.list = list;
    		this.totalCount = totalCount;
    		this.pageSize = pageSize;
    		this.currPage = currPage;
    	}
    
    
    	public void setTotalCount(long totalCount) {
    		this.totalCount = totalCount;
    		this.totalPage = (int)Math.ceil((double)totalCount/pageSize);
    	}
    
    	public HBasePage buildRedisRowKey(String token){
    		RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils");
    		List<String> pageStartRowKeys = redisUtils.get(token,List.class);
    		List<String> pageRowKeys = redisUtils.get(token,List.class);
    		if(this.getList().size()>0){
    			if(pageRowKeys==null||pageRowKeys.size()<=0){
    				pageRowKeys = new ArrayList<>();
    				pageRowKeys.add(this.getCurPageRow().substring(0,this.getCurPageRow().indexOf("-")+1));
    				pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1));
    				redisUtils.set(token,pageRowKeys);
    			}else{
    				if(pageRowKeys.size()>this.getCurrPage()){
    					//doNothing
    				}else if(pageRowKeys.size()==this.getCurrPage()){
    					pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1));
    					redisUtils.set(token,pageRowKeys);
    				}
    			}
    		}
    		return this;
    	}
    
    }  

    注意:

    1、我的rowKey设置规则是 (Long_Max-new Date().getTime()+"-"+id),所以在看startRowKey和stopRowKey时特别注意。

    如有什么更好的办法或代码缺陷,欢迎留言探讨。

    @ResponseBody
    @RequestMapping("/list")
    @RequiresPermissions("business:stockinbill:list")
    public R list(@RequestParam Map<String, Object> params,HttpServletRequest httpRequest){

    long time = System.currentTimeMillis();
    HBasePage page= null;
    try{
    String token = httpRequest.getHeader("token");

    //如需要额外的过滤器,请自己定义并使用.buildFilter(filter)方法添加到query
    //请注意顺序:如果有RowKey的过滤,先添加buildRowKeyFilter,再加buildRowKeyPage。否则无效
    HbaseQuery query = new HbaseQuery(params,token,true);

    Object cnOrderCodeObject = params.get("cnOrderCode");
    if(cnOrderCodeObject!=null&&!((String)cnOrderCodeObject).equals("")){
    String cnOrderCode = (String)cnOrderCodeObject;
    query.buildRowKeyFilter(cnOrderCode,cnOrderCode);
    }
    query.buildRowKeyPage().finish();
    page = stockInBillService.query(query).buildRedisRowKey(token);

    }catch (Exception e){
    e.printStackTrace();
    }
    long time2 = System.currentTimeMillis();
    System.out.println("time2-time==list="+(time2-time));
    return R.ok().put("page",page);

    }
  • 相关阅读:
    HTML5 Application Cache
    一个页面多个bootstrip轮播以及一个页面多个swiper轮播 冲突问题
    jquery中attr和prop的区别
    eval函数的工作原理
    JSON.parse 函数
    JS知识体系
    闭包
    io输入输出与反射机制2
    IO输入输出与反射机制1
    项目-超市会员管理系统
  • 原文地址:https://www.cnblogs.com/mowei/p/7776832.html
Copyright © 2011-2022 走看看