zoukankan      html  css  js  c++  java
  • python模拟发送、消费kafka消息

    参考文章: https://zhuanlan.zhihu.com/p/279784873

    生产者代码:

    import traceback
    
    from kafka import KafkaProducer,KafkaConsumer
    from faker import Faker
    
    fake=Faker()
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xxx:9092']
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    def producer(num:int):
        producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
        phones = [fake.name()+"-"+str(i) for i in range(num)]
        for p in phones:
            msg = bytes(p, encoding='utf-8')
            # print("生成消息",msg)

          #同一个key的消息会被自动分配到同一个分区 future=producer.send(kafka_topic, key=b"test",value=msg)
          #加了监听事件是否成功发送后,执行速度很慢,所以这里去掉了
    # try: # future.get(timeout=2) # except Exception as e: # traceback.print_stack() print("成功生产{}条消息".format(num)) producer.close()

    消费者代码:

    from kafka import KafkaConsumer
    
    
    # 生产者
    kafka_topic = "test_kafka_demo"
    kafka_bootstrap_servers = ['xx:9092','xx:9092','xx:9092'] #xx为对应的ip地址
    # 消费者
    kafka_topic_group = "test-group-zeze" #消费者群组
    
    
    def consumer():
        consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)
    
        for message in consumer:
            print ("开始消费:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key,
                                                  message.value))
    
        # consumer.close()

    亲测单机生产10w消息耗时20秒内,单线程

    消费者没记录耗时,但是也非常快,kafka性能确实牛 

    脑子不够用当然只能脚踏实地的做事情!
  • 相关阅读:
    PHP定时任务实现(计划任务 vs node.js)
    第三方支付,代支付接口调用
    iframe调用页面中的局部部分
    树状数据删除(TP5)
    PHP 代码编写注意事项总结归纳
    MySQL 存储过程与事物
    radio与checkbox的选中事件
    简单十步让你全面理解SQL
    生成条形码
    使2个div 在一行上显示
  • 原文地址:https://www.cnblogs.com/qtclm/p/15171594.html
Copyright © 2011-2022 走看看