zoukankan      html  css  js  c++  java
  • kafka offset 设置

    from kafka import KafkaConsumer
    from kafka import TopicPartition
    from kafka.structs import OffsetAndMetadata
    ...
    topic = 'your_topic'
    partition = 0
    tp = TopicPartition(topic,partition)
    kafkaConsumer = KafkaConsumer(config here...)
    kafkaConsumer.assign([tp])
    offset = 15394125
    kafkaConsumer.commit({
        tp: OffsetAndMetadata(offset, None)
    })
    
    meta = consumer.partitions_for_topic(topic)
    options = {}
    options[partition] = OffsetAndMetadata(message.offset, meta)
    consumer.commit(options)


    # a better way, remove assign partition manually, and extract partition info from kafka message
    topic_partition = TopicPartition(command_params["topic"], message.partition) 
    consumer.seek(topic_partition, offset_value) 
    consumer.commit()

    from: http://stackoverflow.com/questions/36579815/kafka-python-how-do-i-commit-a-partition

    如果consumer.commit()不可以,可以使用seek(),使用seek()时,如果有多个partition,需
    要为每个partition都手动进行consumer assign:

    topic_partition = TopicPartition("TOPIC_TEST", 1)
    # 格式为topic, partition, 1表示partition 1.
    consumer.assign([topic_partition])

    consumer.seek(topic_partition, 1660000)

    使用最下面的方法,不再需要手动指定partition,直接从message获取partition,更加灵活。 

     
  • 相关阅读:
    1203 forms组件
    1128 聚合查询 orm字段及属性
    1127 模型层orm表操作
    1126 视图层与模板层
    1122 django中orm操作
    1121 Django操作
    1125 视图层
    搭建并行开发环境MPICH2
    Linpack之HPL测试 (HPL Benchmark)
    安装NetCDF及HDF5
  • 原文地址:https://www.cnblogs.com/buxizhizhoum/p/6871663.html
Copyright © 2011-2022 走看看