zoukankan      html  css  js  c++  java
  • kafka:enable.auto.commit

    一、背景

    项目中有一个需求,是通过消费kafka的消息来处理数据,但是想要实现延迟消费的效果,于是想到了是否可以自己管理kafka的commit来实现,就是通过设置`enable.auto.commit`为False,预期是如果消费到了消息,但是不commit,kafka就会重新把消息放回队列,后续还会再次消费到,直到超过设置的延迟时间再真正消费并commit。

    于是写了个demo来验证,结果发现这个配置的效果并不是自己想要的。

    二、生产者

    生产者每秒钟向kafka的topic发送一条消息。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    
    from confluent_kafka import Producer, KafkaError
    from confluent_kafka import TopicPartition
    from confluent_kafka import OFFSET_BEGINNING
    
    p = Producer({'bootstrap.servers':'localhost:9092, localhost:9093, localhost:9094'})
    
    topic = 'nico-test'
    msg_tpl = 'hello kafka:{0}'
    
    while True:
        msg = msg_tpl.format(time.time())
        p.produce(topic, msg)
        print('Produce msg:{0}'.format(msg))
        time.sleep(1)
    
    p.flush()

    三、消费者

    消费者设置了配置项enable.auto.commit:False。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    
    from confluent_kafka import Consumer, KafkaError
    from confluent_kafka import TopicPartition
    from confluent_kafka import OFFSET_BEGINNING
    
    c = Consumer({
        'bootstrap.servers':'localhost:9092, localhost:9093, localhost:9094', 
        'group.id':'nico-test', 
        'auto.offset.reset':'earliest', 
        'enable.auto.commit':False
    })
    
    topic = 'nico-test'
    
    c.subscribe([topic])
    
    cd = c.list_topics()
    print(cd.cluster_id)
    print(cd.controller_id)
    print(cd.brokers)
    print(cd.topics)
    print(cd.orig_broker_id)
    print(cd.orig_broker_name)
    
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
    
        print('topic:{topic}, partition:{partition}, offset:{offset}, headers:{headers}, key:{key}, msg:{msg}, timestamp:{timestamp}'.format(topic=msg.topic(), msg=msg.value(), headers=msg.headers(), key=msg.key(), offset=msg.offset(), partition=msg.partition(), timestamp=msg.timestamp()))

    四、结果

    结果是consumer启动后会一直顺序的消费消息,并且并不会把消息重放到队列中,但是当consumer被kill掉重启时,每次都是从最开始的时候消费的,所以总结一下,该配置项的作用是当配置为true时,每次获取到消息后就会自动更新存储在zookepper中的offset值。

    最后自己也想了一下,这里不支持延迟消费的原因其实和kafka的实现原理有很大的关系,kafka是直接把消息存储在磁盘文件中的,如果想要实现重放(支持延迟消费)那么就需要把该消息从消息队列中删除,然后重新插入到消息队列,那这样就跟kafka的设计相违背了。

  • 相关阅读:
    Flutter 路由管理
    SpringMVC 集成 MyBatis
    关于windows下安装mysql数据库出现中文乱码的问题
    md5.digest()与md5.hexdigest()之间的区别及转换
    MongoDB基础命令及操作
    redis相关操作&基本命令使用
    python中mysql主从同步配置的方法
    shell入门基础&常见命令及用法
    ORM总结
    多任务:进程、线程、协程总结及关系
  • 原文地址:https://www.cnblogs.com/lit10050528/p/12105297.html
Copyright © 2011-2022 走看看