  • Kafka Notes

     

    Official site: http://kafka.apache.org/

    Multiple producers and multiple consumers

    Multiple topics and multiple partitions

    Multiple consumer groups. Within a group, each message is consumed only once; groups do not affect one another.
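The group semantics can be illustrated with a small in-memory model. This is only a sketch, not how the broker is implemented: `TinyLog`, `poll`, and the single-partition setup are hypothetical. Each group keeps its own offset, so a message is handed out once per group, and reading in one group does not move another group's position.

```python
from collections import defaultdict


class TinyLog:
    """Toy single-partition log with one read offset per consumer group."""

    def __init__(self):
        self.messages = []
        self.offsets = defaultdict(int)  # group id -> next offset to read

    def append(self, value):
        self.messages.append(value)

    def poll(self, group):
        """Return the next unread message for `group`, or None if caught up."""
        pos = self.offsets[group]
        if pos >= len(self.messages):
            return None
        self.offsets[group] = pos + 1
        return self.messages[pos]


log = TinyLog()
for value in ("a", "b", "c"):
    log.append(value)

# Consumers in group "bd" share one offset, so no message is delivered twice.
bd_reads = [log.poll("bd"), log.poll("bd"), log.poll("bd"), log.poll("bd")]
# Group "test" has its own offset and still sees the messages from the start.
test_reads = [log.poll("test"), log.poll("test")]
```

Here `bd_reads` ends with `None` once group `bd` has caught up, while group `test` still starts from the first message.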

    Startup

    RunKafka(){
        cd $kafka_home
        nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> zk$dayStr.log  & 
        echo 'Starting zookeeper...'
        sleep 5s  # wait a moment until zookeeper is ready
    
        nohup ./bin/kafka-server-start.sh ./config/server.properties  >> kafka$dayStr.log & 
        echo 'Starting kafka-server...'
    }
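The fixed `sleep 5s` in the function above is a guess at how long zookeeper needs to start. A minimal sketch of an alternative is to poll the port until it accepts connections. `wait_for_port` is a hypothetical helper, port 2181 is taken from the commands in this post, and an accepted TCP connection is only a rough signal that the service is ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet: give up after the deadline, otherwise retry.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, a launcher script could call `wait_for_port("localhost", 2181)` before starting kafka-server.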

    Workflow: start zookeeper -> start kafka-server -> create topic -> create producer -> create consumer

    Basic commands

    # kafka basic commands
    # ./app/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic huashi
    # ./app/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
    # ./app/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic huashi
    # ./app/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic huashi
    
    # ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic huashi
    # ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic huashi --from-beginning
    # ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic huashi --partitions 4
    
    # ==================delete consumer group======
    # ~/kafka/bin/zookeeper-shell.sh localhost:2181
    # ls /consumers
    # rmr /consumers/bd/offsets
    # rmr /consumers/test/offsets/huashi20151108
    
    
    # ls /tmp/kafka-logs/
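The topic commands above all share one shape, so they can be generated from a script. The sketch below builds the argument list for `kafka-topics.sh`; `topic_cmd` and `run_topic_cmd` are hypothetical helpers, and the script path is an assumption about where Kafka is installed. The `--zookeeper` flag matches the Kafka version used in this post; recent Kafka releases use `--bootstrap-server` instead.

```python
import subprocess


def topic_cmd(action, topic=None, zookeeper="localhost:2181",
              script="./bin/kafka-topics.sh", **options):
    """Build the argument list for one kafka-topics.sh invocation."""
    cmd = [script, "--" + action, "--zookeeper", zookeeper]
    if topic is not None:
        cmd += ["--topic", topic]
    for name, value in options.items():
        # partitions=4 becomes "--partitions 4"; underscores map to dashes
        cmd += ["--" + name.replace("_", "-"), str(value)]
    return cmd


def run_topic_cmd(*args, **kwargs):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    return subprocess.check_call(topic_cmd(*args, **kwargs))


create = topic_cmd("create", "huashi", replication_factor=1, partitions=4)
listing = topic_cmd("list")
```

`run_topic_cmd("describe", "huashi")` would then run the describe command against a local install.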

    Python package

    https://github.com/mumrah/kafka-python

    Usage: http://kafka-python.readthedocs.org/en/latest/usage.html

    FakeProducer.py

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    
    __author__ = 'manhua'
    
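    # SimpleProducer and KafkaClient are the API of older kafka-python releases
    from kafka import SimpleProducer, KafkaClient
    import time

    # Send messages synchronously
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    n = 0
    while True:
        # message payloads must be bytes, so encode the counter
        producer.send_messages(b'huashi', str(n).encode('utf-8'))
        n += 1
        time.sleep(1)
        print(n)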
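`SimpleProducer` belongs to older kafka-python releases. A rough equivalent with the newer `KafkaProducer` class might look like the sketch below. It assumes a kafka-python version that ships `KafkaProducer` and a broker on `localhost:9092`; `produce_counter` and `make_sender` are hypothetical helper names, and the send step is passed in as a callable so the loop can be tried without a broker.

```python
import time


def produce_counter(send, topic, count, delay=1.0):
    """Send the numbers 0..count-1 to `topic`, one every `delay` seconds."""
    for n in range(count):
        # Kafka message payloads are bytes, so encode the counter.
        send(topic, str(n).encode("utf-8"))
        time.sleep(delay)
    return count


def make_sender(bootstrap="localhost:9092"):
    """Return a send(topic, value) callable backed by kafka-python (assumed installed)."""
    from kafka import KafkaProducer  # imported lazily; needs a reachable broker

    producer = KafkaProducer(bootstrap_servers=bootstrap)

    def send(topic, value):
        # .get() blocks until the broker acknowledges, i.e. a synchronous send
        producer.send(topic, value=value).get(timeout=10)

    return send


# Dry run with a stand-in sender that only records what would be sent.
sent = []
produce_counter(lambda topic, value: sent.append((topic, value)), "huashi", 3, delay=0)
```

With a broker running, `produce_counter(make_sender(), "huashi", 10)` would send ten messages to the `huashi` topic.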

    ConsumerTest.py

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    
    __author__ = 'manhua'
    
    from kafka import KafkaConsumer
    import sys
    
    
    class UnzipConsumer:
        def __init__(self, topic, partition_id, gid='bd', bs_server='localhost:9092'):
            self.partition_id = partition_id
            self.consumer = KafkaConsumer((topic, int(partition_id)),  # a partition id must be specified, otherwise consuming is quite slow
                                          group_id=gid,
                                          bootstrap_servers=[bs_server],
                                          auto_offset_reset='smallest'  # ,
                                          # consumer_timeout_ms=1000*60*30
                                          )
    
        def run(self):
            for message in self.consumer:
                print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                 message.offset, message.key,
                                                 message.value))
                self.consumer.task_done(message)
                self.consumer.commit()
    
    
    if __name__ == '__main__':
    
        if len(sys.argv) == 3:
            obj = UnzipConsumer(sys.argv[1], sys.argv[2])
            obj.run()
        else:
            print('Parameters: [topic] [id]')
    
    
    # python unzipConsumer.py  huashi #0
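The `(topic, partition)` tuple argument, `task_done`, and `auto_offset_reset='smallest'` come from older kafka-python releases. A sketch of the same consumer against the newer API, assuming kafka-python 1.0 or later, pins the partition with `assign` and commits after each message is handled. `format_message`, `consume`, `make_consumer`, and `FakeConsumer` are hypothetical names; the fake consumer exists only so the loop can be tried without a broker.

```python
from collections import namedtuple


def format_message(message):
    """Render a consumed record the same way ConsumerTest.py prints it."""
    return "%s:%d:%d: key=%s value=%s" % (
        message.topic, message.partition, message.offset,
        message.key, message.value)


def consume(consumer, handle=print):
    """Handle each record, then commit, so a crash re-reads rather than skips."""
    for message in consumer:
        handle(format_message(message))
        consumer.commit()


def make_consumer(topic, partition_id, gid="bd", bs_server="localhost:9092"):
    """Build a consumer pinned to one partition (assumes kafka-python >= 1.0)."""
    from kafka import KafkaConsumer, TopicPartition  # imported lazily

    consumer = KafkaConsumer(group_id=gid,
                             bootstrap_servers=[bs_server],
                             auto_offset_reset="earliest",  # newer name for 'smallest'
                             enable_auto_commit=False)
    consumer.assign([TopicPartition(topic, int(partition_id))])
    return consumer


# Dry run with stand-in records instead of a live consumer.
Record = namedtuple("Record", "topic partition offset key value")


class FakeConsumer(list):
    commits = 0

    def commit(self):
        self.commits += 1


fake = FakeConsumer([Record("huashi", 0, 0, None, b"0"),
                     Record("huashi", 0, 1, None, b"1")])
lines = []
consume(fake, lines.append)
```

With a broker running, `consume(make_consumer("huashi", 0))` would print every message in partition 0 of `huashi`.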

    Monitoring tool

    https://github.com/quantifind/KafkaOffsetMonitor

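    Monitors Kafka consumers and their offsets in each partition in real time.

    Because some of KafkaOffsetMonitor's resource files (CSS, JS) are loaded from Google, someone has made a modified version: http://pan.baidu.com/s/1qWH05q8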

    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost --refresh 15.minutes --retain 5.day --port 5354
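A number commonly derived from these offsets is consumer lag: the gap between the newest offset in a partition and the offset the group has committed. The sketch below only illustrates the arithmetic. `consumer_lag` is a hypothetical helper, the offsets are made up, treating a partition without a commit as offset 0 is an assumption, and none of it is taken from KafkaOffsetMonitor's code.

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag: messages written but not yet consumed by the group."""
    lag = {}
    for partition, end in log_end_offsets.items():
        # Assumption: a partition with no committed offset counts from 0.
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end - committed, 0)
    return lag


# Made-up offsets for a 4-partition topic such as huashi.
lag = consumer_lag({0: 120, 1: 95, 2: 100, 3: 80}, {0: 120, 1: 90, 2: 40})
```

A lag that keeps growing for a partition suggests its consumer is falling behind the producers.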
    

      

    Ref:

    http://blog.csdn.net/lizhitao/article/details/27199863

  • Original article: https://www.cnblogs.com/manhua/p/4956071.html