zoukankan      html  css  js  c++  java
  • 通过连接池,从hbase中拿数据

    1.连接池类HbasePool

    package com.example.demospringboothbase.common;
    
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * Singleton holder for a pool of HBase connections, built on Apache Commons Pool 2.
     *
     * <p>Usage: obtain the singleton with {@link #getPool()}, obtain the underlying
     * {@link GenericObjectPool} with {@link #getGop(String, String)}, borrow a
     * {@link HbaseProxy} from it, and return the proxy to the pool when done.
     *
     * <p>NOTE(review): the class is annotated with {@code @Component} and also exposes a static
     * singleton; a Spring-managed instance, if one is created, would be a different object from
     * the one returned by {@link #getPool()} — confirm which one callers are meant to use.
     */
    @Component
    public class HbasePool {
        private static final Logger log = Logger.getLogger(HbasePool.class);

        // Eagerly created so that getPool() is safe to call from several threads.
        private static final HbasePool pool = new HbasePool();

        // Lazily created by getGop(); guarded by the monitor of this HbasePool instance.
        private GenericObjectPool<HbaseProxy> gop;

        // Singleton: instances are only handed out through getPool().
        private HbasePool(){}

        /**
         * Returns the shared {@code HbasePool} instance.
         *
         * @return the singleton pool holder, never {@code null}
         */
        public static HbasePool getPool(){
            return pool;
        }

        /**
         * Returns the shared object pool, creating it on the first call with a maximum of
         * 10 pooled objects.
         *
         * <p>The pool is created only once: {@code zk} and {@code zknode} are used by the first
         * call and ignored by every later call.
         *
         * @param zk     value for {@code hbase.zookeeper.quorum}
         * @param zknode value for {@code zookeeper.znode.parent}
         * @return the shared pool of {@link HbaseProxy} objects
         */
        public synchronized GenericObjectPool<HbaseProxy> getGop(String zk,String zknode){
            if(gop ==null){
                HbasePoolFactary hbasePoolFactary = new HbasePoolFactary(zk, zknode);
                GenericObjectPool<HbaseProxy> created = new GenericObjectPool<HbaseProxy>(hbasePoolFactary);
                created.setMaxTotal(10);
                gop = created;
            }
            return gop;
        }

        /**
         * Proxy that owns one HBase {@link Connection}; this is the object type kept in the pool.
         */
        public class HbaseProxy {

            private String zk;
            private String zknode;
            private Connection connection;

            /**
             * Stores the ZooKeeper settings and immediately tries to open the connection.
             *
             * @param zk     value for {@code hbase.zookeeper.quorum}
             * @param zknode value for {@code zookeeper.znode.parent}
             */
            public HbaseProxy(String zk, String zknode) {
                this.zk = zk;
                this.zknode = zknode;
                init();
            }

            /**
             * Builds an HBase configuration from the stored settings and opens the connection.
             * On failure the error is logged and the connection stays {@code null}.
             */
            public void init() {
                Configuration entries = HBaseConfiguration.create();
                entries.set("hbase.zookeeper.quorum",zk);
                entries.set("zookeeper.znode.parent",zknode);
                try {
                    this.connection = ConnectionFactory.createConnection(entries);
                } catch (IOException e) {
                    // Pass the exception to the logger so the stack trace ends up in the log.
                    log.error("获取连接失败!", e);
                }
            }

            /**
             * @return the connection opened by {@link #init()}, or {@code null} if opening failed
             */
            public Connection getConnection(){
                return this.connection;
            }

            /**
             * Closes the connection if one was opened; a failure to close is logged, not thrown.
             */
            public void close(){
                if(this.connection !=null){
                    try {
                        this.connection.close();
                    } catch (IOException e) {
                        log.error("链接关闭失败~", e);
                    }
                }
            }

        }

        /**
         * Commons Pool factory that creates, wraps and destroys {@link HbaseProxy} objects.
         */
        public class HbasePoolFactary extends BasePooledObjectFactory<HbaseProxy>{
            private String zk;
            private String zknode;

            /**
             * @param zk     value for {@code hbase.zookeeper.quorum}
             * @param zknode value for {@code zookeeper.znode.parent}
             */
            public HbasePoolFactary(String zk, String zknode) {
                this.zk = zk;
                this.zknode = zknode;
            }

            /**
             * Creates a new proxy for the pool.
             *
             * @throws IOException if the proxy could not open its connection, so that the pool
             *                     never hands out a proxy whose connection is {@code null}
             */
            @Override
            public HbaseProxy create() throws Exception {
                HbaseProxy proxy = new HbaseProxy(this.zk,this.zknode);
                if (proxy.getConnection() == null) {
                    throw new IOException("Failed to create HBase connection (zk=" + this.zk
                            + ", znode=" + this.zknode + ")");
                }
                return proxy;
            }

            @Override
            public PooledObject<HbaseProxy> wrap(HbaseProxy hbaseProxy) {
                return new DefaultPooledObject<HbaseProxy>(hbaseProxy);
            }

            /**
             * Closes the proxy's connection before the pool discards the object.
             */
            @Override
            public void destroyObject(PooledObject<HbaseProxy> p) throws Exception {
                HbaseProxy object = p.getObject();
                object.close();
                super.destroyObject(p);
            }
        }
    }

    2.通过get来拿自己hbase中的数据

    这里将逻辑类和测试类写一块了。

    
    
    package com.example.demospringboothbase.serverce;

    import com.alibaba.fastjson.JSON;
    import com.example.demospringboothbase.common.HbasePool;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Demo class that fetches rows from HBase by row key through {@link HbasePool};
     * the lookup logic and a manual test ({@link #main}) live in the same class.
     */
    public class test {
        // Shared pool holder used to borrow HBase connections.
        private HbasePool hbasePool = HbasePool.getPool();

        /**
         * Fetches the given rows with a batch of HBase {@code Get}s and returns one map per result.
         *
         * <p>Each map holds qualifier -> value (both decoded as strings) for every cell in the
         * result, plus the entry {@code "rowkey"} with the row key. The overall shape is
         * {@code [{...},{...},...]}.
         *
         * @param tableName    name of the HBase table to read
         * @param rowkey       row keys to fetch
         * @param rowkeyAttr   must be {@code "rowkey"}; any other value is rejected
         * @param column       qualifiers to fetch; only used when {@code columnAttr} is {@code "column"}
         * @param columnAttr   when equal to {@code "column"}, each Get is restricted to {@code column}
         *                     in {@code columnFamily}; otherwise all columns are returned
         * @param columnFamily column family that the qualifiers in {@code column} belong to
         * @return one map per result returned by HBase, possibly empty
         * @throws IllegalArgumentException if {@code rowkeyAttr} is not {@code "rowkey"} and at
         *                                  least one row key was supplied
         * @throws Exception if borrowing from the pool or the HBase call fails
         */
        public List<Map> resultByRowkey(String tableName, List<String> rowkey, String rowkeyAttr,
                List<String> column, String columnAttr, String columnFamily) throws Exception {
            List<Map> list = new ArrayList<>();

            // Constant-first equals: a null attribute is treated as "no match" instead of an NPE.
            boolean byRowkey = "rowkey".equals(rowkeyAttr);
            boolean restrictColumns = "column".equals(columnAttr);

            // A Get is addressed by row key; build one per requested row.
            List<Get> gets = new ArrayList<>();
            for (String rk : rowkey) {
                if (!byRowkey) {
                    // Previously this path left the Get null and failed later with an NPE.
                    throw new IllegalArgumentException("Unsupported rowkeyAttr: " + rowkeyAttr);
                }
                // Bytes.toBytes encodes as UTF-8, independent of the platform default charset.
                Get get = new Get(Bytes.toBytes(rk));
                // Without explicit columns a Get returns every column of the row.
                if (restrictColumns) {
                    byte[] family = Bytes.toBytes(columnFamily);
                    for (String cl : column) {
                        get.addColumn(family, Bytes.toBytes(cl));
                    }
                }
                gets.add(get);
            }

            GenericObjectPool<HbasePool.HbaseProxy> gop = hbasePool.getGop("server3:2181", "/hbase-unsecure");
            // Borrow a pooled connection; it must go back in the finally block or the pool runs dry.
            HbasePool.HbaseProxy hbaseProxy = gop.borrowObject();
            try (Table table = hbaseProxy.getConnection().getTable(TableName.valueOf(tableName))) {
                Result[] results = table.get(gets);
                if (results != null) {
                    for (Result r : results) {
                        Map<String, String> map = new HashMap<>();
                        while (r.advance()) {
                            Cell current = r.current();
                            String q = Bytes.toString(CellUtil.cloneQualifier(current));
                            String p = Bytes.toString(CellUtil.cloneValue(current));
                            map.put(q, p);
                        }
                        map.put("rowkey", Bytes.toString(r.getRow()));
                        list.add(map);
                    }
                }
            } finally {
                gop.returnObject(hbaseProxy);
            }
            return list;
        }

        /**
         * Manual smoke test: fetches two rows from table {@code sogo3} and prints them as JSON.
         */
        public static void main(String[] args) throws Exception {
            test ceshi = new test();
            List<String> rowkey = new ArrayList<>();
            List<String> colum = new ArrayList<>();
            rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222603");
            rowkey.add("0001b04bf9473458af40acb4c13f1476_20111230002114");
            colum.add("click");
            colum.add("url");
            colum.add("serch");
            // NOTE(review): "colum" does not equal "column", so no column restriction is applied
            // and every column of the row is returned. Pass "column" to restrict the result to
            // the qualifiers listed above.
            List<Map> maps = ceshi.resultByRowkey("sogo3", rowkey, "rowkey", colum, "colum", "oo");

            System.out.println(JSON.toJSONString(maps));
        }
    }
    输出结果:[{"serch":"福彩3d单选一注法","rank":"10","rowkey":"000080fd3eaf6b381e33868ec6459c49_20111230222603","click":"5","url":"http://www.18888.com/read-htm-tid-6069520.html"},{"serch":"淫淫网","rank":"1","rowkey":"0001b04bf9473458af40acb4c13f1476_20111230002114","click":"1","url":"http://www.244uu.com/"}]
  • 相关阅读:
    大型web系统分布式架构
    与MSN聊天的PowerTalk两个示例
    PowerTalk的四个示例代码
    PowerTalk在十月份左右会有新的版本
    PowerTalk控件 制作 即时通信 聊天室 产品咨询系统 支持与MSN的控件
    PowerTalk有些对不住大家
    自动生成实体sql工具的IDEvs2005工具(源代码+程序)
    C#字符串类快速编译器
    小菜编程成长记系列
    一道简单的编程题,不过您做对了吗?
  • 原文地址:https://www.cnblogs.com/shiji7/p/11927583.html
Copyright © 2011-2022 走看看