zoukankan      html  css  js  c++  java
  • python模拟发送、消费kafka消息

    参考文章: https://zhuanlan.zhihu.com/p/279784873

    生产者代码:

    import traceback
    
    from kafka import KafkaProducer,KafkaConsumer
    from faker import Faker
    
    fake=Faker()
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xxx:9092']
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    def producer(num:int):
        producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
        phones = [fake.name()+"-"+str(i) for i in range(num)]
        for p in phones:
            msg = bytes(p, encoding='utf-8')
            # print("生成消息",msg)

          #同一个key的消息会被自动分配到同一个分区 future=producer.send(kafka_topic, key=b"test",value=msg)
          #加了监听事件是否成功发送后,执行速度很慢,所以这里去掉了
    # try: # future.get(timeout=2) # except Exception as e: # traceback.print_stack() print("成功生产{}条消息".format(num)) producer.close()

    消费者代码:

    from kafka import KafkaConsumer
    
    
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xx:9092'] #xx为对应的ip地址
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    
    
    def consumer():
        consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)
    
        for message in consumer:
            print ("开始消费:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key,
                                                  message.value))
    
        # consumer.close()

    亲测单机生产10w消息耗时20秒内,单线程

    消费者没记录耗时,但是也非常快,kafka性能确实牛 

    脑子不够用当然只能脚踏实地的做事情!
  • 相关阅读:
    通过strace 监控 fdatasync
    RAID 2.0
    AHCI vs NVMe
    NVMe 图解
    详解linux运维工程师入门级必备技能
    条带深度 队列深度 NCQ IOPS
    NVMe 与 AHCI
    IO负载高的来源定位 IO系列
    磁盘性能指标--IOPS 理论
    java程序员从笨鸟到菜鸟系列
  • 原文地址:https://www.cnblogs.com/qtclm/p/15171594.html
Copyright © 2011-2022 走看看