[Original] Troubleshooting share (15): writing Parquet data from Spark fails with ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead

    Spark 2.1.1

    Running the following SQL in Spark fails:

    insert overwrite table test_parquet_table select * from dummy

    The error is:

    org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:333)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:123)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:42)
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:111)
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:124)
    at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:321)
    ... 8 more
    Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
    ... 16 more

    Tracing into the code:

    org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter

        private void writeMap(Object value, MapObjectInspector inspector, GroupType type) {
            GroupType repeatedType = type.getType(0).asGroupType();
            this.recordConsumer.startGroup();
            // The repeated key_value field is started unconditionally, even when
            // the map is empty, so endField at the bottom can see an empty field.
            this.recordConsumer.startField(repeatedType.getName(), 0);
            Map<?, ?> mapValues = inspector.getMap(value);
            Type keyType = repeatedType.getType(0);
            String keyName = keyType.getName();
            ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
            Type valueType = repeatedType.getType(1);
            String valueName = valueType.getName();
            ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
    
            for (Entry<?, ?> keyValue : mapValues.entrySet()) {
                this.recordConsumer.startGroup();
                if (keyValue != null) {
                    // The key is written unconditionally: startField/endField run
                    // even when getKey() returns null, in which case writeValue
                    // writes nothing. The value, by contrast, is only written
                    // when it is non-null.
                    Object keyElement = keyValue.getKey();
                    this.recordConsumer.startField(keyName, 0);
                    this.writeValue(keyElement, keyInspector, keyType);
                    this.recordConsumer.endField(keyName, 0);
                    Object valueElement = keyValue.getValue();
                    if (valueElement != null) {
                        this.recordConsumer.startField(valueName, 1);
                        this.writeValue(valueElement, valueInspector, valueType);
                        this.recordConsumer.endField(valueName, 1);
                    }
                }
                this.recordConsumer.endGroup();
            }
    
            this.recordConsumer.endField(repeatedType.getName(), 0);
            this.recordConsumer.endGroup();
        }
    
        private void writeValue(Object value, ObjectInspector inspector, Type type) {
            if (type.isPrimitive()) {
                this.checkInspectorCategory(inspector, Category.PRIMITIVE);
                this.writePrimitive(value, (PrimitiveObjectInspector)inspector);
            } else {
                GroupType groupType = type.asGroupType();
                OriginalType originalType = type.getOriginalType();
                if (originalType != null && originalType.equals(OriginalType.LIST)) {
                    this.checkInspectorCategory(inspector, Category.LIST);
                    this.writeArray(value, (ListObjectInspector)inspector, groupType);
                } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
                    this.checkInspectorCategory(inspector, Category.MAP);
                    this.writeMap(value, (MapObjectInspector)inspector, groupType);
                } else {
                    this.checkInspectorCategory(inspector, Category.STRUCT);
                    this.writeGroup(value, (StructObjectInspector)inspector, groupType);
                }
            }
    
        }
        
        private void writePrimitive(Object value, PrimitiveObjectInspector inspector) {
            // When value is null nothing is written at all, so the emptyField
            // flag set by the preceding startField() stays true.
            if (value != null) {
                switch(inspector.getPrimitiveCategory()) {
                case VOID:
                    // VOID likewise writes nothing.
                    return;
                case DOUBLE:
                    this.recordConsumer.addDouble(((DoubleObjectInspector)inspector).get(value));
                    break;
                case BOOLEAN:
                    this.recordConsumer.addBoolean(((BooleanObjectInspector)inspector).get(value));
                    break;
                case FLOAT:
                    this.recordConsumer.addFloat(((FloatObjectInspector)inspector).get(value));
                    break;
                case BYTE:
                    this.recordConsumer.addInteger(((ByteObjectInspector)inspector).get(value));
                    break;
                case INT:
                    this.recordConsumer.addInteger(((IntObjectInspector)inspector).get(value));
                    break;
                case LONG:
                    this.recordConsumer.addLong(((LongObjectInspector)inspector).get(value));
                    break;
                case SHORT:
                    this.recordConsumer.addInteger(((ShortObjectInspector)inspector).get(value));
                    break;
                case STRING:
                    String v = ((StringObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(Binary.fromString(v));
                    break;
                case CHAR:
                    String vChar = ((HiveCharObjectInspector)inspector).getPrimitiveJavaObject(value).getStrippedValue();
                    this.recordConsumer.addBinary(Binary.fromString(vChar));
                    break;
                case VARCHAR:
                    String vVarchar = ((HiveVarcharObjectInspector)inspector).getPrimitiveJavaObject(value).getValue();
                    this.recordConsumer.addBinary(Binary.fromString(vVarchar));
                    break;
                case BINARY:
                    byte[] vBinary = ((BinaryObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(Binary.fromByteArray(vBinary));
                    break;
                case TIMESTAMP:
                    Timestamp ts = ((TimestampObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
                    break;
                case DECIMAL:
                    HiveDecimal vDecimal = (HiveDecimal)inspector.getPrimitiveJavaObject(value);
                    DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
                    this.recordConsumer.addBinary(this.decimalToBinary(vDecimal, decTypeInfo));
                    break;
                case DATE:
                    Date vDate = ((DateObjectInspector)inspector).getPrimitiveJavaObject(value);
                    this.recordConsumer.addInteger(DateWritable.dateToDays(vDate));
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
                }
    
            }
        }

    parquet.io.MessageColumnIO.MessageColumnIORecordConsumer

            public void startField(String field, int index) {
                try {
                    if (MessageColumnIO.DEBUG) {
                        this.log("startField(" + field + ", " + index + ")");
                    }
    
                    this.currentColumnIO = ((GroupColumnIO)this.currentColumnIO).getChild(index);
                    this.emptyField = true; // every started field is empty until some addX() writes a value
                    if (MessageColumnIO.DEBUG) {
                        this.printState();
                    }
    
                } catch (RuntimeException var4) {
                    throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
                }
            }
    
            public void endField(String field, int index) {
                if (MessageColumnIO.DEBUG) {
                    this.log("endField(" + field + ", " + index + ")");
                }
    
                this.currentColumnIO = this.currentColumnIO.getParent();
                if (this.emptyField) {
                    // no addX() call landed between startField() and endField()
                    throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
                } else {
                    this.fieldsWritten[this.currentLevel].markWritten(index);
                    this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
                    if (MessageColumnIO.DEBUG) {
                        this.printState();
                    }
    
                }
            }
            
            public void addInteger(int value) {
                if (MessageColumnIO.DEBUG) {
                    this.log("addInt(" + value + ")");
                }
    
                this.emptyField = false; // a value is being written, so the field is no longer empty
                this.getColumnWriter().write(value, this.r[this.currentLevel], this.currentColumnIO.getDefinitionLevel());
                this.setRepetitionLevel();
                if (MessageColumnIO.DEBUG) {
                    this.printState();
                }
    
            }

    The key lines in DataWritableWriter that trigger the error are these:

                    Object keyElement = keyValue.getKey();                // may be null
                    this.recordConsumer.startField(keyName, 0);           // sets emptyField = true
                    this.writeValue(keyElement, keyInspector, keyType);   // writes nothing when keyElement is null
                    this.recordConsumer.endField(keyName, 0);             // throws if emptyField is still true

    The call flow works out like this:

    DataWritableWriter.writeMap
        MessageColumnIORecordConsumer.startField
            note: this.emptyField = true;
        iterate over the map entries
            handle the key
                Object keyElement = keyValue.getKey();
                MessageColumnIORecordConsumer.startField
                DataWritableWriter.writeValue
                    type.isPrimitive() is true, so it dispatches to:
                        DataWritableWriter.writePrimitive
                            1) value == null, or the type is VOID: nothing is written
                                note: this.emptyField is still true
                            2) value != null: MessageColumnIORecordConsumer.addInteger (or another addX)
                                note: this.emptyField = false;
                MessageColumnIORecordConsumer.endField
                    note: throws here when the key was null
        MessageColumnIORecordConsumer.endField
            note: if (this.emptyField) { throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead"); } fires here when the map was empty
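
    Put differently, endField throws whenever no addX() call landed between it and the matching startField. The state machine boils down to the following sketch (my own simplification for illustration, not Parquet's actual code):

        // A stripped-down model of the emptyField check.
        // This is a simplification for illustration only, not Parquet's implementation.
        object EmptyFieldModel {
          private var emptyField = false

          def startField(name: String): Unit = emptyField = true
          def addInteger(v: Int): Unit = emptyField = false // any addX resets the flag
          def endField(name: String): Unit =
            if (emptyField)
              throw new RuntimeException(
                "empty fields are illegal, the field should be ommited completely instead")

          def main(args: Array[String]): Unit = {
            startField("key")
            // a null key means writePrimitive() returns without calling any addX ...
            endField("key") // ... so this throws, matching the stack trace above
          }
        }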

    So the error fires whenever an empty collection is inserted into a column of type map<?,?> or array<?>, or the map contains an entry whose key is null.
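
    A minimal way to hit this from spark-shell is sketched below (table and column names are hypothetical, not from the original job; assumes a SparkSession with Hive support):

        // Hypothetical repro sketch; the names are illustrative.
        spark.sql("create table test_map_table (m map<string,string>) stored as parquet")
        // map() builds an empty map, and writing it through the Hive Parquet
        // writer fails with the ParquetEncodingException shown above.
        spark.sql("insert overwrite table test_map_table select map()")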

    It turned out this had already been discussed upstream: https://issues.apache.org/jira/browse/HIVE-11625

    There are two ways to work around the problem:

    1. Run the SQL in Hive instead;

    2. Register a UDF, filter_map, that turns an empty map into null and otherwise drops every entry whose key is null:

    spark.udf.register("filter_map", (map: Map[String, String]) => {
      if (map != null && !map.isEmpty) map.filter(_._1 != null) else null
    })
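
    With the UDF registered, the insert can wrap the offending map column, for example (assuming the column is named m; adjust the select list to the real schema):

        // Hypothetical usage; the column name m is illustrative.
        spark.sql("insert overwrite table test_parquet_table select filter_map(m) from dummy")

    Returning null for an empty map works because an optional field can be omitted from a Parquet record entirely, which is exactly what the exception message asks for.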
    Original post: https://www.cnblogs.com/barneywill/p/10137235.html