  • Adding configuration in Flink SQL / Table API

    While using Flink SQL, a NULL value was written into a NOT NULL column of a sink table, and the job failed with the following exception:

    org.apache.flink.table.api.TableException: Column 'sub_id' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
        at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:58)
        at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:71)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at StreamExecCalc$692.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)

    Solution:

    Set the configuration key named in the exception, e.g. in the SQL client:

    SET table.exec.sink.not-null-enforcer=drop;

    With this option set, the statement runs normally; records with a NULL in the NOT NULL column are dropped silently. Other options can be set the same way, for example:

    SET execution.result-mode=table;
    
    SET execution.result-mode=tableau;
    
    SET table.sql-dialect=hive;

    Besides setting options this way in SQL, these parameters can also be set through the Table API:

    // imports for the Table API (Flink 1.12)
    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // instantiate table environment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv: TableEnvironment = TableEnvironment.create(settings)

    // access flink configuration
    val configuration = tEnv.getConfig().getConfiguration()
    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true")
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
    configuration.setString("table.exec.mini-batch.size", "5000")

    References:

    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/query_configuration.html

    https://www.cnblogs.com/qiu-hua/p/14053999.html

  • Original post: https://www.cnblogs.com/yoyowin/p/14899227.html