zoukankan      html  css  js  c++  java
  • python+kafka,从指定位置消费数据

    # @staticmethod
    def get_kafka_reviews(self):
    # print type(self.bootstrap_servers)
    consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
    consumer.subscribe(topics=(self.topics)) #订阅要消费的主题

    # print consumer.topics()
    # print "+++++++",consumer.position(TopicPartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #获取当前主题的最新偏移量

    review_list =[]
    for message in consumer:
    print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
    review_list.append(message.value)
    if len(review_list)==self.num: #先取100条来消费
    break
    return review_list




    解释:
    consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
    自动提交位移设为flase, 默认为取最新的偏移量,重新建立一个guou_id,这样就实现了不影响别的应用程序消费数据,又能消费到最新数据,实现预警(先于用户发现)的目的。
  • 相关阅读:
    java面向对象
    Mysql 用户管理
    Mysql备份数据库
    Java方法&面向对象习题
    Mysql 用户管理
    java 方法笔记
    事物与存储过程
    多表操作
    视图 sql语句
    mongodb
  • 原文地址:https://www.cnblogs.com/yoyoma0355/p/9227440.html
Copyright © 2011-2022 走看看