zoukankan      html  css  js  c++  java
  • confluent-kafka python Producer Consumer实现

    • 基础通讯
      •   Producer.py
    import confluent_kafka
    import time
    
    # Module-level topic name for the producer example. Note that
    # confluent_kafka_producer_performance() assigns its own local `topic`
    # with the same value, so this global is not read by that function.
    topic = 'confluent-kafka-topic'
    
    
    def confluent_kafka_producer_performance():
        """Produce ten fixed messages to Kafka and return the elapsed time.

        Each message is enqueued with ``produce()``. If the client's local
        queue is full (``BufferError``), the producer is polled to let
        pending deliveries complete and the same message is retried
        immediately, instead of being deferred to a second pass that could
        itself fail.

        Returns:
            float: Seconds elapsed from the first ``produce()`` call until
            ``flush()`` has returned.
        """
        topic = 'confluent-kafka-topic'
        conf = {'bootstrap.servers': '192.168.65.130:9092'}
        producer = confluent_kafka.Producer(**conf)
        msg_payload = 'This is message'

        producer_start = time.time()
        for _ in range(10):
            while True:
                try:
                    producer.produce(topic, value=msg_payload)
                except BufferError:
                    # Local queue is full: wait (up to 1 s) for in-flight
                    # messages to be delivered, which frees queue space,
                    # then retry this same message.
                    producer.poll(1)
                else:
                    break
            print(msg_payload)
            # Serve delivery reports without blocking so the queue keeps
            # draining while we produce.
            producer.poll(0)

        # Block until every enqueued message has been delivered or has failed.
        producer.flush()

        return time.time() - producer_start
    
    
    if __name__ == "__main__":
        # Run the producer benchmark once and report the elapsed seconds.
        elapsed_seconds = confluent_kafka_producer_performance()
        print(elapsed_seconds)
      •   Consumer.py
    import confluent_kafka
    import uuid
    import time
    
    
    def confluent_kafka_consumer_performance():
        """Consume ten messages from Kafka and return the elapsed time.

        A new random ``group.id`` is generated on every call, so the group
        has no committed offsets and ``auto.offset.reset: earliest`` decides
        where consumption starts.

        The loop runs until ten real messages have been received; it does
        not time out if the topic holds fewer than ten messages.

        Returns:
            float: Seconds elapsed from just before ``subscribe()`` until
            the tenth message has been received (closing the consumer is
            not included).
        """
        topic = 'confluent-kafka-topic'
        msg_consumed_count = 0
        conf = {'bootstrap.servers': '192.168.65.130:9092',
                # Configuration values are strings; make the conversion explicit.
                'group.id': str(uuid.uuid1()),
                'session.timeout.ms': 6000,
                'default.topic.config': {
                    'auto.offset.reset': 'earliest'
                }
                }

        consumer = confluent_kafka.Consumer(**conf)

        consumer_start = time.time()
        try:
            consumer.subscribe([topic])

            while msg_consumed_count < 10:
                msg = consumer.poll(1)
                # poll() returns None on timeout. Compare against None
                # explicitly: a Message's truthiness follows len(msg), i.e.
                # the size of its value, so an empty-value message would
                # otherwise be skipped.
                if msg is None:
                    continue
                # A returned object may be an error/event rather than a
                # real message; report it but do not count it.
                if msg.error():
                    print(msg.error())
                    continue
                msg_consumed_count += 1
                print(msg)

            consumer_timing = time.time() - consumer_start
        finally:
            # Always leave the group and release client resources, even if
            # the loop is interrupted by an exception.
            consumer.close()
        return consumer_timing
    
    
    if __name__ == "__main__":
        # Run the consumer benchmark once and report the elapsed seconds.
        elapsed_seconds = confluent_kafka_consumer_performance()
        print(elapsed_seconds)
    • 分区实现
      •   pro_partition.py
    import confluent_kafka
    
    # Minimal producer that writes one keyed message to an explicit partition.
    conf = {'bootstrap.servers': '192.168.65.130:9092'}

    producer = confluent_kafka.Producer(**conf)

    # Pin the message to partition 0 of topic 'con_1' instead of letting the
    # partitioner choose one from the key.
    producer.produce('con_1', key='key', value='part_0', partition=0)

    # produce() only enqueues the message. flush() blocks until it has been
    # delivered (or has failed); a single poll(1) waits at most one second
    # and gives no such guarantee before the script exits.
    producer.flush()
      •   con_partition.py
    import confluent_kafka
    import uuid
    
    # Minimal consumer that reads one message from an explicit partition.
    conf = {'bootstrap.servers': '192.168.65.130:9092', 'group.id': str(uuid.uuid1())}
    consumer = confluent_kafka.Consumer(**conf)
    try:
        # assign() attaches the consumer directly to partition 0 of 'con_1'.
        # NOTE(review): no start offset or auto.offset.reset is set here, so
        # with a brand-new group id the start position is left to the
        # client's default - confirm this is what the example intends.
        tp1 = confluent_kafka.TopicPartition('con_1', 0)
        consumer.assign([tp1])
        msg = consumer.poll(1)
        if msg is None:
            # poll() timed out without receiving anything.
            print('no message received within 1 second')
        elif msg.error():
            # The returned object is an error/event, not a real message.
            print(msg.error())
        else:
            print(msg.key())
            print(msg.value())
    finally:
        # Release client resources even if polling fails.
        consumer.close()
  • 相关阅读:
    随笔
    第一次随笔
    团队战day02-接口
    团队战day01-初步搭建UI
    团队战start-确定项目以及介绍
    团队—易软
    找回感觉的练习
    第五次作业-团队作业-团队组建
    第四次博客作业-结对项目
    java第九次作业
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9443722.html
Copyright © 2011-2022 走看看