zoukankan      html  css  js  c++  java
  • kafka consumer

    # coding:utf-8

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time

    consumer = KafkaConsumer('test',
    group_id='my-group', # 消费群组
    # auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest,
    # {'smallest': 'earliest', 'largest': 'latest'}
    auto_offset_reset='earlist',
    bootstrap_servers=['127.0.0.1:9092'])

    consumer.subscribe(topics=('test', 'test0')) # 订阅要消费的主题
    while True:
    msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
    print(msg)
    time.sleep(1)

    print(consumer.partitions_for_topic("test")) # 获取test主题的分区信息
    print(consumer.topics()) # 获取主题列表
    print(consumer.subscription()) # 获取当前消费者订阅的主题
    print(consumer.assignment()) # 获取当前消费者topic、分区信息
    print(consumer.beginning_offsets(consumer.assignment())) # 获取当前消费者可消费的偏移量
    consumer.seek(TopicPartition(topic=u'test', partition=0), 5) # 重置偏移量,从第5个偏移量消费

    num = 0
    while True:
    print(num)
    print(consumer.paused()) # 获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
    print("resume...")
    consumer.resume(TopicPartition(topic=u'test', partition=0))
    print("resume......")

    for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
  • 相关阅读:
    多线程实际应用踩坑
    SpringBoot远程接口调用-RestTemplate使用
    Python测试Post请求
    Ubuntu基于zsh自定义设置shell主题
    github-share报错无法读取远程仓库
    JVM是如何处理异常的
    springboot之父pom操作
    php 后台json 转 js数组
    tp 外连接查询
    原生关联查询语句
  • 原文地址:https://www.cnblogs.com/yaohu/p/12597079.html
Copyright © 2011-2022 走看看