zoukankan      html  css  js  c++  java
  • Hbase客户端示例

    public interface HBaseOperations {
    
        <T> T get(String tableName, String rowKey, String familyName,
                  ResultMapper<T, Result> mapper);
    
        <T> T get(String tableName, String rowKey, String familyName, String qualifier,
                  ResultMapper<T, Result> mapper);
    
        <T> T get(String tableName, List<String> rowKeys, String familyName,
                  ResultMapper<T, Result[]> mapper);
    
        <T> T get(String tableName, List<String> rowKeys, String familyName, String qualifier,
                  ResultMapper<T, Result[]> mapper);
    
        void put(String tableName, String rowKey, String familyName, String qualifier, byte[] value);
    
        void put(String tableName, List<Put> putList);
    
    }

    一 初始化

    public class HBaseTemplate implements HBaseOperations {
    
        private Connection connection;
    
        @PostConstruct
        private void connect() {
            log.info("start connecting to HBase");
            try {
                Configuration configuration = HBaseConfiguration.create();
                
                    configuration.set("hbase.zookeeper.quorum", HBaseConfig.getHBaseZkHost());
                    configuration.set("hbase.zookeeper.property.clientPort", HBaseConfig.getHBaseZkPort());
                    configuration.set("zookeeper.znode.parent", HBaseConfig.getHBaseZkZnodeParent());// /ymm_hbase
    // HBase操作失败重试次数 
    configuration.setInt("hbase.client.retries.number", 2);
    // HBase重试间隔ms
    configuration.setInt("hbase.client.pause", 50);

    // HBase rpc请求超时时间ms configuration.setInt("hbase.rpc.timeout", 1000);
    configuration.setInt(
    "hbase.client.operation.timeout", 5000);
    connection
    = ConnectionFactory.createConnection(configuration);
    }
    catch (Exception e) {
    log.error(
    "failed to connect HBase", e); } }

    二 get部分

    @Override
        public <T> T get(String tableName, String rowKey, String familyName,
                         ResultMapper<T, Result> mapper) {
            return get(tableName, rowKey, familyName, null, mapper);
        }
    
        @Override
        public <T> T get(String tableName, String rowKey, String familyName, String qualifier,
                  ResultMapper<T, Result> mapper) {
            return execute(tableName, (table -> {
                Get get = buildGet(rowKey, familyName, qualifier);
                Result result = table.get(get);
                return mapper.mapResult(result);
            }));
        }
    
        @Override
        public <T> T get(String tableName, List<String> rowKeys, String familyName,
                  ResultMapper<T, Result[]> mapper) {
            return get(tableName, rowKeys, familyName, null, mapper);
        }
    
        @Override
        public <T> T get(String tableName, List<String> rowKeys, String familyName, String qualifier,
                  ResultMapper<T, Result[]> mapper) {
            return execute(tableName, (table -> {
                List<Get> gets = new ArrayList<>(rowKeys.size());
                rowKeys.forEach(rowKey -> {
                    Get get = buildGet(rowKey, familyName, qualifier);
                    gets.add(get);
                });
                Result[] results = table.get(gets);
                return mapper.mapResult(results);
            }));
        }

    execute

    private <T> T execute(String tableName, TableCallback<T> action) {
            if(action == null || StringUtils.isBlank(tableName)) {
                throw new HBaseException("execute param error");
            }
            Table table = getTable(tableName);
            try {
                T result = action.doInTable(table);
                return result;
            } catch (Throwable t) {
                throw new HBaseException("invoke HBase failed", t);
            } finally {
                releaseTable(table);
            }
        }
    public static void releaseTable(Table table) {
            try {
                table.close();
            } catch (IOException e) {
                throw new HBaseException("release table failed", e);
            }
        }
    private Table getTable(String tableName) {
            Table table;
            try {
                table = connection.getTable(TableName.valueOf(tableName));
            } catch (IOException e) {
                throw new HBaseException("get table failed", e);
            }
            return table;
        }

    我们这里拿一个实际的例子 看看 ResultMapper的实现类

     private List<WaybillLogEntity> mapResult(Result result) {
            List<WaybillLogEntity> waybillLogEntityList = Lists.newArrayList();
            if(result == null ||result.isEmpty()) {
                return waybillLogEntityList;
            }
            Cell[] cells = result.rawCells();
            Arrays.stream(cells).forEach(cell -> {
                WaybillLogEntity waybillLogEntity = convert(cell);
                if(waybillLogEntity != null) {
                    waybillLogEntityList.add(waybillLogEntity);
                }
            });
            return waybillLogEntityList;
        }
    private WaybillLogEntity convert(Cell cell) {
            if(cell == null) {
                return null;
            }
            String value = new String(CellUtil.cloneValue(cell));
            return JSON.parseObject(value, WaybillLogEntity.class);
        }

    看得出来 公司都是把对象转成json传

    三 put部分

      

    @Override
        public void put(String tableName, String rowKey, String familyName, String qualifier, byte[] value) {
            if(StringUtils.isBlank(tableName)
                    || StringUtils.isBlank(rowKey)
                    || StringUtils.isBlank(familyName)
                    || StringUtils.isBlank(qualifier)
                    || value == null) {
                log.info("put hbase param error");
                return;
            }
            execute(tableName, (table -> {
                Put put = new Put(Bytes.toBytes(rowKey))
                                .addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), value);
                table.put(put);
                return null;
            }));
        }
    
        @Override
        public void put(String tableName, List<Put> putList) {
            if(StringUtils.isBlank(tableName) || CollectionUtils.isEmpty(putList)) {
                log.info("put list hbase param error");
                return;
            }
            execute(tableName, (table -> {
                table.put(putList);
                return null;
            }));
        }
    hBaseTemplate.put(buildTableName(),
                        buildRowKey(waybillLogEntity.getWaybillId()),
                        buildFamilyName(),
                        buildQualifier(waybillLogEntity),
                        JSONObject.toJSONBytes(waybillLogEntity));
    private String buildTableName() {
            return HBASE_NAMESPACE + ":" + HBaseConfig.getHBaseTableTpWaybillLog();
        }
    HBASE_NAMESPACE String HBASE_NAMESPACE = "default";
    private String buildRowKey(long waybillId) {
            long prefix = waybillId % 64;
            return prefix + "|" + waybillId;
        }
    private static final String DEFAULT_FAMILY_NAME = "data";
    private String buildQualifier(WaybillLogEntity waybillLogEntity) {
            if(waybillLogEntity == null) {
                return null;
            }
            return String.valueOf(waybillLogEntity.getId());
        }
  • 相关阅读:
    C#后台正则表达式
    Layer 弹出层抖动问题
    JS中子页面父页面方法 变量相互调用
    layer最大话.最小化.还原回调方法
    trove远程连接mongodb
    tar.gz tar.bz2的解压命令
    IO测试工具之fio详解
    HTTP请求方法
    jmeter --使用put方法上传文件
    DHCP的原理和实现过程
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14838170.html
Copyright © 2011-2022 走看看