  • Kafka's OffsetOutOfRangeError

    from kafka import KafkaClient, SimpleConsumer
    from sys import argv

    # Connect to the broker and consume the topic named on the command line
    kafka = KafkaClient("10.0.1.100:6667")
    consumer = SimpleConsumer(kafka, "my-group", argv[1])
    consumer.max_buffer_size = 0  # no cap on the fetch buffer
    consumer.seek(0, 2)           # start from the end of the topic

    for message in consumer:
        # each item is (offset, message); message[1][3] is the message payload
        print("OFFSET: " + str(message[0]) + "\t MSG: " + str(message[1][3]))

    Max Buffer Size

    There are two lines I wanted to focus on in particular.  The first is the “max_buffer_size” setting:

    consumer.max_buffer_size = 0

    When subscribing to a topic that already has a large backlog of messages the consumer has never seen, the client can hit its maximum fetch buffer size and fail.  Setting the buffer size to zero removes that limit, so the consumer can take everything that is available.
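    As a rough illustration of what happens without that line: in the kafka-python versions of that era, a fetch that cannot grow past the configured cap surfaces as kafka.common.ConsumerFetchSizeTooSmall (the class name is from that library; the rest of this sketch is illustrative, not the author's code):

    from kafka import KafkaClient, SimpleConsumer
    from kafka.common import ConsumerFetchSizeTooSmall

    kafka = KafkaClient("10.0.1.100:6667")
    consumer = SimpleConsumer(kafka, "my-group", "my_messages")  # buffer cap left at its default

    while True:
        try:
            for message in consumer:
                print(message)
        except ConsumerFetchSizeTooSmall:
            # The backlog did not fit within the capped fetch buffer; remove the
            # cap (as the article does) and resume consuming.
            consumer.max_buffer_size = 0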

    If you kill and restart the script it will continue where it last left off, at the last offset that was received.  This is pretty cool but in some environments it has some trouble, so I changed the default by adding another line.
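    The pick-up-where-it-left-off behaviour comes from the consumer group's committed offsets: SimpleConsumer looks up the group's last committed offset when it starts and, with auto-commit enabled, keeps writing new offsets back as it reads. A minimal sketch of those knobs, assuming the same era of kafka-python (the thresholds shown are just examples, not the library defaults):

    from kafka import KafkaClient, SimpleConsumer

    kafka = KafkaClient("10.0.1.100:6667")
    consumer = SimpleConsumer(kafka, "my-group", "my_messages",
                              auto_commit=True,          # commit offsets for "my-group"
                              auto_commit_every_n=100,   # ...after every 100 messages
                              auto_commit_every_t=5000)  # ...or every 5000 ms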

    Offset Out of Range Error

    As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy. I'm not entirely sure why, but I started getting this error:

    kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=)

    To fix it, I added the “seek” call:

    consumer.seek(0, 2)  # offset 0 relative to the end of the topic
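    If you would rather not always jump to the end at startup, the same call can be used reactively when the error appears. A rough sketch, assuming the old kafka-python API where the error class lives in kafka.common and seek(offset, whence) treats whence=0 as relative to the earliest available offset and whence=2 as relative to the latest:

    from kafka import KafkaClient, SimpleConsumer
    from kafka.common import OffsetOutOfRangeError

    kafka = KafkaClient("10.0.1.100:6667")
    consumer = SimpleConsumer(kafka, "my-group", "my_messages")
    consumer.max_buffer_size = 0

    while True:
        try:
            for message in consumer:
                print("OFFSET: " + str(message[0]) + "\t MSG: " + str(message[1][3]))
        except OffsetOutOfRangeError:
            # The stored offset no longer exists on the broker (e.g. the logs were
            # wiped when the servers were killed), so move to a valid position:
            # seek(0, 0) = oldest available message, seek(0, 2) = newest.
            consumer.seek(0, 2)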
  • Original article: https://www.cnblogs.com/linn/p/4626540.html