zoukankan      html  css  js  c++  java
  • kafka_consumer

    使用pykafka进行消费

    #coding:utf8
    from pykafka import KafkaClient
    import time
    from pykafka.common import OffsetType
    import json
    
    client = KafkaClient(hosts='192.168.1.1:9092')
    
    # print client.topics
    
    topic = client.topics['perfin']
    
    consumer = topic.get_balanced_consumer(consumer_group='perftest',
                                           auto_commit_enable=True,
                                           zookeeper_connect='192.168.1.1:2181',
                                           auto_commit_interval_ms = 1000,
                                           auto_offset_reset = OffsetType.LATEST,
                                           # auto_commit_enable=True, auto_co
                                           )
    
    start = time.time()
    c=0
    tmp = []
    for msg in consumer:
        c+=1
        tmp.append(msg.value)
        if c%1000==0:
            now =time.time()
            print "[%s] avg speed: %s /second"%(time.strftime('%Y-%m-%d %H:%M:%S'),int(c/(now-start)))
            start =now
        if c > 10000:
            open('./kafkamsg.txt','w').write("
    ".join(tmp))
            import sys
            sys.exit(0)
  • 相关阅读:
    数据结构实验2-迷宫
    离散实验4
    关系代数中的除法运算
    数据库中什么叫象集
    (转)汇编-补码
    2014022201
    20140222
    2014022101
    代码20140221
    代码20140215
  • 原文地址:https://www.cnblogs.com/yeyong/p/11855035.html
Copyright © 2011-2022 走看看