zoukankan      html  css  js  c++  java
  • python kafka client--confluent-kafka-python

    项目中需要使用python 向Kafka生产和消费数据,最初使用pykafka 。后来发现pykafka不支持client.id.
    最后,终于找到confluent-kafka。

    python kafka推荐使用confluent-kafka,官方推荐的。

    Confluent's Apache Kafka Python client
    confluent-kafka-python is Confluent's Python client for Apache Kafka and the Confluent Platform.

    安装和使用可参考github
    https://github.com/confluentinc/confluent-kafka-python

    本文只针对笔者使用python2.7 + Linux环境介绍kafka包安装和使用。

    安装python
    可网上搜索教程,此处略过。

    安装librdkafka

    For RedHat and RPM-based distros, add this YUM repo and then do sudo yum install librdkafka-devel python-devel: http://docs.confluent.io/current/installation.html#rpm-packages-via-yum

    其他系统,可以参考github上说明。

    安装 python kafka pacakge

    $ pip install confluent-kafka
    
    Successfully installed confluent-kafka-0.11.4
    

    consumer 举例

    #!/usr/bin/python
    
    from confluent_kafka import Consumer, KafkaError
    
    mybroker = "127.0.0.1:9092"
    
    c = Consumer({
        'bootstrap.servers': mybroker,
        'group.id': 'mygroup',
        'client.id': 'lanyang',
        'default.topic.config': {
        'auto.offset.reset': 'smallest'
        }
    })
    
    c.subscribe(['test'])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
    

    输出结果

    $ python kafa_consumer.py 
    Received message: Hello,python
    Received message: Hello,kafka
    

    更多demo请参考github。

  • 相关阅读:
    sqli29-32环境搭建(winserver)
    sqli-labs(Basic)
    SQL语句
    8月10号
    8月9号
    第五周进度报告
    8月8号
    8月7号
    8月6号
    大道至简读后感
  • 原文地址:https://www.cnblogs.com/lanyangsh/p/9162546.html
Copyright © 2011-2022 走看看