zoukankan      html  css  js  c++  java
  • Apache Kafka 分布式消息队列中间件安装与配置 转载

    bin/zkServer.sh start /home/guym/down/kafka_2.8.0-0.8.0/config/zookeeper.properties&

    bin/kafka-server-start.sh config/server.properties

    bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic mykafka

    bin/kafka-list-topic.sh --zookeeper localhost:2181
     

    bin/kafka-console-producer.sh --zookeeper 127.0.0.1:2181 --topic mykafka

    bin/kafka-list-topic.sh --zookeeper localhost:2181


    bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic mykafka

    本文地址: http://blog.csdn.net/wangjia184/article/details/37921183

    本文演示从1个zookeeper+1个kafka broker到3个zookeeper+2个kafka broker集群的配置过程。

    kafka依赖于zookeeper, 首先下载zookeeper和kafka

    1. $ wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz  
    2. $ gzip -d zookeeper-3.4.6.tar.gz  
    3. $ tar -xvf zookeeper-3.4.6.tar  
    4.   
    5. $ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz  
    6. $ gtar xvzf kafka_2.8.0-0.8.1.1.tgz  

    对于CentOS来说在,在本地试验可能会遇到莫名其妙的问题,这一般是由于主机名不能正确识别导致。为了避免可能遇到的问题,首先查询本机主机名,

    1. $ hostname  
    2. HOME  
    然后加入一条本地解析到/etc/hosts文件中
    1. 127.0.0.1      HOME       




    一个zookeeper  + 一个kafka broker的配置

    将zookeeper/conf/下的zoo_sample.cfg改名成zoo.cfg。 zookeeper默认会读取此配置文件。配置文件暂时不用改,默认即可

    1. $mv zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg   

    启动Zookeeper服务, Zookeeper启动成功后在2181端口监听

    1. $ zookeeper-3.4.6/bin/zkServer.sh start  
    2. JMX enabled by default  
    3. Using config: /home/wj/event/zookeeper-3.4.6/bin/../conf/zoo.cfg  
    4. Starting zookeeper ... STARTED  

    启动Kafka服务,启动成功后在9092端口监听

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-server-start.sh kafka_2.8.0-0.8.1.1/config/server.properties  

    开始测试

    1. # 连接zookeeper, 创建一个名为test的topic, replication-factor 和 partitions 后面会解释,先设置为1  
    2. $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  
    3. Created topic "test".  
    4.   
    5. # 查看已经创建的topic列表  
    6. $ bin/kafka-topics.sh --list --zookeeper localhost:2181  
    7. test  
    8.   
    9. # 查看此topic的属性  
    10. $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  
    11. Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:  
    12.     Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0  
    13.   
    14. # 生产者连接Kafka Broker发布一个消息  
    15. $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   
    16. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    17. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    18. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    19. Hello World  

    消费者连接Zookeeper获取消息

    1. $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
    2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    3. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    5. Hello World  

    一个zookeeper  + 两个kafka broker的配置


    为 避免Kafka Broker的Singal-Point-Failure(单点失败), 需要建立多个Kafka Broker。先将kakfa目录中的/config/server.properties复制为/config/server- 2.properties然后编辑它的内容, 具体见注释
    1. # The id of the broker. This must be set to a unique integer for each broker.  
    2. broker.id=1  #每个Kafka Broker应该配置一个唯一的ID  
    3.   
    4. ############################# Socket Server Settings #############################  
    5.   
    6. # The port the socket server listens on  
    7. port=19092   #因为是在同一台机器上开多个Broker,所以使用不同的端口号区分  
    8.   
    9. # Hostname the broker will bind to. If not set, the server will bind to all interfaces  
    10. #host.name=localhost   #如果有多个网卡地址,也可以将不同的Broker绑定到不同的网卡  
    11.   
    12. ############################# Log Basics #############################  
    13.   
    14. # A comma seperated list of directories under which to store log files  
    15. log.dirs=/tmp/kafka-logs-2  #因为是在同一台机器上开多个Broker,需要确保使用不同的日志目录来避免冲突  

    现在就可以用新建的配置文件启动这个一个Broker了
    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-server-start.sh kafka_2.8.0-0.8.1.1/config/server-2.properties  

    现在新创建一个topic, replication-factor表示该topic需要在不同的broker中保存几份,这里replication-factor设置为2, 表示在两个broker中保存。 

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test2  

    然后查看此topic的属性。

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2  
    2. Topic:test2 PartitionCount:1    ReplicationFactor:2 Configs:  
    3.     Topic: test2    Partition: 0    Leader: 0   Replicas: 0,1   Isr: 0,1  
    • Leader: 如果有多个brokerBroker保存同一个topic,那么同时只能有一个Broker负责该topic的读写,其它的Broker作为实时备份。负责读写的Broker称为Leader.
    • Replicas : 表示该topic的0分区在0号和1号broker中保存
    • Isr : 表示当前有效的broker, Isr是Replicas的子集

    在test2这个topic下发布新的消息验证工作正常

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2  
    2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    3. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    5. HHH  
    6. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper lochost:2181 --from-beginning --topic test2  
    7. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    8. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    9. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    10. HHH  

    现在杀掉第一个Broker,模拟此点的崩溃
    1. $ ps aux | grep server.properties  
    2. user        2620  1.5  5.6 2082704 192424 pts/1  Sl+  08:57   0:25 java   
    3. $ kill 2620  

    重新查询此topic的属性,会发现Leader已经进行了切换,而0号Broker也从Isr中消失了。
    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2  
    2. Topic:test2 PartitionCount:1    ReplicationFactor:2 Configs:  
    3.     Topic: test2    Partition: 0    Leader: 1   Replicas: 0,1   Isr: 1  
    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-producer.sh --broker-list localhost:19092 --topic test2  
    2. # 使用1号broker再发布一个消息到test2下  
    3. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    4. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    5. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    6. Another message  
    7.   
    8. # 消费者查询,仍然工作正常  
    9. $ kafka_2.8.0-0.8.1.1/bin/kafka-consolconsumer.sh --zookeeper localhost:2181 --from-beginning92 --topic test2  
    10. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    11. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    12. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    13. HHH  
    14. Another message  


    三个zookeeper + 两个kafka broker的配置


    同 样,zookeeper也需要搭建cluster, 避免出现Single-Point-Failure. 由于zookeeper采用投票的方式来重新选举某节点失败后的leader, 所以至少需要三个zookeeper才能组成群集。且最好使用奇数个(而非偶数)。

    下面是演示单机搭建最简单的zookeeper cluster, 具体的可以参考http://myjeeva.com/zookeeper-cluster-setup.html

    1. #!/bin/sh  
    2.   
    3. #下载  
    4. wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz  
    5.   
    6. gzip -d zookeeper-3.4.6.tar.gz  
    7. tar xvf zookeeper-3.4.6.tar  
    8.   
    9. #重命名 zoo_sample.cfg 为 zoo.cfg  
    10. mv zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg  
    11.   
    12. #新建一个目录  
    13. sudo mkdir /usr/zookeeper-cluster  
    14. sudo chown -R jerry:jerry /usr/zookeeper-cluster  
    15.   
    16. #3个子目录分别对应三个zookeeper服务  
    17. mkdir /usr/zookeeper-cluster/server1  
    18. mkdir /usr/zookeeper-cluster/server2  
    19. mkdir /usr/zookeeper-cluster/server3  
    20.   
    21. #建立三个目录存放各自的数据文件  
    22. mkdir /usr/zookeeper-cluster/data  
    23. mkdir /usr/zookeeper-cluster/data/server1  
    24. mkdir /usr/zookeeper-cluster/data/server2  
    25. mkdir /usr/zookeeper-cluster/data/server3  
    26.   
    27. #建立三个目录存放各自的日志文件  
    28. mkdir /usr/zookeeper-cluster/log  
    29. mkdir /usr/zookeeper-cluster/log/server1  
    30. mkdir /usr/zookeeper-cluster/log/server2  
    31. mkdir /usr/zookeeper-cluster/log/server3  
    32.   
    33. #在每一个数据文件目录中,新建一个myid文件,文件必须是唯一的服务标识,在后面的配置中会用到  
    34. echo '1' > /usr/zookeeper-cluster/data/server1/myid  
    35. echo '2' > /usr/zookeeper-cluster/data/server2/myid  
    36. echo '3' > /usr/zookeeper-cluster/data/server3/myid  
    37.   
    38. #将zookeeper复制三份  
    39. cp -rf zookeeper-3.4.6/* /usr/zookeeper-cluster/server1  
    40. cp -rf zookeeper-3.4.6/* /usr/zookeeper-cluster/server2  
    41. cp -rf zookeeper-3.4.6/* /usr/zookeeper-cluster/server3  

    然后编辑每个zookeeper的zoo.cfg配置文件。
    将dataDir和dataLogDir设置为各自独立的目录;然后保证clientPort不会和其它zookeeper冲突(因为这里演示是3个实例安装于一台服务器上)

    最后加入下面几行

    1. server.1=0.0.0.0:2888:3888  
    2. server.2=0.0.0.0:12888:13888  
    3. server.3=0.0.0.0:22888:23888  
    server.X=IP:port1:port2

    X是在该zookeeper数据文件目录中myid指定的服务ID.

    IP是当前zookeeper绑定的IP地址,因为是演示,所以全都是localhost

    port1 是Quorum Port

    port2 是Leader Election Port

    由于3个zookeeper在同一台机器上,需要使用不同的端口号避免冲突。

    修改后的结果如下

    /usr/zookeeper-cluster/server1/conf/zoo.cfg

    1. # The number of milliseconds of each tick  
    2. tickTime=2000  
    3. # The number of ticks that the initial   
    4. # synchronization phase can take  
    5. initLimit=10  
    6. # The number of ticks that can pass between   
    7. # sending a request and getting an acknowledgement  
    8. syncLimit=5  
    9. # the directory where the snapshot is stored.  
    10. # do not use /tmp for storage, /tmp here is just   
    11. # example sakes.  
    12. dataDir=/usr/zookeeper-cluster/data/server1  
    13. dataLogDir=/usr/zookeeper-cluster/log/server1  
    14. # the port at which the clients will connect  
    15. clientPort=2181  
    16. # the maximum number of client connections.  
    17. # increase this if you need to handle more clients  
    18. #maxClientCnxns=60  
    19. #  
    20. # Be sure to read the maintenance section of the   
    21. # administrator guide before turning on autopurge.  
    22. #  
    23. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance  
    24. #  
    25. # The number of snapshots to retain in dataDir  
    26. #autopurge.snapRetainCount=3  
    27. # Purge task interval in hours  
    28. # Set to "0" to disable auto purge feature  
    29. #autopurge.purgeInterval=1  
    30.   
    31. server.1=0.0.0.0:2888:3888  
    32. server.2=0.0.0.0:12888:13888  
    33. server.3=0.0.0.0:22888:23888  

     /usr/zookeeper-cluster/server2/conf/zoo.cfg

    1. # The number of milliseconds of each tick  
    2. tickTime=2000  
    3. # The number of ticks that the initial   
    4. # synchronization phase can take  
    5. initLimit=10  
    6. # The number of ticks that can pass between   
    7. # sending a request and getting an acknowledgement  
    8. syncLimit=5  
    9. # the directory where the snapshot is stored.  
    10. # do not use /tmp for storage, /tmp here is just   
    11. # example sakes.  
    12. dataDir=/usr/zookeeper-cluster/data/server2  
    13. dataLogDir=/usr/zookeeper-cluster/log/server2  
    14. # the port at which the clients will connect  
    15. clientPort=12181  
    16. # the maximum number of client connections.  
    17. # increase this if you need to handle more clients  
    18. #maxClientCnxns=60  
    19. #  
    20. # Be sure to read the maintenance section of the   
    21. # administrator guide before turning on autopurge.  
    22. #  
    23. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance  
    24. #  
    25. # The number of snapshots to retain in dataDir  
    26. #autopurge.snapRetainCount=3  
    27. # Purge task interval in hours  
    28. # Set to "0" to disable auto purge feature  
    29. #autopurge.purgeInterval=1  
    30.   
    31. server.1=0.0.0.0:2888:3888  
    32. server.2=0.0.0.0:12888:13888  
    33. server.3=0.0.0.0:22888:23888  

      /usr/zookeeper-cluster/server3/conf/zoo.cfg

    1. # The number of milliseconds of each tick  
    2. tickTime=2000  
    3. # The number of ticks that the initial   
    4. # synchronization phase can take  
    5. initLimit=10  
    6. # The number of ticks that can pass between   
    7. # sending a request and getting an acknowledgement  
    8. syncLimit=5  
    9. # the directory where the snapshot is stored.  
    10. # do not use /tmp for storage, /tmp here is just   
    11. # example sakes.  
    12. dataDir=/usr/zookeeper-cluster/data/server3  
    13. dataLogDir=/usr/zookeeper-cluster/log/server3  
    14.   
    15. # the port at which the clients will connect  
    16. clientPort=22181  
    17. # the maximum number of client connections.  
    18. # increase this if you need to handle more clients  
    19. #maxClientCnxns=60  
    20. #  
    21. # Be sure to read the maintenance section of the   
    22. # administrator guide before turning on autopurge.  
    23. #  
    24. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance  
    25. #  
    26. # The number of snapshots to retain in dataDir  
    27. #autopurge.snapRetainCount=3  
    28. # Purge task interval in hours  
    29. # Set to "0" to disable auto purge feature  
    30. #autopurge.purgeInterval=1  
    31.   
    32. server.1=0.0.0.0:2888:3888  
    33. server.2=0.0.0.0:12888:13888  
    34. server.3=0.0.0.0:22888:23888  

    然后分别启动3个zookeeper服务

    1. $ /usr/zookeeper-cluster/server1/bin/zkServer.sh start  
    2. JMX enabled by default  
    3. Using config: /usr/zookeeper-cluster/server1/bin/../conf/zoo.cfg  
    4. Starting zookeeper ... STARTED  
    5. $ /usr/zookeeper-cluster/server2/bin/zkServer.sh start  
    6. JMX enabled by default  
    7. Using config: /usr/zookeeper-cluster/server2/bin/../conf/zoo.cfg  
    8. Starting zookeeper ... STARTED  
    9. $ /usr/zookeeper-cluster/server3/bin/zkServer.sh start  
    10. JMX enabled by default  
    11. Using config: /usr/zookeeper-cluster/server3/bin/../conf/zoo.cfg  
    12. Starting zookeeper ... STARTED  

    启动完成后查看每个服务的状态,下面可以看到server2被选为了leader. 而其它2个服务为follower.

    1. $ /usr/zookeeper-cluster/server1/bin/zkServer.sh status  
    2. JMX enabled by default  
    3. Using config: /usr/zookeeper-cluster/server1/bin/../conf/zoo.cfg  
    4. Mode: follower  
    5. $ /usr/zookeeper-cluster/server2/bin/zkServer.sh status  
    6. JMX enabled by default  
    7. Using config: /usr/zookeeper-cluster/server2/bin/../conf/zoo.cfg  
    8. Mode: leader  
    9. $ /usr/zookeeper-cluster/server3/bin/zkServer.sh status  
    10. JMX enabled by default  
    11. Using config: /usr/zookeeper-cluster/server3/bin/../conf/zoo.cfg  
    12. Mode: follower  

    接下来修改kafka的server.properties配置文件

    kafka_2.8.0-0.8.1.1/config/server.properties和kafka_2.8.0-0.8.1.1/config/server-2.properties

    将3个zookeeper的地址加入到zookeeper.connect中,如下:

    1. zookeeper.connect=localhost:2181,localhost:12181,localhost:22181  

    启动2个Kafka broker

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-server-start.sh kafka_2.8.0-0.8.1.1/config/server.properties  
    2. $ kafka_2.8.0-0.8.1.1/bin/kafka-server-start.sh kafka_2.8.0-0.8.1.1/config/server-2.properties  

    接下来验证一下

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test3 --from-beginning  
    2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    3. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    5. fhsjdfhdsa  
    6. fjdsljfdsadsfdas  
    7.   
    8. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeepecalhost:12181 --topic test3 --from-beginning  
    9. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    10. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    11. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    12. fhsjdfhdsa  
    13. fjdsljfdsadsfdas  
    14.   
    15. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeepecalhost:22181 --topic test3 --from-beginning  
    16. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    17. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    18. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    19. fhsjdfhdsa  
    20. fjdsljfdsadsfdas  

    现在模拟leader挂掉的情况,直接将server2 的zookeeper杀掉

    1. $ ps aux | grep server2  
    2. user     2493  1.0  1.8 1661116 53792 pts/0   Sl   14:46   0:02 java  
    3. $ kill 2493  

    重新查询一次各zookeeper的状态,会发现leader发生了改变

    1. $ /usr/zookeeper-cluster/server3/bin/zkServer.sh status  
    2. JMX enabled by default  
    3. Using config: /usr/zookeeper-cluster/server3/bin/../conf/zoo.cfg  
    4. Mode: leader  
    5. $ /usr/zookeeper-cluster/server1/bin/zkServer.sh status  
    6. JMX enabled by default  
    7. Using config: /usr/zookeeper-cluster/server1/bin/../conf/zoo.cfg  
    8. Mode: follower  
    9. $ /usr/zookeeper-cluster/server2/bin/zkServer.sh status  
    10. JMX enabled by default  
    11. Using config: /usr/zookeeper-cluster/server2/bin/../conf/zoo.cfg  
    12. Error contacting service. It is probably not running.  

    再次验证,kafka集群仍然工作正常。

    1. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test3 --from-beginning  
    2. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    3. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    4. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    5. fhsjdfhdsa  
    6. fjdsljfdsadsfdas  
    7.   
    8. $ kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --zookeepecalhost:22181 --topic test3 --from-beginning  
    9. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    10. SLF4J: Defaulting to no-operation (NOP) logger implementation  
    11. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    12. fhsjdfhdsa  
    13. fjdsljfdsadsfdas  
    14.   
    15. $ kafka_2.8.0-0.8.1.1/bin/katopics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test5  
    16. Created topic "test5".  
    17.   
    18. $ kafka_2.8.0-0.8.1.1/bin/katopics.sh --create --zookeeper localhost:22181 --replication-factor 2 --partitions 2 --topic test6  
    19. Created topic "test6".  
    转载 本文地址: http://blog.csdn.net/wangjia184/article/details/37921183
  • 相关阅读:
    LC.225. Implement Stack using Queues(using two queues)
    LC.232. Implement Queue using Stacks(use two stacks)
    sort numbers with two stacks(many duplicates)
    LC.154. Find Minimum in Rotated Sorted Array II
    LC.81. Search in Rotated Sorted Array II
    LC.35.Search Insert Position
    前后端分离:(一)
    Redis基本使用(一)
    GIT篇章(二)
    GIT篇章(一)
  • 原文地址:https://www.cnblogs.com/gym333/p/4481650.html
Copyright © 2011-2022 走看看