zoukankan      html  css  js  c++  java
  • 使用 pykafka 进行消费

    kafka连接脚本

    环境:Python 3,用到的模块有 pykafka、kazoo

    # coding=utf-8
    
    import pykafka
    
    
    class KafkaReaderThread(object):
        """Consume messages from one Kafka topic via a pykafka balanced consumer.

        The connection is attempted once, at construction time. Despite the
        class name, no thread is created here: ``fetchmany`` runs in the
        caller's thread.
        """

        def __init__(self, hosts, broker_version, topic, consumer_group):
            """Store the connection settings and try to connect immediately.

            Args:
                hosts: Broker address string handed to ``pykafka.KafkaClient``
                    as ``hosts``.
                broker_version: Version string handed to
                    ``pykafka.KafkaClient`` as ``broker_version``.
                topic: Key used to look the topic up in ``client.topics``.
                consumer_group: Consumer group name used when the balanced
                    consumer is created.

            After construction ``self.client`` is either the connected client
            or ``None`` if the connection attempt failed.
            """
            self.hosts = hosts
            self.broker_version = broker_version
            self.topic = topic
            self.consumer_group = consumer_group
            self.client = self.new_client()

        def new_client(self):
            """Create a ``pykafka.KafkaClient`` from the stored settings.

            Returns:
                The new client, or ``None`` if creating it raised; the error
                is printed rather than propagated.
            """
            print("start connect...")
            try:
                new_client = pykafka.KafkaClient(
                    hosts=self.hosts,
                    broker_version=self.broker_version,
                )
            except Exception as e:
                print("error: {}".format(e))
                return None
            print("connected")
            return new_client

        def fetchmany(self):
            """Print every message of the topic and commit offsets after each.

            Does nothing when no client is available. Any error raised while
            setting up or iterating the consumer is printed, not propagated.
            The consumer, once created, is always stopped before returning.

            Returns:
                None.
            """
            client = self.client
            if not client:
                return

            consumer = None
            try:
                topic = client.topics[self.topic]
                # auto_start=False: the consumer is started explicitly below,
                # so a failure in start() is reported by this method's handler.
                consumer = topic.get_balanced_consumer(
                    consumer_group=self.consumer_group,
                    managed=True,
                    auto_start=False,
                )
                consumer.start()
                for message in consumer:
                    print(message.value, consumer.held_offsets)
                    # Commit after every message so a restart resumes right
                    # after the last message that was printed.
                    consumer.commit_offsets()
            except Exception as e:
                print("error: {}".format(e))
            finally:
                # Stop on every exit path (normal end, error, interrupt), but
                # only if the consumer was actually created.
                if consumer is not None:
                    try:
                        consumer.stop()
                    except Exception as stop_error:
                        # Best-effort shutdown: report, never raise from here.
                        print("error: {}".format(stop_error))
    
    
    if __name__ == '__main__':
        # Demo settings: broker address and protocol version, then the topic
        # to read and the consumer group to join.
        hosts, broker_version = "172.16.1.249:9092", '0.9.0'
        topic, consumer_group = "BAYONET_VEHICLEALARM", "consumer_group_police_seemmo"

        kafka = KafkaReaderThread(
            hosts=hosts,
            broker_version=broker_version,
            topic=topic,
            consumer_group=consumer_group,
        )
        # The client is None when connecting failed; only consume if connected.
        if kafka.client:
            kafka.fetchmany()
    每天都要遇到更好的自己.
  • 相关阅读:
    jQuery 折叠,自动完成,小提示,树,验证插件(bassistance.de)
    多样化的连结提示效果(Tips)
    Ext开源 Extjs2.0 人力资源管理(ASP.NET)
    JavaScript面向对象编程
    访问Ext.data.store的数据
    Ext核心代码分析之Function.createDelegate
    支持firefox的省略符
    Ext 2.0下Jquery的整合使用示例
    多样化的垂直菜单(OUTLOOK菜单)
    使用 jQuery 简化 Ajax 开发
  • 原文地址:https://www.cnblogs.com/kaichenkai/p/10795140.html
Copyright © 2011-2022 走看看