zoukankan      html  css  js  c++  java
  • python 实现 kakfa 的 生产消费模式 和 发布订阅模式

    python 实现 kakfa 的 生产消费模式 和 发布订阅模式(已安装好 kafka 的情况下)
    生产者 producer_demo.py
    from kafka import KafkaProducer, KafkaConsumer
    from kafka.errors import kafka_errors
    import traceback
    import json
    def producer_demo():
        # 假设生产的消息为键值对(不是一定要键值p对),且序列化方式为json
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            key_serializer=lambda k: json.dumps(k).encode(),
            value_serializer=lambda v: json.dumps(v).encode())
        # 发送三条消息
        for i in range(0, 3):
            future = producer.send(
                'kafka_demo',
                key='count_num',  # 同一个key值,会被送至同一个分区
                value=str(i),
                partition=0)  # 向分区1发送消息
            print("send {}".format(str(i)))
            try:
                future.get(timeout=10) # 监控是否发送成功
            except kafka_errors:  # 发送失败抛出kafka_errors
                traceback.format_exc()
    if __name__ == "__main__":
        producer_demo()
     
    消费者 consumer_demo1.py
    from kafka import KafkaConsumer
    import json
    def consumer_demo():
        consumer = KafkaConsumer(
            'kafka_demo',
            bootstrap_servers=['localhost:9092'],
            group_id='test'
        )
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
                )
            )
    if __name__ == "__main__":
        consumer_demo()

    消费者 consumer_demo2.py

    from kafka import KafkaConsumer
    import json
    def consumer_demo():
        consumer = KafkaConsumer(
            'kafka_demo',
            bootstrap_servers=['localhost:9092'],
            group_id='test'
        )
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
                )
            )
    if __name__ == "__main__":
        consumer_demo()
    总结 : kafka 的生产和消费文件的 topic 都是 kafka_demo , 消费者 consumer_demo1.py 和consumer_demo2.py 文件中,
    如果 group_id 都为 test 的话,则为 生产消费模式,两个消费者只有一个会消费 topic 的消息;
    如果 group_id 不都为 test 的话,则为 发布订阅模式,两个消费者都会消费 topic 的消息。

     
     

  • 相关阅读:
    Lerning Entity Framework 6 ------ Defining Relationships
    Lerning Entity Framework 6 ------ Defining the Database Structure
    Lerning Entity Framework 6 ------ Introduction to TPH
    Lerning Entity Framework 6 ------ Introduction to TPT
    Lerning Entity Framework 6 ------ Using a commandInterceptor
    Lerning Entity Framework 6 ------ A demo of using Entity framework with MySql
    C#是否该支持“try/catch/else”语法
    Hadoop学习之旅三:MapReduce
    CLR via C# 摘要二:IL速记
    Java 制表符 " "
  • 原文地址:https://www.cnblogs.com/wjq310/p/python_kafka.html
Copyright © 2011-2022 走看看