zoukankan      html  css  js  c++  java
  • python+kafka,从指定位置消费数据

    # @staticmethod
    def get_kafka_reviews(self):
    # print type(self.bootstrap_servers)
    consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
    consumer.subscribe(topics=(self.topics)) #订阅要消费的主题

    # print consumer.topics()
    # print "+++++++",consumer.position(TopicPartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #获取当前主题的最新偏移量

    review_list =[]
    for message in consumer:
    print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
    review_list.append(message.value)
    if len(review_list)==self.num: #先取100条来消费
    break
    return review_list




    解释:
    consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
    自动提交位移设为flase, 默认为取最新的偏移量,重新建立一个guou_id,这样就实现了不影响别的应用程序消费数据,又能消费到最新数据,实现预警(先于用户发现)的目的。
  • 相关阅读:
    HDU_2191_多重背包
    HDU_1494_dp
    POJ_1088_dfs
    所有的畅通工程[HDU1232][HDU1874][HDU1875][HDU1879]
    畅通工程[HDU1863]
    还是畅通工程[HDU1233]
    最小生成树
    Who's in the Middle[HDU1157]
    Bungee Jumping[HDU1155]
    Is It A Tree?[HDU1325][PKU1308]
  • 原文地址:https://www.cnblogs.com/yoyoma0355/p/9227440.html
Copyright © 2011-2022 走看看