zoukankan      html  css  js  c++  java
  • 使用 pykafka 进行消费

    kafka连接脚本

    环境:python3,用到的模块有 pykafka,kazoo

    # coding=utf-8
    
    import pykafka
    
    
    class KafkaReaderThread(object):
        def __init__(self, hosts, broker_version, topic, consumer_group):
            self.hosts = hosts
            self.broker_version = broker_version
            self.topic = topic
            self.consumer_group = consumer_group
            self.client = self.new_client()
    
        def new_client(self):
            print "start connect..."
            try:
                new_client = pykafka.KafkaClient(
                    hosts=self.hosts,
                    # zookeeper_hosts=self.proc_setting['setting']['zk_server'],
                    broker_version=self.broker_version
                )
                print "connected"
                return new_client
            except Exception as e:
                print("error: {}".format(e))
                return
    
        def fetchmany(self):
            client = self.client
            if client:
                consumer = None
                try:
                    topic = client.topics[self.topic]
    
                    consumer = topic.get_balanced_consumer(
                        consumer_group=self.consumer_group,
                        managed=True,
                        auto_start=False,
                        # auto_commit_enable=True,
                        # auto_commit_interval_ms=1
                        # reset_offset_on_start=False,
                        # auto_offset_reset=pykafka.common.OffsetType.LATEST,
                    )
                    # print topic.partitions
                    consumer.start()
                    # _offset = consumer.held_offsets()
                    for message in consumer:
                        # test modle
    
                        print(message.value, consumer.held_offsets)
    
                        consumer.commit_offsets()
                        # consumer.stop()
                        # continue
    
                except Exception as e:
                    print("error: {}".format(e))
                    try:
                        consumer.stop()
                    except Exception as e:
                        pass
    
    
    if __name__ == '__main__':
        hosts = "172.16.1.249:9092"
        broker_version = '0.9.0'
        topic = "BAYONET_VEHICLEALARM"
        consumer_group = "consumer_group_police_seemmo"
    
        kafka = KafkaReaderThread(hosts, broker_version, topic, consumer_group)
        if kafka.client:
            kafka.fetchmany()
    每天都要遇到更好的自己.
  • 相关阅读:
    wso2使用
    wso2安装
    CLR 编译函数的两种结果的原因
    hduoj4311
    通过Git在本地局域网中的两台电脑间同步代码
    Git基本操作之强制推送覆盖仓库
    设置Mac共享网络给其他设备
    谷歌浏览器设置无图浏览模式
    加载到SGA中的库缓存对象超过阈值
    webBrowser 禁止屏蔽alert confirm open showModalDialog
  • 原文地址:https://www.cnblogs.com/kaichenkai/p/10795140.html
Copyright © 2011-2022 走看看