[Original] Troubleshooting notes (15): Spark fails to write Parquet data with ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead

Spark 2.1.1

Executing the following SQL in Spark fails:

    insert overwrite table test_parquet_table select * from dummy

The error is:

    org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:333)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:123)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:42)
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:111)
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:124)
    at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:321)
    ... 8 more
    Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
    ... 16 more

Stepping into the relevant code:

    org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter

        private void writeMap(Object value, MapObjectInspector inspector, GroupType type) {
            GroupType repeatedType = type.getType(0).asGroupType();
            this.recordConsumer.startGroup();
            this.recordConsumer.startField(repeatedType.getName(), 0);
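            // Note (added): startField marks the consumer's emptyField flag as true; if the map
            // is an empty collection, the loop below never resets it and the endField call at the
            // bottom of this method throws ParquetEncodingException.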
            Map<?, ?> mapValues = inspector.getMap(value);
            Type keyType = repeatedType.getType(0);
            String keyName = keyType.getName();
            ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
            Type valuetype = repeatedType.getType(1);
            String valueName = valuetype.getName();
            ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
    
            for(Iterator i$ = mapValues.entrySet().iterator(); i$.hasNext(); this.recordConsumer.endGroup()) {
                Entry<?, ?> keyValue = (Entry)i$.next();
                this.recordConsumer.startGroup();
                if (keyValue != null) {
                    Object keyElement = keyValue.getKey();
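                    // Note (added): the key is written unconditionally; if keyElement is null,
                    // writePrimitive below emits nothing, emptyField stays true, and
                    // endField(keyName, 0) throws ParquetEncodingException.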
                    this.recordConsumer.startField(keyName, 0);
                    this.writeValue(keyElement, keyInspector, keyType);
                    this.recordConsumer.endField(keyName, 0);
                    Object valueElement = keyValue.getValue();
                    if (valueElement != null) {
                        this.recordConsumer.startField(valueName, 1);
                        this.writeValue(valueElement, valueInspector, valuetype);
                        this.recordConsumer.endField(valueName, 1);
                    }
                }
            }
    
            this.recordConsumer.endField(repeatedType.getName(), 0);
            this.recordConsumer.endGroup();
        }
    
        private void writeValue(Object value, ObjectInspector inspector, Type type) {
            if (type.isPrimitive()) {
                this.checkInspectorCategory(inspector, Category.PRIMITIVE);
                this.writePrimitive(value, (PrimitiveObjectInspector)inspector);
            } else {
                GroupType groupType = type.asGroupType();
                OriginalType originalType = type.getOriginalType();
                if (originalType != null && originalType.equals(OriginalType.LIST)) {
                    this.checkInspectorCategory(inspector, Category.LIST);
                    this.writeArray(value, (ListObjectInspector)inspector, groupType);
                } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
                    this.checkInspectorCategory(inspector, Category.MAP);
                    this.writeMap(value, (MapObjectInspector)inspector, groupType);
                } else {
                    this.checkInspectorCategory(inspector, Category.STRUCT);
                    this.writeGroup(value, (StructObjectInspector)inspector, groupType);
                }
            }
    
        }
        
        private void writePrimitive(Object value, PrimitiveObjectInspector inspector) {
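            // Note (added): a null value (or the VOID category) falls through without writing
            // anything to recordConsumer, so the emptyField flag set by startField stays true.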
            if (value != null) {
                switch(inspector.getPrimitiveCategory()) {
                case VOID:
                    return;
                case DOUBLE:
                    this.recordConsumer.addDouble(((DoubleObjectInspector)inspector).get(value));
                    break;
                case BOOLEAN:
                    this.recordConsumer.addBoolean(((BooleanObjectInspector)inspector).get(value));
                    break;
                case FLOAT:
                    this.recordConsumer.addFloat(((FloatObjectInspector)inspector).get(value));
                    break;
                case BYTE:
                    this.recordConsumer.addInteger(((ByteObjectInspector)inspector).get(value));
                    break;
                case INT:
                    this.recordConsumer.addInteger(((IntObjectInspector)inspector).get(value));
                    break;
                case LONG:
                    this.recordConsumer.addLong(((LongObjectInspector)inspector).get(value));
                    break;
                case SHORT:
                    this.recordConsumer.addInteger(((ShortObjectInspector)inspector).get(value));
                    break;
                case STRING:
                    String v = ((StringObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(Binary.fromString(v));
                    break;
                case CHAR:
                    String vChar = ((HiveCharObjectInspector)inspector).getPrimitiveJavaObject(value).getStrippedValue();
                    this.recordConsumer.addBinary(Binary.fromString(vChar));
                    break;
                case VARCHAR:
                    String vVarchar = ((HiveVarcharObjectInspector)inspector).getPrimitiveJavaObject(value).getValue();
                    this.recordConsumer.addBinary(Binary.fromString(vVarchar));
                    break;
                case BINARY:
                    byte[] vBinary = ((BinaryObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(Binary.fromByteArray(vBinary));
                    break;
                case TIMESTAMP:
                    Timestamp ts = ((TimestampObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
                    break;
                case DECIMAL:
                    HiveDecimal vDecimal = (HiveDecimal)inspector.getPrimitiveJavaObject(value);
                    DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
                    this.recordConsumer.addBinary(this.decimalToBinary(vDecimal, decTypeInfo));
                    break;
                case DATE:
                    Date vDate = ((DateObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addInteger(DateWritable.dateToDays(vDate));
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
                }
    
            }
        }

    parquet.io.MessageColumnIO.MessageColumnIORecordConsumer

            public void startField(String field, int index) {
                try {
                    if (MessageColumnIO.DEBUG) {
                        this.log("startField(" + field + ", " + index + ")");
                    }
    
                    this.currentColumnIO = ((GroupColumnIO)this.currentColumnIO).getChild(index);
                    this.emptyField = true;
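                    // Note (added): the field is considered empty until a value is actually added
                    // (the add* methods such as addInteger below set emptyField = false).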
                    if (MessageColumnIO.DEBUG) {
                        this.printState();
                    }
    
                } catch (RuntimeException var4) {
                    throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
                }
            }
    
            public void endField(String field, int index) {
                if (MessageColumnIO.DEBUG) {
                    this.log("endField(" + field + ", " + index + ")");
                }
    
                this.currentColumnIO = this.currentColumnIO.getParent();
                if (this.emptyField) {
                    throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
                } else {
                    this.fieldsWritten[this.currentLevel].markWritten(index);
                    this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
                    if (MessageColumnIO.DEBUG) {
                        this.printState();
                    }
    
                }
            }
            
            public void addInteger(int value) {
                if (MessageColumnIO.DEBUG) {
                    this.log("addInt(" + value + ")");
                }
    
                this.emptyField = false;
                this.getColumnWriter().write(value, this.r[this.currentLevel], this.currentColumnIO.getDefinitionLevel());
                this.setRepetitionLevel();
                if (MessageColumnIO.DEBUG) {
                    this.printState();
                }
    
            }

The key lines in DataWritableWriter that trigger the error are:

                    Object keyElement = keyValue.getKey();
                    this.recordConsumer.startField(keyName, 0);
                    this.writeValue(keyElement, keyInspector, keyType);
                    this.recordConsumer.endField(keyName, 0);

The code flow is as follows:

DataWritableWriter.writeMap
    MessageColumnIORecordConsumer.startField
        note: this.emptyField = true;
    iterate over the map entries
        handle the key:
            Object keyElement = keyValue.getKey();
            MessageColumnIORecordConsumer.startField
            DataWritableWriter.writeValue
                type.isPrimitive()
                    DataWritableWriter.writePrimitive
                        1) if value == null (or the category is VOID)
                            note: this.emptyField remains true
                        2) if value != null: MessageColumnIORecordConsumer.addInteger (or another add* method)
                            note: this.emptyField = false;
            MessageColumnIORecordConsumer.endField
    MessageColumnIORecordConsumer.endField
        note: if (this.emptyField) { throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead"); }

The error is triggered whenever a column of type map<?,?> or array<?> is written with an empty collection, or when a map contains an entry whose key is null.

It turns out this is a known issue; see the upstream discussion: https://issues.apache.org/jira/browse/HIVE-11625
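
A minimal sketch of the situation that triggers it (the table schema and the inserted data are assumptions for illustration; spark is assumed to be a Hive-enabled SparkSession on Spark 2.1.1):

    // Hypothetical Parquet-backed Hive table with a map column.
    spark.sql("create table test_parquet_table (id int, props map<string, string>) stored as parquet")

    // Writing an empty map (or a map that contains a null key) makes the Hive Parquet writer
    // fail with "ParquetEncodingException: empty fields are illegal, the field should be
    // ommited completely instead".
    spark.sql("insert overwrite table test_parquet_table select 1, map()")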

There are two ways to avoid the problem:

1. Run the SQL through Hive instead of Spark;

2. Register a UDF (e.g. filter_map) that turns an empty map into null and, for a non-empty map, filters out every entry whose key is null, then apply it to the map column in the SELECT, as shown below:

    spark.udf.register("filter_map", (map: Map[String, String]) =>
      if (map != null && !map.isEmpty) map.filter(_._1 != null) else null)
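
With the UDF registered, wrapping the map column in the SELECT avoids the error; a sketch assuming the source table dummy has columns id and props (hypothetical column names):

    // filter_map turns an empty map into null and drops entries with a null key,
    // so the Parquet writer never sees an "empty" field.
    spark.sql("insert overwrite table test_parquet_table select id, filter_map(props) from dummy")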