  • python-kafka demo

    Uses kafka-python: https://pypi.org/project/kafka-python/

    Create the topic kafka_demo1:

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic kafka_demo1
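
    The same command can be scripted from Python. The sketch below only assembles the argument list for kafka-topics.sh; the helper names build_create_topic_cmd and create_topic are hypothetical, and running it assumes kafka-topics.sh is on the PATH. Newer Kafka releases replace the --zookeeper flag with --bootstrap-server, so the connection flag is left as a parameter.

```python
import subprocess


def build_create_topic_cmd(topic, partitions=2, replication_factor=1,
                           connect_flag="--zookeeper",
                           connect_addr="localhost:2181"):
    """Return the kafka-topics.sh argument list used in this demo.

    Hypothetical helper: it only assembles the command line shown above.
    """
    return [
        "kafka-topics.sh", "--create",
        connect_flag, connect_addr,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def create_topic(topic, **kwargs):
    # Assumes kafka-topics.sh is on PATH and the cluster is reachable.
    subprocess.run(build_create_topic_cmd(topic, **kwargs), check=True)


# Print the command instead of running it, so this works without a cluster.
print(" ".join(build_create_topic_cmd("kafka_demo1")))
```

    Calling create_topic("kafka_demo1") would actually run the command; with connect_flag="--bootstrap-server" and connect_addr="localhost:9092" it targets the broker directly.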

    Producer

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import traceback
    import json
    
    
    def producer_demo():
        # Assume each message is a key/value pair (a key is not required), serialized as JSON
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            key_serializer=lambda k: json.dumps(k).encode(),
            value_serializer=lambda v: json.dumps(v).encode())
        # Send three messages
        for i in range(0, 3):
            future = producer.send(
                'kafka_demo1',
                key='count_num',  # messages with the same key go to the same partition
                value=str(i),
                partition=1)  # send to partition 1 explicitly
            print("send {}".format(str(i)))
            try:
                future.get(timeout=10)  # block until the send is acknowledged
            except KafkaError:  # raised when the send fails
                traceback.print_exc()
        producer.close()  # flush pending messages and release resources
    
    
    producer_demo()
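
    The comment on the key refers to key-based partitioning: when no partition is given, the producer hashes the serialized key and maps it onto one of the topic's partitions, so equal keys land together. In the demo, partition=1 is passed explicitly, which pins the partition regardless of the key. A minimal sketch of the idea, assuming a stand-in hash (zlib.crc32) rather than the hash the Kafka client really uses; pick_partition is a hypothetical name:

```python
import json
import zlib


def pick_partition(key, num_partitions):
    """Map a key to a partition index (illustration only).

    Assumption: zlib.crc32 stands in for the client's real hash function,
    so the indices here will not match what Kafka actually assigns.
    """
    key_bytes = json.dumps(key).encode()  # same serialization as the producer
    return zlib.crc32(key_bytes) % num_partitions


# The same key always maps to the same partition of the 2-partition topic.
first = pick_partition("count_num", 2)
second = pick_partition("count_num", 2)
print(first == second)
```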

    Consumer

    from kafka import KafkaConsumer
    import json
    
    
    def consumer_demo():
        consumer = KafkaConsumer(
            'kafka_demo1',
            bootstrap_servers='localhost:9092',
            group_id='test'
        )
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                json.loads(message.key.decode()),
                json.loads(message.value.decode())))
    
    
    consumer_demo()
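
    Keys are optional in Kafka, so message.key can be None, in which case message.key.decode() raises AttributeError. A minimal sketch of a None-safe decoder that mirrors the producer's JSON serializers; decode_json is a hypothetical helper, and it could also be passed to KafkaConsumer as key_deserializer and value_deserializer:

```python
import json


def decode_json(raw):
    """Decode bytes written by the producer's JSON serializers.

    Returns None for a missing key or value instead of raising.
    """
    if raw is None:
        return None
    return json.loads(raw.decode())


# Round trip using the same encoding as the producer demo.
key_bytes = json.dumps("count_num").encode()
value_bytes = json.dumps(str(0)).encode()
print(decode_json(key_bytes), decode_json(value_bytes), decode_json(None))
```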
  • Original article: https://www.cnblogs.com/luckygxf/p/15042229.html