zoukankan      html  css  js  c++  java
  • python-使用pykafka模块连接kafka

    1、导入模块

    from pykafka import KafkaClient,common

     2、实例化

    client = KafkaClient(hosts="host:port")   #实例化
    print(client.topics)
    print(client.brokers)

    ps:一个坑
    broker打印出来的host如果是域名形式的(kafka自己设置了只能通过域名连接),实例化KafkaClient时传入的是ip,在没有配置host的情况下,连接会报socket连接错误
    出现socket错误时,首先检查是否是这种情况,如果是,在host文件中配置域名和ip的映射可以解决

    3、生产者demo

    from pykafka import KafkaClient,common
    client = KafkaClient(hosts="ip:port")   #实例化
    print(client.topics)
    print(client.brokers)
    topic = client.topics['test_topic']  #指定topic,没有就新建
    producer = topic.get_producer()
    for i in range(1):
        producer.produce(('test message ' + str(i ** 2)).encode())
    producer.stop()

    4、消费者demo

    from pykafka import KafkaClient,common
    import time
    import json
    class KafkaTest(object):
        def __init__(self, host):
            self.host = host
            self.client = KafkaClient(hosts=self.host)
    
        def balance_consumer(self, topic, offset=0):
            """
            使用balance consumer去消费kafka
            :return:
            """
            result=[]
    
            topic = self.client.topics[topic.encode()]
            # managed=True 设置后,使用新式reblance分区方法,不需要使用zk,而False是通过zk来实现reblance的需要使用zk,必须指定        # zookeeper_connect = "zookeeperIp",consumer_group='test_group',
            consumer = topic.get_balanced_consumer(consumer_group='test_group',
                                        auto_commit_enable=True,managed=True,
                                                consumer_timeout_ms=1000)
            partitions = topic.partitions
            print("分区 {}".format(partitions))
            earliest_offsets = topic.earliest_available_offsets()
            print("最早可用offset {}".format(earliest_offsets))
            last_offsets = topic.latest_available_offsets()
            print("最近可用offset {}".format(last_offsets))
            offset = consumer.held_offsets
            print("当前消费者分区offset情况{}".format(offset))
            while True:
                msg = consumer.consume()
                if msg:
                    offset = consumer.held_offsets
                    print("当前位移:{}".format(offset))
                    result.append(eval(msg.value.decode()))
                    print(msg.value.decode())
                    consumer.commit_offsets()   #commit一下
                    
                else:
                    print("没有数据")
    
    
    
    if __name__ == '__main__':
        host = 'ip:port' 
        kafka_ins = KafkaTest(host)
        topic = 'test_topic'
        kafka_ins.balance_consumer(topic)
  • 相关阅读:
    快速搭建vue2.0开发环境
    node+websocket+react即时匿名通讯聊天室
    12月14日,Progress库
    12月9日,timer库
    12月7日,BOOST库安装及配置
    12月7日开始学习Boost
    is not allowed to connect to this MySQL server解决办法
    清华学堂练习题——传纸条
    makefile经典教程
    启动mysql服务命令
  • 原文地址:https://www.cnblogs.com/luoyc/p/12097974.html
Copyright © 2011-2022 走看看