zoukankan      html  css  js  c++  java
  • python 实现 kakfa 的 生产消费模式 和 发布订阅模式

    python 实现 kakfa 的 生产消费模式 和 发布订阅模式(已安装好 kafka 的情况下)
    生产者 producer_demo.py
    from kafka import KafkaProducer, KafkaConsumer
    from kafka.errors import kafka_errors
    import traceback
    import json
    def producer_demo():
        # 假设生产的消息为键值对(不是一定要键值p对),且序列化方式为json
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            key_serializer=lambda k: json.dumps(k).encode(),
            value_serializer=lambda v: json.dumps(v).encode())
        # 发送三条消息
        for i in range(0, 3):
            future = producer.send(
                'kafka_demo',
                key='count_num',  # 同一个key值,会被送至同一个分区
                value=str(i),
                partition=0)  # 向分区1发送消息
            print("send {}".format(str(i)))
            try:
                future.get(timeout=10) # 监控是否发送成功
            except kafka_errors:  # 发送失败抛出kafka_errors
                traceback.format_exc()
    if __name__ == "__main__":
        producer_demo()
     
    消费者 consumer_demo1.py
    from kafka import KafkaConsumer
    import json
    def consumer_demo():
        consumer = KafkaConsumer(
            'kafka_demo',
            bootstrap_servers=['localhost:9092'],
            group_id='test'
        )
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
                )
            )
    if __name__ == "__main__":
        consumer_demo()

    消费者 consumer_demo2.py

    from kafka import KafkaConsumer
    import json
    def consumer_demo():
        consumer = KafkaConsumer(
            'kafka_demo',
            bootstrap_servers=['localhost:9092'],
            group_id='test'
        )
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())
                )
            )
    if __name__ == "__main__":
        consumer_demo()
    总结 : kafka 的生产和消费文件的 topic 都是 kafka_demo , 消费者 consumer_demo1.py 和consumer_demo2.py 文件中,
    如果 group_id 都为 test 的话,则为 生产消费模式,两个消费者只有一个会消费 topic 的消息;
    如果 group_id 不都为 test 的话,则为 发布订阅模式,两个消费者都会消费 topic 的消息。

     
     

  • 相关阅读:
    Unlicensed ARC session – terminating!
    ArcGIS读取dem格式数据
    OCIEnvCreate 失败,返回代码为 -1的解决方法
    PowerDesigner设计的数据库 ORA-0092
    Oracle空间查询 ORA-28595
    PowerDesigner添加表注释
    C# 动态解析表达式
    远程桌面不能交互复制粘贴
    ArcGIS10.4 Runtime Error R6034
    ArcGIS Add-in ValidateAddInXMLTask”任务意外失败
  • 原文地址:https://www.cnblogs.com/wjq310/p/python_kafka.html
Copyright © 2011-2022 走看看