zoukankan      html  css  js  c++  java
  • confluent-kafka python Producer Consumer实现

    • 基础通讯
      •   Producer.py
    import confluent_kafka
    import time
    
    topic = 'confluent-kafka-topic'
    
    
    def confluent_kafka_producer_performance(bootstrap_servers='192.168.65.130:9092',
                                             num_messages=10,
                                             topic='confluent-kafka-topic'):
        """Produce *num_messages* short messages to *topic* and time the run.

        Parameters
        ----------
        bootstrap_servers : str
            Kafka broker address (``host:port``). Defaults to the original
            hard-coded test broker.
        num_messages : int
            How many messages to produce.
        topic : str
            Destination topic name.

        Returns
        -------
        float
            Wall-clock seconds spent producing, including the final flush.
        """
        conf = {'bootstrap.servers': bootstrap_servers}
        producer = confluent_kafka.Producer(**conf)
        messages_to_retry = 0
        msg_payload = 'This is message'

        producer_start = time.time()
        for _ in range(num_messages):
            try:
                producer.produce(topic, value=msg_payload)
                print(msg_payload)
            except BufferError:
                # Local producer queue is full -- remember the message and
                # retry it below after draining the queue with poll().
                messages_to_retry += 1

        # hacky retry messages that over filled the local buffer
        for _ in range(messages_to_retry):
            producer.poll(0)
            try:
                producer.produce(topic, value=msg_payload)
            except BufferError:
                producer.poll(0)
                producer.produce(topic, value=msg_payload)

        # Block until every queued message is delivered (or fails).
        producer.flush()

        return time.time() - producer_start
    
    
    if __name__ == "__main__":
        # Run the timed produce and report how long it took.
        print(confluent_kafka_producer_performance())
      •   Consumer.py
    import confluent_kafka
    import uuid
    import time
    
    
    def confluent_kafka_consumer_performance(bootstrap_servers='192.168.65.130:9092',
                                             topic='confluent-kafka-topic',
                                             num_messages=10):
        """Consume *num_messages* messages from *topic* and time the run.

        Parameters
        ----------
        bootstrap_servers : str
            Kafka broker address (``host:port``).
        topic : str
            Topic to subscribe to.
        num_messages : int
            Stop after this many messages have been consumed.

        Returns
        -------
        float
            Wall-clock seconds from subscribe until the target count is reached.
        """
        msg_consumed_count = 0
        conf = {'bootstrap.servers': bootstrap_servers,
                # group.id must be a string; the original passed a uuid.UUID
                # object. A fresh UUID per run gives a brand-new group, so
                # 'earliest' always replays the topic from the beginning.
                'group.id': str(uuid.uuid1()),
                'session.timeout.ms': 6000,
                # 'default.topic.config' nesting is deprecated in librdkafka;
                # the top-level key is the supported spelling.
                'auto.offset.reset': 'earliest'}

        consumer = confluent_kafka.Consumer(**conf)

        consumer_start = time.time()
        # This is the same as pykafka, subscribing to a topic will start a background thread
        consumer.subscribe([topic])

        try:
            while msg_consumed_count < num_messages:
                msg = consumer.poll(1)
                if msg is None:
                    # Poll timed out with no message -- keep waiting.
                    continue
                if msg.error():
                    # Report transient errors (e.g. partition EOF) instead of
                    # counting them as consumed messages.
                    print(msg.error())
                    continue
                msg_consumed_count += 1
                print(msg)

            consumer_timing = time.time() - consumer_start
        finally:
            # Commit final offsets and leave the group even if the loop raised.
            consumer.close()
        return consumer_timing
    
    
    if __name__ == "__main__":
        # Run the timed consume and report how long it took.
        print(confluent_kafka_consumer_performance())
    • 分区实现
      •   pro_partition.py
    import confluent_kafka

    # Broker address of the local Kafka test instance.
    conf = {'bootstrap.servers': '192.168.65.130:9092'}

    producer = confluent_kafka.Producer(**conf)

    # Route the record explicitly to partition 0 of topic 'con_1'.
    producer.produce('con_1', key='key', value='part_0', partition=0)

    producer.poll(1)
    # poll() only serves delivery callbacks; it does not guarantee the message
    # left the local queue. flush() blocks until delivery completes, so the
    # script cannot exit with the record still buffered.
    producer.flush()
      •   con_partition.py
    import confluent_kafka
    import uuid

    # group.id must be a string; the original passed a uuid.UUID object.
    # A fresh UUID gives this consumer its own consumer group.
    conf = {'bootstrap.servers': '192.168.65.130:9092',
            'group.id': str(uuid.uuid1())}
    consumer = confluent_kafka.Consumer(**conf)

    # Read explicitly from partition 0 of topic 'con_1' (manual assignment,
    # no group rebalancing).
    tp1 = confluent_kafka.TopicPartition('con_1', 0)
    consumer.assign([tp1])

    # poll() returns None on timeout -- the original dereferenced msg
    # unconditionally and raised AttributeError when no message arrived in 1 s.
    msg = consumer.poll(1)
    if msg is not None and not msg.error():
        print(msg.key())
        print(msg.value())

    # Leave the group and release resources.
    consumer.close()
  • 相关阅读:
    根据group by、count case when 分组分类统计
    Cron表达式
    SQL分页查询 — 不同页面的查询结果有重复数据
    Dockerfile文件语法
    redis获取系统当前时间
    mybatis oracle批量插入数据
    Mysql函数->TRIM(去掉首尾空格、任意字符)
    Oracle函数->TRIM(去掉首尾空格、首尾字符)
    使用redis-list类型 限制用户1分钟内访问次数为100次
    一文了解mysql基础架构
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9443722.html
Copyright © 2011-2022 走看看