  • Python: connecting to Kafka with the pykafka module

    1. Import the module

    from pykafka import KafkaClient,common

    2. Instantiate the client

    client = KafkaClient(hosts="host:port")   # instantiate the client
    print(client.topics)
    print(client.brokers)
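    When the cluster has several brokers, the `hosts` argument of KafkaClient accepts a comma-separated list of `host:port` entries, so listing more than one gives the client a fallback if one broker is down during bootstrap. Below is a minimal sketch of building that string. `build_hosts` is a hypothetical helper, not part of pykafka, and the addresses are placeholders.

```python
def build_hosts(brokers):
    """Join (host, port) pairs into the comma-separated 'host:port,...' string.

    build_hosts is a hypothetical helper for illustration, not a pykafka API.
    """
    parts = []
    for host, port in brokers:
        port = int(port)
        # reject empty hostnames and out-of-range TCP ports early
        if not host or not 0 < port < 65536:
            raise ValueError("bad broker address: {}:{}".format(host, port))
        parts.append("{}:{}".format(host, port))
    return ",".join(parts)


# Placeholder addresses; substitute your own brokers.
hosts = build_hosts([("10.0.0.1", 9092), ("10.0.0.2", 9092)])
print(hosts)
# Then: client = KafkaClient(hosts=hosts)
```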

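    P.S. One pitfall:
    If the broker hosts printed above are domain names (i.e. Kafka is configured to be reachable only by hostname, typically through advertised.listeners) and you pass an IP address to KafkaClient, the initial connection to the IP works, but the client then tries to connect to the hostnames the brokers advertise. Without a hosts-file entry for those names, this fails with a socket error.
    So when you hit a socket error, first check whether this is the cause. If it is, mapping each broker hostname to its IP in the hosts file (e.g. /etc/hosts on Linux) fixes it.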
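    One quick way to confirm the pitfall above is to check whether the broker hostnames shown by `print(client.brokers)` resolve on the client machine. The sketch below uses only the standard library. `unresolvable` and `hosts_entries` are hypothetical helpers, and the hostname `kafka-node-1` and IP `192.168.1.10` are placeholder values.

```python
import socket


def unresolvable(hostnames):
    """Return the hostnames this machine cannot resolve to any address."""
    missing = []
    for name in hostnames:
        try:
            socket.getaddrinfo(name, None)
        except socket.gaierror:
            missing.append(name)
    return missing


def hosts_entries(name_to_ip):
    """Format {hostname: ip} pairs as hosts-file lines: '<ip> <hostname>'."""
    return ["{} {}".format(ip, name) for name, ip in sorted(name_to_ip.items())]


# Placeholder mapping: broker advertises "kafka-node-1", reachable at 192.168.1.10.
# Call unresolvable(["kafka-node-1"]) first; any name it returns needs an entry like:
for line in hosts_entries({"kafka-node-1": "192.168.1.10"}):
    print(line)
```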

    3. Producer demo

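    from pykafka import KafkaClient
    client = KafkaClient(hosts="ip:port")   # instantiate the client
    print(client.topics)
    print(client.brokers)
    # Select the topic. If it does not exist, it is only created when the broker
    # allows auto-creation (auto.create.topics.enable=true).
    topic = client.topics['test_topic']
    producer = topic.get_producer()
    for i in range(1):
        producer.produce(('test message ' + str(i ** 2)).encode())  # produce() takes bytes
    producer.stop()  # wait for pending messages to be sent, then shut down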
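    `producer.produce()` expects bytes, which is why the demo calls `.encode()`. For structured messages, a common approach is to serialize to JSON before producing and decode the same way on the consumer side. The helpers below are a hypothetical sketch, not part of pykafka.

```python
import json


def encode_message(payload):
    """Serialize a dict to UTF-8 JSON bytes, suitable for producer.produce()."""
    # ensure_ascii=False keeps non-ASCII text readable; UTF-8 encoding makes it bytes
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_message(raw):
    """Inverse of encode_message: turn message bytes (e.g. msg.value) back into a dict."""
    return json.loads(raw.decode("utf-8"))


raw = encode_message({"id": 1, "text": "café"})
print(raw)
print(decode_message(raw))
# With a real producer: producer.produce(encode_message({"id": 1}))
```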

    4. Consumer demo

    from pykafka import KafkaClient
    import json


    class KafkaTest(object):
        def __init__(self, host):
            self.host = host
            self.client = KafkaClient(hosts=self.host)

        def balance_consumer(self, topic):
            """
            Consume from Kafka with a balanced consumer.
            :return: does not return; runs until interrupted (e.g. Ctrl+C)
            """
            result = []

            topic = self.client.topics[topic.encode()]
            # managed=True uses Kafka's own group-coordinator rebalancing (Kafka 0.9+),
            # so ZooKeeper is not needed. With managed=False, rebalancing goes through
            # ZooKeeper, and you must also pass zookeeper_connect="zookeeperIp:port"
            # along with consumer_group='test_group'.
            consumer = topic.get_balanced_consumer(consumer_group='test_group',
                                                   auto_commit_enable=True,
                                                   managed=True,
                                                   consumer_timeout_ms=1000)
            partitions = topic.partitions
            print("Partitions: {}".format(partitions))
            earliest_offsets = topic.earliest_available_offsets()
            print("Earliest available offsets: {}".format(earliest_offsets))
            last_offsets = topic.latest_available_offsets()
            print("Latest available offsets: {}".format(last_offsets))
            offset = consumer.held_offsets
            print("Offsets held by this consumer: {}".format(offset))
            try:
                while True:
                    # consume() returns None once consumer_timeout_ms passes without a message
                    msg = consumer.consume()
                    if msg:
                        offset = consumer.held_offsets
                        print("Current offsets: {}".format(offset))
                        value = msg.value.decode()
                        try:
                            # parse JSON payloads safely instead of calling eval()
                            result.append(json.loads(value))
                        except ValueError:
                            # plain-text payloads such as 'test message 0'
                            result.append(value)
                        print(value)
                        consumer.commit_offsets()  # commit now instead of waiting for auto-commit
                    else:
                        print("No data")
            finally:
                consumer.stop()  # leave the consumer group and release partitions


    if __name__ == '__main__':
        host = 'ip:port'
        kafka_ins = KafkaTest(host)
        topic = 'test_topic'
        kafka_ins.balance_consumer(topic)
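    The consumer demo prints both the latest available offsets and the offsets held by the consumer. Comparing the two gives the consumer lag, i.e. how many messages per partition are still waiting to be read. The sketch below works on plain dicts and assumes that (1) a latest offset is the next offset the broker will assign, and (2) a held offset is the offset of the last consumed message, with -1 meaning nothing has been consumed yet. In pykafka 2.x, `topic.latest_available_offsets()` appears to return `OffsetPartitionResponse(offset=[...], err=...)` values, so something like `{p: r.offset[0] for p, r in last_offsets.items()}` should give the first dict and `consumer.held_offsets` the second. Check both against your pykafka version. `consumer_lag` is a hypothetical helper.

```python
def consumer_lag(log_end_offsets, held_offsets):
    """Per-partition lag: messages written but not yet consumed.

    consumer_lag is a hypothetical helper, not a pykafka API.
    log_end_offsets: {partition_id: next offset the broker will assign} (assumed)
    held_offsets:    {partition_id: offset of last consumed message, -1 if none} (assumed)
    """
    lag = {}
    for partition, end in log_end_offsets.items():
        # the consumer has read everything before this offset
        consumed_up_to = held_offsets.get(partition, -1) + 1
        lag[partition] = max(end - consumed_up_to, 0)
    return lag


print(consumer_lag({0: 10, 1: 5}, {0: 9, 1: -1}))  # {0: 0, 1: 5}
```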
  • Original post: https://www.cnblogs.com/luoyc/p/12097974.html