zoukankan      html  css  js  c++  java
  • python编写producer、consumer

    自主producer、consumer

    • 首先在不同的终端,分别开启两个consumer,保证groupid一致

      ]# python consumer_kafka.py
    • 执行一次producer

      ]# python producer_kafka.py
    • 指定key的partition进行发送信息:

      from kafka import KafkaProducer
      ​
      producer = KafkaProducer(bootstrap_servers='localhost:9092')
      ​
      # # block until all pending messages are sent
      # for _ in range(10):
      #     producer.send('test_m_brokers', b'are you ok!!!')
      # 
      # producer.flush()
      ​
      ​
      # key for hashed partitioning
      producer.send('zhongqiu_many_brokers', key=b'8', value=b'aaa')
      producer.flush()
    • 指定partition和offset读数据

    #encoding=utf8
    from kafka import KafkaConsumer
    from kafka import TopicPartition
    from kafka.structs import OffsetAndMetadata
    from kafka.structs import TopicPartition
    
    def main():
        consumer = KafkaConsumer('zhongqiu_many_brokers', bootstrap_servers=['master:9092'])
        print consumer.partitions_for_topic("zhongqiu_many_brokers")
        print consumer.topics()  #获取主题列表
        print consumer.subscription()  #获取当前消费者订阅的主题
        print consumer.assignment()  #获取当前消费者topic、分区信息
        print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
    
        consumer.seek(TopicPartition(topic=u'zhongqiu_many_brokers', partition=0), 10)  #重置偏移量
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                message.offset, message.key,
                message.value))
    
    if __name__ == "__main__":
        main()
  • 相关阅读:
    C 语言指针小结
    NOIP2012 复赛考生须知!
    2012 NOIP 初赛复习指导
    16元的纸币
    记一次社会化的钓鱼攻击
    福州大学ACM代表队获36届ACMICPC全球总决赛第18名
    福州教育相关教育资源介绍
    世界末日:科普很重要啊~~~
    201212信息学奥林匹克竞赛,新生招募
    模拟人脑:这个事儿,闹大了
  • 原文地址:https://www.cnblogs.com/zxbdboke/p/10466121.html
Copyright © 2011-2022 走看看