zoukankan      html  css  js  c++  java
  • 在Windows环境中安装并使用kafka

    1. 安装部署Java

    java -version

    至于怎么windows怎么安装java,此处不再赘述

    2. 安装zookeeper

    下载zookeeper并解压,下载地址:http://zookeeper.apache.org/releases.html,
    选择自己需要的版本
    进入zookeeper设置目录,将zoo_sample.cfg重命名为:zoo.cfg
    在编辑器中打开zoo.cfg,将dataDir的值改成自己的data目录(需要新建)
    新建zookeeper系统变量ZOOKEEPER_HOME={zookeeper根目录路径},并把bin目录添加到系统的path变量中

    打开新的cmd,输入zkserver,运行Zookeeper服务器,如果安装成功,启动的界面如下:

    ZooKeeper JMX enabled by default
    Using config: d:workspacesoftware/zookeeper-3.4.12in..confzoo.cfg
    Starting zookeeper ... STARTED

    说明zookeeper已经安装成功并运行在2181端口。

    具体请参见本人另外的博文《Windows安装和使用zookeeper

    3.安装kafka
    下载需要的软件并解压,下载地址:
    http://kafka.apache.org/downloads.html
    进入kafka安装目录的config目录,修改server.properties文件,如修改的地方如下:
    把log.dirs改成自己的目录,一般在kafka安装目录下新建文件夹来存放日志文件
    Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181


    4.运行kafka服务器
    进入kafka安装目录,按下shift +右键,选择 "在此处打开命令窗口",输入如下命令并按回车

    .inwindowskafka-server-start.bat    .configserver.properties

    注意:kafka依赖zookeeper,需要事先启动zookeeper.

    5. 使用kafka

    5.1 创建主题
    进入kafka安装目录的inwindows下按shift +右键,选择“在此处打开命令窗口”,输入如下命令并回车:

    kafak-topics.bat  --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test

    5.2 创建producer 及consumer来测试服务器
    在kafka安装目录的inwindows启动新的命令窗口,producer和consumer需要分别启动命令窗口。
    启动producter,启动命令如下:

    kafka-console-producer.bat  --broker-list localhost:9092  --topic test


    启动consumer,启动命令如下:

    kafka-console-consumer.bat  --zookeeper localhost:2181  --topic test


    在producter窗口输入内容,如果在consumer窗口能看到内容,则说明kafka安装成功

    6. kafka常用命令

    #列出主题
    kafka-topic.bat -list -zookeeper localhost:2181  
    
    #描述主题  
    kafka-topics.bat -describe -zookeeper localhost:2181 -topic [topic name]   
    
    #从头读取消息
    kafka-console-consumer.bat -zookeeper localhost:2181 -topic [topic name] -from-beginning 
    
    #删除主题
    kafka-run-class.bat kafka.admin.TopicCommand -delete -topic [topic_to_delete] -zookeeper localhost:2181  
    #查看topic的详细信息 
    ./kafka-topic.sh -zookeeper localhost:2181 -describe -topic  [topic name]    
    
    #为topic增加副本
    ./kafka-reassign-partitions.sh -zookeeper localhost:2181 -reassignment-json-file json/partitions-to-move.json -execute  
    
    #为topic增加partition
    ./bin/kafka-topics.sh -zookeeper localhost:2181 -alter -partitions 20 -topic [topic name]   
    
    #下线broker
    ./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 broker [brokerId] --num.retries 3 --retry.interval.ms 60  shutdown broker 
    
    #luanch console producer, and specifiy the parse key, the key and value is splited by blank symbol
    kafka-console-producer.sh --broker-list server-1:9092,server-2:9092,server-3:9092 --topic kafka-action --property parse.key=true --property key.separator=' '
    
    #alter topic    config-test ,  sets the property segment.bytes to 200MB
    kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181  --topic config-test --config segment.bytes=209715200
    
    #alter topic    config-test ,  delete the property config segment.bytes
    kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181  --topic config-test --delete-config segment.bytes=209715200
    
    #查看主题当前己覆盖的配置
    kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181  --topics-with-overrides --topic config-test 
    
    #view the partitions with status under replicated
    kafka-topics.sh --describe --zookeeper server-1:2181,server-2:2181,server-3:2181  --under-replicated  partitions
    
    #查看(某个特定)主题的哪些分区的Leader 己不可用
    kafka-topics . sh --describe --zookeeper server-1:2181,server-2:2181,server-3:2181  --unavailablepartitions   [--topic  {topic_name}]
    
    #查看某个主题各分区对应消息偏移量,time 参数表示查看在指定时间之前的数据,支持-1(latest),-2 (earliest) 两个时间选项,默认取值为-l 。
    kafka-run-class.sh kafka . tools.GetOffsetShell --broker - list server-1:9092,server-2:9092,server-3:9092 --topic kafka- action --time -1
    
    #查看kafka日志消息,
    #files 是必传参数,用于指定要转储( dump )文件的路径, 可同时指定多个文件,多个文件路径之间以逗号分隔。
    kafka-run- class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka-logs/producer-create-topic-0/00000000000000000000.log  --print-data-log
    
    #性能测试工具,向一个名为producer-perιtest的主题发送100 万条消息,每条消息大小为1000字节,
    kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000  --topic producer-perf-test --throughput 1000000 
    --producer-props  bootstrap.servers=server-1:9092,server-2:9092,server-3:9092 acks=all
    
    #老版本consumer消费kafka-action数据,offset保存在zookeeper中,并在结束时删除offsets
    kafka-console-consurner.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --topic kafka-action --consumer-property group.id=old-consumer-test  --consumer-property consumer.id=old-consumer-cl  
    --from-beginning --delete-consumer-offsets #新版本consumer消费kafka-action数据,默认offset保存在kafka的__consumer-offsets内部主题中 #若以参数bootstrap-server方式启动,则默认调用的是新版消费者,此时可以不用设置new-consumer 参数 kafka-console consumer.sh -bootstrap-server server-1:9092,server-2:9092,server-3:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-cl --topic kafka-action #查看主题ka:fka-action 各分区的偏移量信息 #time 参数表示查看在指定时间之前的数据,支持-1(latest),-2 (earliest) 两个时间选项,默认取值为-l 。 kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server-1:9092,server-2:9092,server-3:9092 --topic kafka-action -time -1 #查看__consumer-offsets主题编号为6的分区的信息 kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 6 --broker-list server-1:9092,server-2:9092,server-3:9092 --formatter "kafka.coordinator.GroupMetadataManager$0ffsetsMessageFormatter" #同时消费多个主题 kafka-console-consumer.sh --bootstrap-server server-1:9092,server-2:9092,server-3:9092 --new-consumer --consumer-property group.id=consume-multi-topic --whitelist "kafka-action|producer-perf-test" #查看消费组的消费偏移量,如果待查询消费组的状态维Dead,则无法查看到 kafka-consumer-groups.sh --bootstrap-server server-1:9092,server-2:9092,server-3:9092 --describe --group consumer-offset-test --new-consumer #删除消费组 kafka-consumer-groups.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --delete--group old-consumer-test #消费者性能测试工具, 还可以通过--consumer.config加载property文件来设置属性 kafka-consumer-perf-test.sh --broker-list server-1:9092,server-2:9092,server-3:9092 --threads 5 --messages 1000000 --message-size 1000 --num-fetch-threads 2 --group consumer-perf-test --topic producer-perf-test --new-consumer #查看指定主题所有的覆盖配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test #增加特定主题的配置,如果有多个配置,以逗号分隔如--alter --add-config flush.messages=2,max.message.bytes=102400 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test --alter --add-config flush.messages=2 #删除配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test --alter --delete-config flush.messages,max.message.bytes #broker限流:对server-1对应的代理(broker.id=1)上分布的Leader副本和Follower副本的复制速率控制为10MB/s kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --alter --add-config follower.replication.throttled.rate=l0485760,leader.replication.throttled.rate=l0485760 #boker限流:查看broker-1的限制配置信息 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --describe #broker限流:删除配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate #主题级别限流 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type topics --entity-name kafka-action --alter

    --add-config leader.replication.throttled.replicas=[O:l,1:2,2:3],follower.replication.throttled.replicas=[0:2,1:3,2:1] #客户端流控:为用户lenmom添加限流,前提:kafka集群添加了认证机制,否则不可用此法 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --alter --add-config 'producer_byte_rate=l024,consumer_byte_ rate=2048' --entity-type users --entity-name lenmom #客户端流控:查看用户流控信息 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 describe --entity-type users #客户端流控:在producer/consumer的clientid等于acl-client时,该配置将会生效 kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=l024 , consumer_byte_rate=2048' --entity type clients --entity-name acl-client #客户端流控:为用户lenmom的客户端user-client-config添加流控 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --alter --add-config 'producer byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name lenmom --entity-type clients --entity-name user-client-config #leader平衡,对topic:kafka-action的分区1进行再平衡 #echo '{"partitions": [{"topic":"kafka-action","partition": 1}]}' >partitions-leader-election.json kafka-preferred-replica-election.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --path-to-json-file partitions-leader-election.json #节点下线&分区迁移, 下线broker2 #topics-to-move.json: 其中,version为固定值 #{"topics":[{"topic":"reassign-partitions"}],"version":1} kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --topics-to-move-json-file ../config/topics-to-move.json --broker-list " 1,3" --generate #增加指定主题的分区数 kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181 --partitions 6 --topic kafka-action #增加指定主题的副本数,需要事先准备副本分配json文件 kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --reassignment-json-file .. /config/replica-extends.json --execute #查看分区副本重分配执行状态 kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --reassignment-json-file .. /config/replica-extends.json --verify
  • 相关阅读:
    Ecshop后台管理增加成本价和毛利润统计功能
    如何在Ecshop首页调用积分商城里的商品
    Ecshop调用促销商品
    Python 精选内置函数
    Python爬虫原理
    android中解析后台返回的json字符串
    JSONObject与JSONArray的使用
    python的字符串截取
    centos6系统下网卡bond模式介绍
    fio工具中的iodepth参数与numjobs参数-对测试结果的影响
  • 原文地址:https://www.cnblogs.com/lenmom/p/10301428.html
Copyright © 2011-2022 走看看