zoukankan      html  css  js  c++  java
  • Flink 1.9 FlinkKafkaProducer 使用 EXACTLY_ONCE 错误记录

    使用flink FlinkKafkaProducer 往kafka写入数据的时候要求使用EXACTLY_ONCE语义
    本以为本以为按照官网写一个就完事,但是却报错了

    代码

    package com.meda.test
    
    import org.apache.flink.streaming.connectors.kafka.{ FlinkKafkaProducer, KafkaSerializationSchema}
    
    //创建一个DataStream
    val dStream: DataStream[MapDt] = ...
    
    //kafka配置
    val kafkaPro:Properties = ...
      
    //创建FlinkKafkaProducer 指定EXACTLY_ONCE
    val kafkaSink: FlinkKafkaProducer[ResultDt] = new FlinkKafkaProducer[ResultDt]("top[ic", new ResultDtSerialization("flink-topic-lillcol"), kafkaPro, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    
    
    case class ResultDt(id: String, date_h: String, star: String, end: String, watermark: String, pv: Long, uv: Long)
    
    class ResultDtSerialization(topic: String) extends KafkaSerializationSchema[ResultDt] {
      override def serialize(t: ResultDt, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte], Array[Byte]](topic, t.toString.getBytes())
      }
    }
    

    遇到问题

    FlinkKafkaProducer.Semantic指定为FlinkKafkaProducer.Semantic.AT_LEAST_ONCE时,执行没有问题。

    FlinkKafkaProducer.Semantic指定为FlinkKafkaProducer.Semantic.EXACTLY_ONCE时,执行报下面的错误:

    org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
    	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:984)
    	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
    	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
    	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
    	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
    	at java.lang.Thread.run(Thread.java:748)
    [INFO ] 2019-12-24 15:25:35,212 method:org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1324)
    

    错误大意是:
    事务超时大于broker允许的最大值(transaction.max.timeout.ms)
    一开始想都没想去修改transaction.max.timeout.ms的值,但是没有解决问题。


    解决办法

    官网关于Kafka Producers and Fault Tolerance有一段说明

    Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. 
    This property will not allow to set transaction timeouts for the producers larger than it’s value.
    FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
    

    Kafka brokers 默认的最大事务超时(transaction.max.timeout.ms)为15 minutes
    生产者设置事务超时不允许大于这个值,这个属性不允许为大于其值的
    但是在默认的情况下,FlinkKafkaProducer011设置事务超时属性(transaction.timeout.ms)为1 hour, 超过默认transaction.max.timeout.ms15 minutes。
    因此在使用EXACTLY_ONCE语义的时候需要增大transaction.max.timeout.ms的值。

    按照个和说法我只要transaction.max.timeout.ms增加到大于1 hour(即3 600 000ms)‬以上就可以了,但是经过测试还是不行。
    最后通过修改生产者的事务超时属性transaction.timeout.ms解决问题

    transaction.timeout.ms从1hour降到5 minutes 成功解决问题。

    //增加配置属性操作如下:
    kafkaPro.setProperty("transaction.timeout.ms",1000*60*5+"")
    

    本文原创文章,转载请注明出处!!!

  • 相关阅读:
    集合的一些操作总结
    字符串的操作
    python字典的操作总结
    python中的列表知识总结
    Python利用文件操作实现用户名的存储登入操作
    如何理解:城市的“信息化→智能化→智慧化”
    程序员必备技能-怎样快速接手一个项目
    程序员的职业规划
    只要 8 个步骤,学会这个 Docker 命令终极教程!
    使用GitLab实现CI/CD
  • 原文地址:https://www.cnblogs.com/lillcol/p/12092869.html
Copyright © 2011-2022 走看看