  • Reading HBase snapshot data with TableSnapshotInputFormat

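    The goal is to read the data stored in an HBase snapshot, identified by its snapshot name (in the code below, the newest snapshot of a given table). I searched through a lot of material online and found very little that gives a clear approach. After working it out myself I finally got it running, so I am posting the code here in the hope that it helps others: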

    // Imports assume HBase 1.x (HBaseProtos, ProtobufUtil and util.Base64 as in the original).
    // Pipeline, ReaderParam, Record, DiException, CommonErrorCode and the members limiter,
    // recordReader, start, stop(), logger, result2Record() and M_BYTE_SIZE come from the author's
    // own data-integration framework and are not shown in the original post.
    import com.google.common.util.concurrent.RateLimiter;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
    import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    
    public void read(org.apache.hadoop.conf.Configuration hadoopConf, Pipeline pipeline, ReaderParam readerParam, int batchSize) {
        // Throttle throughput: one permit per byte, i.e. fetchSize MB per second
        limiter = RateLimiter.create(readerParam.getFetchSize() * M_BYTE_SIZE);
    
        // Counts the rows read
        AtomicInteger totalCount = new AtomicInteger();
    
        JobConf conf = new JobConf(hadoopConf);
        String tableName = readerParam.getFileName();   // e.g. "namespace:table"
        logger.info(String.format("Start reading HBase snapshot of table [%s].", tableName));
    
        try {
            List<Record> recordList = new ArrayList<>(batchSize);
    
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);      // Should be disabled for offline (batch) scans
            // Serialize the Scan into the job configuration; TableSnapshotInputFormat reads it from there
            conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));
    
            InputFormat<ImmutableBytesWritable, Result> in = new TableSnapshotInputFormat();
            // "ns:table" -> "ns_table"; unlike split(":")[1], this also works for default-namespace tables
            String namespaceTable = tableName.replace(':', '_');
    
            // List the completed snapshots under <hbase.rootdir>/.hbase-snapshot
            Path rootPath = new Path(conf.get("hbase.rootdir"));
            FileSystem fs = rootPath.getFileSystem(conf);
            Path snapshotDir = SnapshotDescriptionUtils.getSnapshotRootDir(rootPath);
            FileStatus[] listStatus = fs.listStatus(snapshotDir);
    
            // Keep the snapshots taken of this table; skip hidden entries such as .tmp
            List<String> snapshotList = new ArrayList<>();
            Arrays.stream(listStatus).filter(x -> !x.getPath().getName().startsWith(".")).forEach(x -> {
                Path snapshotPath = SnapshotDescriptionUtils.getCompletedSnapshotDir(x.getPath().getName(), rootPath);
                try {
                    HBaseProtos.SnapshotDescription s = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotPath);
                    logger.debug("tableName:" + s.getTable() + "\t snapshot:" + s.getName());
                    if (s.getTable().equalsIgnoreCase(tableName)) {
                        snapshotList.add(s.getName());
                    }
                } catch (CorruptedSnapshotException e) {
                    logger.warn("Skipping unreadable snapshot " + snapshotPath, e);
                }
            });
            if (snapshotList.isEmpty()) {
                String message = String.format("Error reading HBase snapshot info: no snapshot found for table [%s]. Please contact the system administrator.", tableName);
                logger.error(message);
                throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message);
            }
            // Take the lexicographically greatest name; this is the newest snapshot only if
            // snapshot names embed a sortable timestamp
            String snapshotName = snapshotList.stream().sorted(Comparator.reverseOrder()).findFirst().get();
            logger.info(String.format("Reading snapshot [%s].", snapshotName));
    
            // Restore the snapshot layout into a scratch directory ("di" is the author's service user);
            // the directory must be writable by the current user and must not lie under hbase.rootdir
            String restoreTmp = String.format("%s/user/%s/restoretmp/%s", conf.get("fs.defaultFS"), "di", namespaceTable);
            Path restorePath = new Path(restoreTmp);
            TableSnapshotInputFormatImpl.setInput(conf, snapshotName, restorePath);
    
            List<String> columns = Arrays.asList(readerParam.getReadColumns().split(","));
    
            // Splits are built from the regions in the snapshot, not from files
            InputSplit[] splits = in.getSplits(conf, 1);
            for (InputSplit split : splits) {
                recordReader = in.getRecordReader(split, conf, Reporter.NULL);
                ImmutableBytesWritable key = recordReader.createKey();
                Result value = recordReader.createValue();
                try {
                    while (start && recordReader.next(key, value)) {
                        Record record = result2Record(value, columns);
                        limiter.acquire(record.getMemorySize());   // block until the byte budget allows this record
                        recordList.add(record);
                        totalCount.incrementAndGet();
                        // Flushing recordList to the pipeline (presumably every batchSize records)
                        // is not shown in the original post
                    }
                } finally {
                    recordReader.close();
                }
            }
            logger.info(String.format("Finished reading snapshot [%s]: %d rows.", snapshotName, totalCount.get()));
    
        } catch (Exception e) {
            String message = String.format("Error reading HBase snapshot data for table [%s]. Please contact the system administrator.", tableName);
            logger.error(message);
            throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message, e);
        } finally {
            stop();
        }
    }
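    The two pieces of pure logic in the method above, choosing which snapshot to read and deriving the restore directory, can be pulled out and checked without a cluster. The sketch below is plain JDK Java. `SnapshotInfo`, `SnapshotSelector`, `pickByName`, `pickByCreationTime` and `restoreDir` are hypothetical names; `SnapshotInfo` stands in for `HBaseProtos.SnapshotDescription` and models only its name, table and creation time. `pickByCreationTime` is an alternative to the method's name ordering, not what the original does, and `hdfs://nameservice1` is a made-up example value.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for HBaseProtos.SnapshotDescription: only the fields used here. */
final class SnapshotInfo {
    final String name;
    final String table;        // "namespace:table", or just "table" in the default namespace
    final long creationTime;   // epoch millis

    SnapshotInfo(String name, String table, long creationTime) {
        this.name = name;
        this.table = table;
        this.creationTime = creationTime;
    }
}

final class SnapshotSelector {
    private SnapshotSelector() {}

    /** Mirrors the method above: among the table's snapshots, take the greatest name. */
    static Optional<String> pickByName(List<SnapshotInfo> all, String table) {
        return all.stream()
                .filter(s -> s.table.equalsIgnoreCase(table))
                .map(s -> s.name)
                .max(Comparator.naturalOrder());
    }

    /** Alternative, not in the original: take the most recently created snapshot. */
    static Optional<String> pickByCreationTime(List<SnapshotInfo> all, String table) {
        return all.stream()
                .filter(s -> s.table.equalsIgnoreCase(table))
                .max(Comparator.comparingLong(s -> s.creationTime))
                .map(s -> s.name);
    }

    /** Builds "<defaultFs>/user/<user>/restoretmp/<ns_table>" the same way the method does. */
    static String restoreDir(String defaultFs, String user, String table) {
        return String.format("%s/user/%s/restoretmp/%s", defaultFs, user, table.replace(':', '_'));
    }

    public static void main(String[] args) {
        List<SnapshotInfo> all = Arrays.asList(
                new SnapshotInfo("ns_t_20190901", "ns:t", 300L),
                new SnapshotInfo("ns_t_20190905", "ns:t", 100L),
                new SnapshotInfo("other_20191231", "ns:other", 200L));
        System.out.println(pickByName(all, "ns:t").orElse("none"));          // ns_t_20190905
        System.out.println(pickByCreationTime(all, "ns:t").orElse("none"));  // ns_t_20190901
        System.out.println(restoreDir("hdfs://nameservice1", "di", "ns:t")); // hdfs://nameservice1/user/di/restoretmp/ns_t
    }
}
```

    The example data is deliberately chosen so the two strategies disagree: name ordering only finds the newest snapshot when names embed a sortable timestamp, while creation time does not depend on any naming convention.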
    

      

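    If the column families in the snapshot use LZO compression, you may run into LZO decompression problems when reading the snapshot data; see the post "hbase读取快照数据-lzo压缩遇到的问题" (Problems with LZO compression when reading HBase snapshot data).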
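    One way to fail fast on that kind of problem is to check, before reading, whether the LZO codec class and its native library are visible to the JVM. This is a hedged, JDK-only sketch with hypothetical names (`LzoPreflight`, `isClassPresent`, `isNativeLibraryLoadable`). It assumes the codec is hadoop-lzo's `com.hadoop.compression.lzo.LzoCodec` and that its native library is named `gplcompression`; it only diagnoses the setup and does not fix it.

```java
/** Hypothetical preflight check for LZO support (JDK only): diagnoses, does not fix. */
final class LzoPreflight {
    private LzoPreflight() {}

    /** True if the class can be found on the current classpath (it is not initialized). */
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, LzoPreflight.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** True if System.loadLibrary can load the named native library from java.library.path. */
    static boolean isNativeLibraryLoadable(String libName) {
        try {
            System.loadLibrary(libName);
            return true;
        } catch (UnsatisfiedLinkError | SecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed names: hadoop-lzo's codec class and its native library (libgplcompression)
        boolean codec = isClassPresent("com.hadoop.compression.lzo.LzoCodec");
        boolean nativeLib = isNativeLibraryLoadable("gplcompression");
        System.out.println("LzoCodec on classpath: " + codec);
        System.out.println("gplcompression native library loadable: " + nativeLib);
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
    }
}
```

    Running this in the same JVM setup as the reader (same classpath and `java.library.path`) shows which half is missing: the jar, the native library, or both.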

  • Original article: https://www.cnblogs.com/qixing/p/11461448.html