zoukankan      html  css  js  c++  java
  • confluent-kafka python Producer Consumer实现

    • 基础通讯
      •   Producer.py
    import confluent_kafka
    import time
    
    # Kafka topic name used by this demo. Note that
    # confluent_kafka_producer_performance() below assigns its own local
    # `topic` with the same value, so this module-level name is not read there.
    topic = 'confluent-kafka-topic'
    
    
    def confluent_kafka_producer_performance():
        """Produce ten fixed messages and return the elapsed wall-clock seconds.

        Each message is enqueued with ``produce()``. If the client's local
        queue is full (``BufferError``), the producer is polled so the queue
        can drain and the same message is retried immediately. This replaces
        a deferred retry pass, which could still raise an unhandled
        ``BufferError`` on its second attempt.

        Returns:
            float: seconds between the first ``produce()`` call and the
            completion of ``flush()``.
        """
        topic = 'confluent-kafka-topic'
        conf = {'bootstrap.servers': '192.168.65.130:9092'}
        producer = confluent_kafka.Producer(**conf)
        msg_payload = 'This is message'

        producer_start = time.time()
        for _ in range(10):
            while True:
                try:
                    producer.produce(topic, value=msg_payload)
                except BufferError:
                    # Local queue is full: wait up to 1s for deliveries to
                    # free space, then retry the same message.
                    producer.poll(1)
                else:
                    break
            print(msg_payload)
            # Serve delivery reports without blocking so the queue keeps
            # draining while we produce.
            producer.poll(0)

        # Block until every queued message has been delivered (or failed).
        producer.flush()

        return time.time() - producer_start
    
    
    if __name__ == "__main__":
        # Run the producer benchmark and report how long it took, in seconds.
        elapsed_seconds = confluent_kafka_producer_performance()
        print(elapsed_seconds)
      •   Consumer.py
    import confluent_kafka
    import uuid
    import time
    
    
    def confluent_kafka_consumer_performance():
        """Consume ten messages from the demo topic and return the elapsed seconds.

        A fresh consumer group id is generated on every call, and
        ``auto.offset.reset`` is set to ``earliest``, so the topic is read
        from the beginning. The loop keeps polling until ten non-error
        messages have been received; it does not time out on its own.

        Returns:
            float: seconds between just before ``subscribe()`` and the receipt
            of the tenth message. Closing the consumer is not included.
        """
        topic = 'confluent-kafka-topic'
        msg_consumed_count = 0
        conf = {'bootstrap.servers': '192.168.65.130:9092',
                # Pass the group id explicitly as a string.
                'group.id': str(uuid.uuid1()),
                'session.timeout.ms': 6000,
                'default.topic.config': {
                    'auto.offset.reset': 'earliest'
                }
                }

        consumer = confluent_kafka.Consumer(**conf)

        consumer_start = time.time()
        try:
            consumer.subscribe([topic])

            while msg_consumed_count < 10:
                msg = consumer.poll(1)
                # Compare against None explicitly: poll() returns None on
                # timeout, and a Message's truthiness appears to follow its
                # payload length, so `if msg:` would skip empty payloads.
                if msg is None:
                    continue
                if msg.error():
                    # Error events are reported but not counted as consumed.
                    print(msg.error())
                    continue
                msg_consumed_count += 1
                print(msg)

            consumer_timing = time.time() - consumer_start
        finally:
            # Always close the consumer, even if polling raised.
            consumer.close()
        return consumer_timing
    
    
    if __name__ == "__main__":
        # Run the consumer benchmark and report how long it took, in seconds.
        elapsed_seconds = confluent_kafka_consumer_performance()
        print(elapsed_seconds)
    • 分区实现
      •   pro_partition.py
    import confluent_kafka
    
    # Produce a single keyed message to partition 0 of topic 'con_1'.
    conf = {'bootstrap.servers':'192.168.65.130:9092'}

    producer = confluent_kafka.Producer(**conf)

    producer.produce('con_1',key ='key',value='part_0', partition=0)

    # produce() only enqueues the message. flush() blocks until it has been
    # delivered (or has failed), whereas a single poll(1) could return before
    # delivery and let the script exit with the message still queued.
    producer.flush()
      •   con_partition.py
    import confluent_kafka
    import uuid
    
    # Read one message from partition 0 of topic 'con_1' via manual assignment.
    conf = {'bootstrap.servers':'192.168.65.130:9092', 'group.id':str(uuid.uuid1())}
    consumer = confluent_kafka.Consumer(**conf)
    tp1 = confluent_kafka.TopicPartition('con_1', 0)
    consumer.assign([tp1])
    try:
        msg = consumer.poll(1)
        if msg is None:
            # poll() returns None when nothing arrives within the timeout;
            # calling msg.key() on it would raise AttributeError.
            print('No message received within 1 second')
        elif msg.error():
            print(msg.error())
        else:
            # Show the key and value instead of discarding the call results.
            print(msg.key(), msg.value())
    finally:
        # Always close the consumer, even if polling raised.
        consumer.close()
  • 相关阅读:
    通过web端启动关闭服务器程序以及检测程序运行状态
    Windows 自动监听程序,游戏服务器挂掉以后,自动监听程序将其重启起来
    自动监听程序,如果程序挂了,就重启
    删除log
    封装了一个C++类,当程序意外崩溃的时候可以生成dump文件,以便确定错误原因。
    贝塞尔曲线
    golang sql连接池 超时 数据库自动断开 ->127.0.0.1:3306: wsarecv: An established connection was aborted by the software in your host machine.
    带控制的抢庄牛牛
    龙虎斗控制
    回归模型与房价预测
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9443722.html
Copyright © 2011-2022 走看看