zoukankan      html  css  js  c++  java
  • confluent-kafka python Producer Consumer实现

    • 基础通讯
      •   Producer.py
    import confluent_kafka
    import time
    
    # Kafka topic name. Note: confluent_kafka_producer_performance() below
    # assigns its own local `topic` with the same value, so that function does
    # not read this module-level name.
    topic = 'confluent-kafka-topic'
    
    
    def confluent_kafka_producer_performance():
        """Produce ten fixed messages and return the elapsed wall-clock time.

        Each message is enqueued with ``produce()``. If the client's local
        queue is full (``BufferError``), the producer is polled so queued
        messages can be sent and the same message is retried in place; the
        message is therefore never dropped and a second ``BufferError`` cannot
        escape unhandled.

        Returns:
            float: seconds between the first ``produce()`` call and the
            completion of ``flush()``.
        """
        topic = 'confluent-kafka-topic'
        conf = {'bootstrap.servers': '192.168.65.130:9092'}
        producer = confluent_kafka.Producer(**conf)
        msg_payload = 'This is message'

        producer_start = time.time()
        for _ in range(10):
            while True:
                try:
                    producer.produce(topic, value=msg_payload)
                except BufferError:
                    # Local queue is full: block for up to one second while
                    # the client services its queue, then retry this message.
                    producer.poll(1)
                else:
                    break
            print(msg_payload)
            # Non-blocking poll so delivery events are served while producing.
            producer.poll(0)

        # Wait until every queued message has been delivered (or has failed).
        producer.flush()

        return time.time() - producer_start
    
    
    if __name__ == "__main__":
        # Run the producer benchmark once and print the elapsed seconds.
        print(confluent_kafka_producer_performance())
      •   Consumer.py
    import confluent_kafka
    import uuid
    import time
    
    
    def confluent_kafka_consumer_performance():
        """Consume ten messages from the topic and return the elapsed time.

        The loop polls with a one-second timeout until ten real messages have
        been received. Poll timeouts (``None``) and error events are not
        counted; error events are printed and skipped. The consumer is always
        closed, even if polling raises.

        Returns:
            float: seconds between just before ``subscribe()`` and the receipt
            of the tenth message (closing the consumer is not included).
        """
        topic = 'confluent-kafka-topic'
        msg_consumed_count = 0
        conf = {'bootstrap.servers': '192.168.65.130:9092',
                # Pass the id as a plain string rather than a UUID object.
                'group.id': str(uuid.uuid1()),
                'session.timeout.ms': 6000,
                # NOTE(review): newer client releases appear to deprecate
                # 'default.topic.config' in favour of a top-level
                # 'auto.offset.reset' -- confirm against the installed version.
                'default.topic.config': {
                    'auto.offset.reset': 'earliest'
                }
                }

        consumer = confluent_kafka.Consumer(**conf)

        consumer_start = time.time()
        # This is the same as pykafka, subscribing to a topic will start a background thread
        consumer.subscribe([topic])

        try:
            while msg_consumed_count < 10:
                msg = consumer.poll(1)
                # Explicit None check: poll() returns None on timeout, and
                # relying on the truthiness of a message object is fragile.
                if msg is None:
                    continue
                if msg.error():
                    # Error/event notification, not a consumed record.
                    print(msg.error())
                    continue
                msg_consumed_count += 1
                print(msg)

            consumer_timing = time.time() - consumer_start
        finally:
            # Always leave the group and release client resources.
            consumer.close()
        return consumer_timing
    
    
    if __name__ == "__main__":
        # Run the consumer benchmark once and print the elapsed seconds.
        print(confluent_kafka_consumer_performance())
    • 分区实现
      •   pro_partition.py
    import confluent_kafka
    
    # Produce a single keyed message to an explicit partition of topic 'con_1'.
    conf = {'bootstrap.servers': '192.168.65.130:9092'}

    producer = confluent_kafka.Producer(**conf)

    # partition=0 pins the message to partition 0 instead of letting the
    # partitioner choose one from the key.
    producer.produce('con_1', key='key', value='part_0', partition=0)

    # produce() only enqueues the message; flush() blocks until it has actually
    # been delivered (or failed), so the script cannot exit with it still queued.
    producer.flush()
      •   con_partition.py
    import confluent_kafka
    import uuid
    
    # Read one message directly from partition 0 of topic 'con_1'.
    conf = {'bootstrap.servers': '192.168.65.130:9092',
            'group.id': str(uuid.uuid1())}
    consumer = confluent_kafka.Consumer(**conf)
    try:
        # assign() attaches the consumer to the given partition explicitly
        # instead of subscribing to the topic through the consumer group.
        tp1 = confluent_kafka.TopicPartition('con_1', 0)
        consumer.assign([tp1])
        msg = consumer.poll(1)
        if msg is None:
            # poll() timed out without receiving anything.
            print('No message received within 1 second')
        elif msg.error():
            print(msg.error())
        else:
            print(msg.key())
            print(msg.value())
    finally:
        # Release client resources even if polling failed.
        consumer.close()
  • 相关阅读:
    java9新特性-9-语法改进:try语句
    10.04 FZSZ模拟Day1 总结
    10.03模拟总结
    HNOI2012 永无乡
    ZJOI2007 报表统计
    HNOI2004 宠物收养场
    HNOI2002 营业额统计
    Splay 区间反转
    Splay基本操作
    HEOI2016 树
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9443722.html
Copyright © 2011-2022 走看看