zoukankan      html  css  js  c++  java
  • python模拟发送、消费kafka消息

    参考文章: https://zhuanlan.zhihu.com/p/279784873

    生产者代码:

    import traceback
    
    from kafka import KafkaProducer,KafkaConsumer
    from faker import Faker
    
    fake=Faker()
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xxx:9092']
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    def producer(num:int):
        producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
        phones = [fake.name()+"-"+str(i) for i in range(num)]
        for p in phones:
            msg = bytes(p, encoding='utf-8')
            # print("生成消息",msg)

          #同一个key的消息会被自动分配到同一个分区 future=producer.send(kafka_topic, key=b"test",value=msg)
          #加了监听事件是否成功发送后,执行速度很慢,所以这里去掉了
    # try: # future.get(timeout=2) # except Exception as e: # traceback.print_stack() print("成功生产{}条消息".format(num)) producer.close()

    消费者代码:

    from kafka import KafkaConsumer
    
    
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xx:9092'] #xx为对应的ip地址
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    
    
    def consumer():
        consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)
    
        for message in consumer:
            print ("开始消费:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key,
                                                  message.value))
    
        # consumer.close()

    亲测单机生产10w消息耗时20秒内,单线程

    消费者没记录耗时,但是也非常快,kafka性能确实牛 

    脑子不够用当然只能脚踏实地的做事情!
  • 相关阅读:
    华为ensp使用
    网络学习目录
    MySQL简介
    zip命令详解
    gzip命令详解
    unzip/tar命令详解
    tar命令详解
    ipython使用
    os, sys, stat 模块使用
    配置linux系统时区---解决ntp同步完时间不准问题
  • 原文地址:https://www.cnblogs.com/qtclm/p/15171594.html
Copyright © 2011-2022 走看看