zoukankan      html  css  js  c++  java
  • 使用parquet-hadoop.jar包解析hive parquet文件时,遇到FIXED_LEN_BYTE_ARRAY转换为Decimal 以及 INT96转换为timestamp问题

    在使用parquet-hadoop.jar包解析parquet文件时,遇到decimal类型的数据为乱码,具体解决方法如下:

    使用parquet-hadoop.jar解析httpfs服务提供的parquet文件,代码如下:

    /**
     * Reads a Hive-written Parquet file through an HttpFS (webhdfs) endpoint and
     * prints it to stdout as a tab-separated table: one header row of
     * {@code name(PRIMITIVE_TYPE)} entries, a separator line, then one row per record.
     *
     * <p>Every top-level field is assumed to be a primitive type, because the header
     * loop calls {@code asPrimitiveType()} on each field.
     *
     * @throws Exception if the file system or the Parquet reader fails
     */
    @Test
        public void httpfsReadHiveParquetFile() throws Exception {
            Path path = new Path("webhdfs://s128:14000/wbd_test/parq1.0.parq");
            Configuration conf = new Configuration();
            conf.set("fs.webhdfs.impl", WebHdfsFileSystem.class.getName());
            // NOTE(review): sample credential is hard-coded; load it from configuration in real code.
            Map<String, String> urlParams = new HashMap<>();
            urlParams.put("user.token", "7hmsNJIget0eGO5maKQ=sfds");
            conf.set(WebHdfsFileSystem.HTTPFS_URL_PARAM, JSON.toJSONString(urlParams));
            FileSystem fs = path.getFileSystem(conf);

            FileStatus fileStatus = fs.getFileStatus(path);

            InputFile inputFile = HadoopInputFile.fromStatus(fileStatus, conf);
            ParquetReader.Builder<Group> reader = ParquetReader.read(inputFile);
            reader.withConf(conf);

            // The reader holds an open stream on the remote file, so make sure it is
            // closed even when decoding a record throws.
            try (ParquetReader<Group> build = reader.build()) {
                Group line = build.read();

                if (line != null) {
                    // The schema is taken from the first record and reused for every row.
                    List<Type> typeList = line.getType().getFields();
                    for (Type type : typeList) {
                        System.out.print(type.getName()+"("+type.asPrimitiveType().getPrimitiveTypeName().name()+")		");
                    }
                    System.out.println();
                    System.out.println("-----------------------------------------------------------------------------------------------------------");
                    do {
                        for (Type type : typeList) {
                            System.out.print(converterType2Java(line, type)+"		");
                        }
                        System.out.println();
                    } while ((line = build.read()) != null);
                }
            }

            System.out.println("It is over !");

        }
    
    /**
     * Renders the first value of one primitive field of a Parquet record as a string.
     *
     * <p>INT96 values are converted with {@code getTimestampMillis}; DECIMAL-annotated
     * FIXED_LEN_BYTE_ARRAY values are decoded with {@code binaryToDecimal} and printed
     * in plain notation with exactly {@code scale} fraction digits.
     *
     * @param line the record to read from
     * @param type the schema entry of the field; must be a primitive type
     * @return the value as text, or {@code null} when the field has no value in this
     *         record or is a FIXED_LEN_BYTE_ARRAY that is not annotated as DECIMAL
     */
    public static String converterType2Java(Group line, Type type) {
            String value = null;
            String fieldType = type.asPrimitiveType().getPrimitiveTypeName().name();
            String fieldName = type.getName();
            int repetition = line.getFieldRepetitionCount(fieldName);
            // A repetition count of zero means the field is absent (null) in this record.
            if (repetition == 0){
                return value;
            }

            switch (fieldType){
                case "BOOLEAN":
                    value = String.valueOf(line.getBoolean(fieldName, 0));
                    break;
                case "INT32":
                    value = String.valueOf(line.getInteger(fieldName, 0));
                    break;
                case "INT64":
                    value = String.valueOf(line.getLong(fieldName, 0));
                    break;
                case "INT96":
                    value = String.valueOf(getTimestampMillis(line.getInt96(fieldName, 0)));
                    break;
                case "FLOAT":
                    value = String.valueOf(line.getFloat(fieldName, 0));
                    break;
                case "DOUBLE":
                    value = String.valueOf(line.getDouble(fieldName, 0));
                    break;
                case "FIXED_LEN_BYTE_ARRAY":
                    // Only DECIMAL-annotated fixed-length arrays are decoded; any other
                    // fixed-length array keeps the null default.
                    if (type.getOriginalType() != null && "DECIMAL".equals(type.getOriginalType().name())){
                        int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
                        int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
                        BigDecimal decimalValue = binaryToDecimal(precision, scale, line.getBinary(fieldName, 0).getBytes());
                        // Plain notation is locale-independent and, unlike a DecimalFormat
                        // pattern ending in ".", leaves no dangling separator when scale is 0.
                        value = decimalValue.setScale(scale, java.math.RoundingMode.HALF_EVEN).toPlainString();
                    }
                    break;
                case "BINARY":
                    value = line.getString(fieldName, 0);
                    break;
                default:
                    value = line.getString(fieldName, 0);
            }
            return value;
        }
    
    /**
     * Converts a 12-byte INT96 timestamp into a millisecond value.
     *
     * <p>The first 8 bytes are read as the nanoseconds within the day and the last
     * 4 bytes as the Julian day number, both with their byte order reversed.
     *
     * @param timestampBinary the raw INT96 value
     * @return the Julian day converted to milliseconds plus the time of day in
     *         milliseconds, or {@code 0} when the value is not exactly 12 bytes long
     */
    public static long getTimestampMillis(Binary timestampBinary)
        {
            final boolean hasInt96Length = timestampBinary.length() == 12;
            if (!hasInt96Length) {
                return 0;
            }

            final byte[] raw = timestampBinary.getBytes();

            // The value is stored little endian, so bytes are handed over in reverse order.
            final int dayNumber = Ints.fromBytes(raw[11], raw[10], raw[9], raw[8]);
            final long nanosWithinDay = Longs.fromBytes(
                    raw[7], raw[6], raw[5], raw[4],
                    raw[3], raw[2], raw[1], raw[0]);

            final long dayPartMillis = julianDayToMillis(dayNumber);
            final long timePartMillis = nanosWithinDay / NANOS_PER_MILLISECOND;
            return dayPartMillis + timePartMillis;
        }
    
        /**
         * Converts a Julian day number into milliseconds by subtracting
         * {@code JULIAN_EPOCH_OFFSET_DAYS} and multiplying by {@code MILLIS_IN_DAY}.
         *
         * @param julianDay the Julian day number
         * @return the offset-adjusted day count expressed in milliseconds
         */
        private static long julianDayToMillis(int julianDay)
        {
            // Widen before multiplying: the constants are declared out of view, and if
            // they are ints the product would otherwise overflow int range.
            return ((long) julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
        }
    
    
    /**
     * Decodes a Parquet DECIMAL stored as a big-endian two's-complement byte array.
     *
     * <p>The conversion is exact for every precision. Going through {@code double}
     * would drop digits once the unscaled value exceeds 2^53, so the bytes are
     * interpreted with {@link BigInteger} and the scale is attached afterwards.
     *
     * @param precision the declared decimal precision; kept for interface
     *                  compatibility, the byte array alone determines the value
     * @param scale     the number of digits to the right of the decimal point
     * @param bytes     the big-endian two's-complement unscaled value
     * @return the decoded value with exactly {@code scale} fraction digits; zero
     *         when {@code bytes} is empty
     */
    static BigDecimal binaryToDecimal(int precision, int scale, byte[] bytes) {
            // BigInteger rejects a zero-length array; an empty payload is treated as zero.
            if (bytes.length == 0) {
                return BigDecimal.valueOf(0L, scale);
            }
            return new BigDecimal(new BigInteger(bytes), scale);
        }

    parquet文件中的timestamp类型实际存储为INT96类型,decimal类型实际存储为FIXED_LEN_BYTE_ARRAY二进制类型,要想得到原来的数据,都需要进行转换。网上很少能找到相关资料,希望本文对其他人有所帮助。

  • 相关阅读:
    c++中的 三/五原则
    3. 无重复字符的最长子串
    c++中的单例模式
    bfs 以及 dfs 常用解题思路
    经济学的三个问题
    gtihub 上一些值得学习的项目
    994. 腐烂的橘子
    96. 不同的二叉搜索树
    some idea
    Libco协程库
  • 原文地址:https://www.cnblogs.com/qixing/p/11880262.html
Copyright © 2011-2022 走看看