zoukankan      html  css  js  c++  java
  • avro-1.8.1 serialize BigDecimal and Short error fix.

    1. create mysql table like

    CREATE TABLE `test` (
    `a` tinyint(4) NOT NULL DEFAULT '0',
    `b` decimal(12,0) DEFAULT NULL,
    `c` decimal(5,0) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

    2. start kafka connect using Debezium mysql plugin

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "223344",
        "database.server.name": "localhost",
        "database.whitelist": "inventory",
        "table.whitelist":"inventory.test",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "include.schema.changes":"false",
        "transforms": "extractField",
        "transforms.extractField.type": "com.centchain.kafka.connect.mysql.DebeziumMysql$Value",
        "transforms.extractField.field": "after"
      }
    }

    3. get errors:

    [2019-05-09 15:23:10,310] INFO WorkerSourceTask{id=cashier-201905091402-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
    [2019-05-09 15:23:10,311] ERROR WorkerSourceTask{id=cashier-201905091402-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:269)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
        at org.apache.avro.util.internal.JacksonUtils.toJson(JacksonUtils.java:87)
        at org.apache.avro.util.internal.JacksonUtils.toJsonNode(JacksonUtils.java:48)
        at org.apache.avro.Schema$Field.<init>(Schema.java:423)
        at org.apache.avro.Schema$Field.<init>(Schema.java:415)
        at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:964)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:847)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java)
        ... (remainder of the stack trace was truncated)

    4. fix

    file location: avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/util/internal/JacksonUtils.java

      1 /**
      2  * Licensed to the Apache Software Foundation (ASF) under one
      3  * or more contributor license agreements.  See the NOTICE file
      4  * distributed with this work for additional information
      5  * regarding copyright ownership.  The ASF licenses this file
      6  * to you under the Apache License, Version 2.0 (the
      7  * "License"); you may not use this file except in compliance
      8  * with the License.  You may obtain a copy of the License at
      9  *
     10  *     http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  */
     18 package org.apache.avro.util.internal;
     19 
     20 import java.io.IOException;
     21 import java.io.UnsupportedEncodingException;
     22 import java.math.BigDecimal;
     23 import java.util.ArrayList;
     24 import java.util.Collection;
     25 import java.util.Iterator;
     26 import java.util.LinkedHashMap;
     27 import java.util.List;
     28 import java.util.Map;
     29 import org.apache.avro.AvroRuntimeException;
     30 import org.apache.avro.JsonProperties;
     31 import org.apache.avro.Schema;
     32 import org.codehaus.jackson.JsonGenerator;
     33 import org.codehaus.jackson.JsonNode;
     34 import org.codehaus.jackson.map.ObjectMapper;
     35 import org.codehaus.jackson.util.TokenBuffer;
     36 
     37 public class JacksonUtils {
     38   static final String BYTES_CHARSET = "ISO-8859-1";
     39 
     40   private JacksonUtils() {
     41   }
     42 
     43   public static JsonNode toJsonNode(Object datum) {
     44     if (datum == null) {
     45       return null;
     46     }
     47     try {
     48       TokenBuffer generator = new TokenBuffer(new ObjectMapper());
     49       toJson(datum, generator);
     50       return new ObjectMapper().readTree(generator.asParser());
     51     } catch (IOException e) {
     52       throw new AvroRuntimeException(e);
     53     }
     54   }
     55 
     56   @SuppressWarnings(value="unchecked")
     57   static void toJson(Object datum, JsonGenerator generator) throws IOException {
     58     if (datum == JsonProperties.NULL_VALUE) { // null
     59       generator.writeNull();
     60     } else if (datum instanceof Map) { // record, map
     61       generator.writeStartObject();
     62       for (Map.Entry<Object,Object> entry : ((Map<Object,Object>) datum).entrySet()) {
     63         generator.writeFieldName(entry.getKey().toString());
     64         toJson(entry.getValue(), generator);
     65       }
     66       generator.writeEndObject();
     67     } else if (datum instanceof Collection) { // array
     68       generator.writeStartArray();
     69       for (Object element : (Collection<?>) datum) {
     70         toJson(element, generator);
     71       }
     72       generator.writeEndArray();
     73     } else if (datum instanceof byte[]) { // bytes, fixed
     74       generator.writeString(new String((byte[]) datum, BYTES_CHARSET));
     75     } else if (datum instanceof CharSequence || datum instanceof Enum<?>) { // string, enum
     76       generator.writeString(datum.toString());
     77     } else if (datum instanceof Double) { // double
     78       generator.writeNumber((Double) datum);
     79     } else if (datum instanceof Float) { // float
     80       generator.writeNumber((Float) datum);
     81     } else if (datum instanceof Long) { // long
     82       generator.writeNumber((Long) datum);
     83     } else if (datum instanceof Integer) { // int
     84       generator.writeNumber((Integer) datum);
     85     }else if ( datum instanceof  Short) {  // short
     86       generator.writeNumber(new Integer(datum.toString()));
     87     }else if (datum instanceof Boolean) { // boolean
     88       generator.writeBoolean((Boolean) datum);
     89     }
     90     else if (datum instanceof BigDecimal){
     91       generator.writeNumber((BigDecimal) datum);
     92     } else {
     93       throw new AvroRuntimeException("Unknown datum class: " + datum.getClass());
     94     }
     95   }
     96 
     97   public static Object toObject(JsonNode jsonNode) {
     98     return toObject(jsonNode, null);
     99   }
    100 
    101   public static Object toObject(JsonNode jsonNode, Schema schema) {
    102     if (schema != null && schema.getType().equals(Schema.Type.UNION)) {
    103       return toObject(jsonNode, schema.getTypes().get(0));
    104     }
    105     if (jsonNode == null) {
    106       return null;
    107     } else if (jsonNode.isNull()) {
    108       return JsonProperties.NULL_VALUE;
    109     } else if (jsonNode.isBoolean()) {
    110       return jsonNode.asBoolean();
    111     } else if (jsonNode.isInt()) {
    112       if (schema == null || schema.getType().equals(Schema.Type.INT)) {
    113         return jsonNode.asInt();
    114       } else if (schema.getType().equals(Schema.Type.LONG)) {
    115         return jsonNode.asLong();
    116       }
    117     }else if (jsonNode.isBigDecimal()){
    118       return jsonNode.asDouble();
    119     }else if (jsonNode.isLong()) {
    120       return jsonNode.asLong();
    121     } else if (jsonNode.isDouble()) {
    122       if (schema == null || schema.getType().equals(Schema.Type.DOUBLE)) {
    123         return jsonNode.asDouble();
    124       } else if (schema.getType().equals(Schema.Type.FLOAT)) {
    125         return (float) jsonNode.asDouble();
    126       }
    127     } else if (jsonNode.isTextual()) {
    128       if (schema == null || schema.getType().equals(Schema.Type.STRING) ||
    129           schema.getType().equals(Schema.Type.ENUM)) {
    130         return jsonNode.asText();
    131       } else if (schema.getType().equals(Schema.Type.BYTES)) {
    132         try {
    133           return jsonNode.getTextValue().getBytes(BYTES_CHARSET);
    134         } catch (UnsupportedEncodingException e) {
    135           throw new AvroRuntimeException(e);
    136         }
    137       }
    138     } else if (jsonNode.isArray()) {
    139       List l = new ArrayList();
    140       for (JsonNode node : jsonNode) {
    141         l.add(toObject(node, schema == null ? null : schema.getElementType()));
    142       }
    143       return l;
    144     } else if (jsonNode.isObject()) {
    145       Map m = new LinkedHashMap();
    146       for (Iterator<String> it = jsonNode.getFieldNames(); it.hasNext(); ) {
    147         String key = it.next();
    148         Schema s = null;
    149         if (schema == null) {
    150           s = null;
    151         } else if (schema.getType().equals(Schema.Type.MAP)) {
    152           s = schema.getValueType();
    153         } else if (schema.getType().equals(Schema.Type.RECORD)) {
    154           s = schema.getField(key).schema();
    155         }
    156         Object value = toObject(jsonNode.get(key), s);
    157         m.put(key, value);
    158       }
    159       return m;
    160     }
    161     return null;
    162   }
    163 }
    View Code

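    The key changes are:

    lines 85-86 (the Short branch in toJson), which fix the error for Short;

    lines 90-91 (the BigDecimal branch in toJson) and lines 117-118 (the isBigDecimal branch in toObject), which fix the error for BigDecimal.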

    5. result:

    5.1  mysql -> kafka

    lenmom@M1701:~/workspace/software/confluent-community-5.1.0-2.11$ bin/kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning   --topic localhost.a.test
    {"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0001"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
    {"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0002"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
    {"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0003"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}
    {"a":1,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0004"},"operation_type":"c","pt_log_d":"20190513","last_update_timestamp":1557676877029}

    5.2 kafka-hive

    command config for connector:

    {
      "name": "hive-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": "localhost.a.test",
        "hdfs.url": "hdfs://127.0.0.1:9000/",
        "logs.dir": "/logs",
        "topics.dir": "/inventory/",
        "hadoop.conf.dir": "/home/lenmom/workspace/software/hadoop-2.7.3/etc/hadoop/",
        "flush.size": "1",
        "rotate.interval.ms": "5000",
        "hive.integration": true,
        "hive.database": "inventory",
        "partitioner.class":"io.confluent.connect.hdfs.partitioner.FieldPartitioner",
        "partition.field.name":"pt_log_d",
        "hive.metastore.uris": "thrift://127.0.0.1:9083",
        "schema.compatibility": "BACKWARD"
      }
    }

    result:

    hive> select * from localhost_a_test;
    OK
    1       1       1       c       20190513        2019-05-13 00:01:17.029
    1       1       2       c       20190513        2019-05-13 00:01:17.029
    1       1       3       c       20190513        2019-05-13 00:01:17.029
    1       1       4       c       20190513        2019-05-13 00:01:17.029
    Time taken: 0.168 seconds, Fetched: 4 row(s)

    6. for table schema

     CREATE TABLE `decimalTest` (
      `POINTSDAY` decimal(12,0) NOT NULL DEFAULT '0' ,
      `POINTSMONTH` decimal(12,0) NOT NULL DEFAULT '0' ,
      `CASHDAY` decimal(12,0) NOT NULL DEFAULT '0' ,
      `CASHMONTH` decimal(12,0) NOT NULL DEFAULT '0' 
    ) 
    
    insert into decimalTest values(1,2,3);

    If we use the HDFS connector to sink this table to Hive, we get an error like:

    [2019-05-16 11:35:18,623] ERROR WorkerSinkTask{id=hive-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
            at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
            at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1623)
            at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1500)
            at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1368)
            at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1080)
            at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:89)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
            ... 13 more
    Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: BYTES
            at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
            at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
            at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
            ... 21 more
    [2019-05-16 11:35:18,624] ERROR WorkerSinkTask{id=hive-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

    This is caused by an error in how the Kafka Avro converter handles decimal default values.

    To fix this error, I changed the BYTES case in the following method:

    6.1  /schema-registry-5.1.0/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java

      1 private Object defaultValueFromAvro(Schema schema,
      2                                       org.apache.avro.Schema avroSchema,
      3                                       Object value,
      4                                       ToConnectContext toConnectContext) {
      5     // The type will be JsonNode if this default was pulled from a Connect default field, or an
      6     // Object if it's the actual Avro-specified default. If it's a regular Java object, we can
      7     // use our existing conversion tools.
      8     if (!(value instanceof JsonNode)) {
      9       return toConnectData(schema, value, toConnectContext);
     10     }
     11 
     12     JsonNode jsonValue = (JsonNode) value;
     13     switch (avroSchema.getType()) {
     14       case INT:
     15         if (schema.type() == Schema.Type.INT8) {
     16           return (byte) jsonValue.getIntValue();
     17         } else if (schema.type() == Schema.Type.INT16) {
     18           return (short) jsonValue.getIntValue();
     19         } else if (schema.type() == Schema.Type.INT32) {
     20           return jsonValue.getIntValue();
     21         } else {
     22           break;
     23         }
     24 
     25       case LONG:
     26         return jsonValue.getLongValue();
     27 
     28       case FLOAT:
     29         return (float) jsonValue.getDoubleValue();
     30       case DOUBLE:
     31         return jsonValue.getDoubleValue();
     32 
     33       case BOOLEAN:
     34         return jsonValue.asBoolean();
     35 
     36       case NULL:
     37         return null;
     38 
     39       case STRING:
     40       case ENUM:
     41         return jsonValue.asText();
     42 
     43       case BYTES:
     44           return jsonValue.getDecimalValue();
     45       case FIXED:
     46         try {
     47           return jsonValue.getBinaryValue();
     48         } catch (IOException e) {
     49           throw new DataException("Invalid binary data in default value");
     50         }
     51 //        return convertIntegerToBytes(jsonValue.getIntValue());
     52 //        return jsonValue.getIntValue();
     53 
     54       case ARRAY: {
     55         if (!jsonValue.isArray()) {
     56           throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
     57         }
     58         List<Object> result = new ArrayList<>(jsonValue.size());
     59         for (JsonNode elem : jsonValue) {
     60           result.add(
     61               defaultValueFromAvro(schema, avroSchema.getElementType(), elem, toConnectContext));
     62         }
     63         return result;
     64       }
     65 
     66       case MAP: {
     67         if (!jsonValue.isObject()) {
     68           throw new DataException("Invalid JSON for map default value: " + jsonValue.toString());
     69         }
     70         Map<String, Object> result = new HashMap<>(jsonValue.size());
     71         Iterator<Map.Entry<String, JsonNode>> fieldIt = jsonValue.getFields();
     72         while (fieldIt.hasNext()) {
     73           Map.Entry<String, JsonNode> field = fieldIt.next();
     74           Object converted = defaultValueFromAvro(
     75               schema, avroSchema.getElementType(), field.getValue(), toConnectContext);
     76           result.put(field.getKey(), converted);
     77         }
     78         return result;
     79       }
     80 
     81       case RECORD: {
     82         if (!jsonValue.isObject()) {
     83           throw new DataException("Invalid JSON for record default value: " + jsonValue.toString());
     84         }
     85 
     86         Struct result = new Struct(schema);
     87         for (org.apache.avro.Schema.Field avroField : avroSchema.getFields()) {
     88           Field field = schema.field(avroField.name());
     89           JsonNode fieldJson = ((JsonNode) value).get(field.name());
     90           Object converted = defaultValueFromAvro(
     91               field.schema(), avroField.schema(), fieldJson, toConnectContext);
     92           result.put(avroField.name(), converted);
     93         }
     94         return result;
     95       }
     96 
     97       case UNION: {
     98         // Defaults must match first type
     99         org.apache.avro.Schema memberAvroSchema = avroSchema.getTypes().get(0);
    100         if (memberAvroSchema.getType() == org.apache.avro.Schema.Type.NULL) {
    101           return null;
    102         } else {
    103           return defaultValueFromAvro(
    104               schema.field(unionMemberFieldName(memberAvroSchema)).schema(),
    105               memberAvroSchema,
    106               value,
    107               toConnectContext);
    108         }
    109       }
    110       default: {
    111         return null;
    112       }
    113     }
    114     return null;
    115   }

    After the fix, rebuild the jar and replace the file kafka-connect-avro-converter-5.1.0.jar in the Confluent Kafka installation directory.

    The data should then sink to Hive successfully.

    hive> select * from decimalTest limit 10;
    [WARNING] Avro: Invalid default for field POINTSDAY: 0 not a {"type":"bytes","scale":0,"precision":12,"connect.version":1,"connect.parameters":{"scale":"0","connect.decimal.precision":"12"},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
    [WARNING] Avro: Invalid default for field POINTSMONTH: 0 not a {"type":"bytes","scale":0,"precision":12,"connect.version":1,"connect.parameters":{"scale":"0","connect.decimal.precision":"12"},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
    [WARNING] Avro: Invalid default for field CASHDAY: 0 not a {"type":"bytes","scale":0,"precision":12,"connect.version":1,"connect.parameters":{"scale":"0","connect.decimal.precision":"12"},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
    [WARNING] Avro: Invalid default for field CASHMONTH: 0 not a {"type":"bytes","scale":0,"precision":12,"connect.version":1,"connect.parameters":{"scale":"0","connect.decimal.precision":"12"},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
    40      40      0       0        
    1394    1394    0       0      
    1500    1500    0       0       
    10      10      0       0       

    6.2  As we can see above, Hive still prints warnings when we query the data. To eliminate the warnings, change the following method:

    /avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/Schema.java

     1 private static boolean isValidDefault(Schema schema, JsonNode defaultValue) {
     2     if (defaultValue == null)
     3       return false;
     4     switch (schema.getType()) {
     5 
     6     case BYTES:
     7       if (schema.logicalType.getName().equals("decimal")||
     8           schema.logicalType.getName().toLowerCase().equals("bigdecimal")){
     9         return  defaultValue.isBigDecimal();
    10       }
    11       else{
    12         return defaultValue.isTextual();
    13       }
    14     case STRING:
    15     case ENUM:
    16     case FIXED:
    17       return defaultValue.isTextual();
    18     case INT:
    19     case LONG:
    20     case FLOAT:
    21     case DOUBLE:
    22       return defaultValue.isNumber();
    23     case BOOLEAN:
    24       return defaultValue.isBoolean();
    25     case NULL:
    26       return defaultValue.isNull();
    27     case ARRAY:
    28       if (!defaultValue.isArray())
    29         return false;
    30       for (JsonNode element : defaultValue)
    31         if (!isValidDefault(schema.getElementType(), element))
    32           return false;
    33       return true;
    34     case MAP:
    35       if (!defaultValue.isObject())
    36         return false;
    37       for (JsonNode value : defaultValue)
    38         if (!isValidDefault(schema.getValueType(), value))
    39           return false;
    40       return true;
    41     case UNION:                                   // union default: first branch
    42       return isValidDefault(schema.getTypes().get(0), defaultValue);
    43     case RECORD:
    44       if (!defaultValue.isObject())
    45         return false;
    46       for (Field field : schema.getFields())
    47         if (!isValidDefault(field.schema(),
    48                             defaultValue.has(field.name())
    49                             ? defaultValue.get(field.name())
    50                             : field.defaultValue()))
    51           return false;
    52       return true;
    53     default:
    54       return false;
    55     }
    56   }

    after the fix, replace the jar file in $HIVE_HOME and $CONFLUENT_KAFKA_HOME installation dir.

  • 相关阅读:
    Digital Video Stabilization and Rolling Shutter Correction using Gyroscope 论文笔记
    Distortion-Free Wide-Angle Portraits on Camera Phones 论文笔记
    Panorama Stitching on Mobile
    Natural Image Stitching with the Global Similarity Prior 论文笔记 (三)
    Natural Image Stitching with the Global Similarity Prior 论文笔记(二)
    Natural Image Stitching with the Global Similarity Prior 论文笔记(一)
    ADCensus Stereo Matching 笔记
    Efficient Large-Scale Stereo Matching论文解析
    Setting up caffe on Ubuntu
    Kubernetes配置Secret访问Harbor私有镜像仓库
  • 原文地址:https://www.cnblogs.com/lenmom/p/10854444.html
Copyright © 2011-2022 走看看