zoukankan      html  css  js  c++  java
  • kafka consumer

    # coding:utf-8

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time

    consumer = KafkaConsumer('test',
    group_id='my-group', # 消费群组
    # auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest,
    # {'smallest': 'earliest', 'largest': 'latest'}
    auto_offset_reset='earlist',
    bootstrap_servers=['127.0.0.1:9092'])

    consumer.subscribe(topics=('test', 'test0')) # 订阅要消费的主题
    while True:
    msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
    print(msg)
    time.sleep(1)

    print(consumer.partitions_for_topic("test")) # 获取test主题的分区信息
    print(consumer.topics()) # 获取主题列表
    print(consumer.subscription()) # 获取当前消费者订阅的主题
    print(consumer.assignment()) # 获取当前消费者topic、分区信息
    print(consumer.beginning_offsets(consumer.assignment())) # 获取当前消费者可消费的偏移量
    consumer.seek(TopicPartition(topic=u'test', partition=0), 5) # 重置偏移量,从第5个偏移量消费

    num = 0
    while True:
    print(num)
    print(consumer.paused()) # 获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
    print("resume...")
    consumer.resume(TopicPartition(topic=u'test', partition=0))
    print("resume......")

    for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
  • 相关阅读:
    CCF201503-1 图像旋转(100分)
    CCF201509-1 数列分段(100分)
    CCF201509-1 数列分段(100分)
    JSP---使用checkbox实现多项删除
    JS---checkbox实现全选
    JSP---jsp页面获取物理路径
    JSP---根据值让某一Radio处于选中状态
    JSP---Myeclipse8.5使用Sql server数据库
    JSP---JSP学习笔记
    VS---解决VS2008专业版试用90天限制的方法
  • 原文地址:https://www.cnblogs.com/yaohu/p/12597079.html
Copyright © 2011-2022 走看看