zoukankan      html  css  js  c++  java
  • 分布式消息系统之Kafka集群部署

      一、kafka简介

      kafka是基于发布/订阅模式的一个分布式消息队列系统,用java语言研发,是ASF旗下的一个开源项目;类似的消息队列服务还有rabbitmq、activemq、zeromq;kafka最主要的优势具备分布式功能,并且结合zookeeper可以实现动态扩容;kafka对消息保存是通过Topic进行分类,发送消息一方称为producer(生产者),接收消息一方称为consumer(消费者);一个kafka集群有多个kafka server组成,我们把每个kafka server称为broker(消息掮客);

      ActiveMQ、RabbitMQ、kafka对比

      二、kafka集群部署

      环境说明

    主机名 ip地址
    node04 192.168.0.44
    node05 192.168.0.45
    node06 192.168.0.46

      

      提示:在部署kafka集群之前,我们要先把zk集群部署起来,因为kafka是强依赖zk集群;zk集群部署请参考上一篇博客https://www.cnblogs.com/qiuhom-1874/p/13841371.html;上面3台server只是kafka集群的三台server;

      1、安装jdk

    [root@node04 ~]# yum install -y java-1.8.0-openjdk-devel
    

      验证java环境

    [root@node04 ~]# java -version
    openjdk version "1.8.0_262"
    OpenJDK Runtime Environment (build 1.8.0_262-b10)
    OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)
    [root@node04 ~]# 
    

      提示:以上安装Java环境,在kafka集群的每个server都要做一遍;除了上面的java环境,还有基础环境像时间同步,主机名解析,关闭selinux,关闭防火墙,主机免密这些都要提前做好;

      2、下载kafka二进制压缩包

    [root@node04 ~]# ll
    total 0
    [root@node04 ~]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
    --2020-10-21 20:06:28--  https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
    Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1
    Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:443... connected.
    HTTP request sent, awaiting response... 200 OK
    Length: 65671917 (63M) [application/octet-stream]
    Saving to: ‘kafka_2.12-2.6.0.tgz’
    
    100%[================================================================================>] 65,671,917  6.38MB/s   in 13s    
    
    2020-10-21 20:06:41 (4.96 MB/s) - ‘kafka_2.12-2.6.0.tgz’ saved [65671917/65671917]
    
    [root@node04 ~]# ll
    total 64136
    -rw-r--r-- 1 root root 65671917 Aug  5 06:01 kafka_2.12-2.6.0.tgz
    [root@node04 ~]# 
    

      3、解压二进制包,并做软连接

    [root@node04 ~]# tar xf kafka_2.12-2.6.0.tgz -C /usr/local/
    [root@node04 ~]# ln -sv /usr/local/kafka_2.12-2.6.0 /usr/local/kafka
    ‘/usr/local/kafka’ -> ‘/usr/local/kafka_2.12-2.6.0’
    [root@node04 ~]# 
    

      提示:其他server也是相同的操作;

      4、配置node04上的kafka

      提示:broker.id是配置broker的id,这个id在kafka集群中必须唯一;listeners是用来指定当前节点监听的socket;log.dirs用来指定kafka的日志文件路径;log.retention.hours用来指定保存多少小时的日志;zookeeper.conect用来指定zk集群各节点信息,通常是把zk所有节点都写上,用逗号隔开;其他的参数都可以不用变;我这里用到主机名,是因为我在hosts文件对所有节点都做了主机名解析;

      创建日志目录

    [root@node04 config]# mkdir -pv /data/kafka
    mkdir: created directory ‘/data’
    mkdir: created directory ‘/data/kafka’
    [root@node04 config]# 
    

      提示:后面的kafka-logs目录在kafka启动时会自动创建;到此node04就配置好了;

      把node04上的配置文件拷贝到node05

    [root@node04 config]# scp server.properties  node05:/usr/local/kafka/config/
    server.properties                                                                       100% 6882     2.0MB/s   00:00    
    [root@node04 config]# 
    

      修改broker.id和listeners配置

      创建日志目录

    [root@node05 ~]#  mkdir -pv /data/kafka
    mkdir: created directory ‘/data’
    mkdir: created directory ‘/data/kafka’
    [root@node05 ~]# 
    

      把node05的配置文件,复制到node06的kafka配置文件目录

    [root@node05 ~]# scp /usr/local/kafka/config/server.properties node06:/usr/local/kafka/config/server.properties
    The authenticity of host 'node06 (192.168.0.46)' can't be established.
    ECDSA key fingerprint is SHA256:lE8/Vyni4z8hsXaa8OMMlDpu3yOIRh6dLcIr+oE57oE.
    ECDSA key fingerprint is MD5:14:59:02:30:c0:16:b8:6c:1a:84:c3:0f:a7:ac:67:b3.
    Are you sure you want to continue connecting (yes/no)? yes
    Warning: Permanently added 'node06,192.168.0.46' (ECDSA) to the list of known hosts.
    server.properties                                                                       100% 6882     1.9MB/s   00:00    
    [root@node05 ~]# 
    

      修改broker.id和listeners配置,并创建日志目录

      到此,三个节点的kafka就配置好了;

      启动各节点上的kafka

    [root@node04 config]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    [root@node04 config]# ss -tnl
    State      Recv-Q Send-Q                Local Address:Port                               Peer Address:Port              
    LISTEN     0      128                               *:22                                            *:*                  
    LISTEN     0      100                       127.0.0.1:25                                            *:*                  
    LISTEN     0      128                              :::22                                           :::*                  
    LISTEN     0      100                             ::1:25                                           :::*                  
    LISTEN     0      50                               :::39779                                        :::*                  
    LISTEN     0      50              ::ffff:192.168.0.44:9092                                         :::*                  
    [root@node04 config]# 
    

      提示:可以看到node04上的9092处于监听状态;用同样的命令把node05,node06上的kafka都启动起来;

      查看日志

      提示:kafka的启动日志放在安装目录下的logs目录,有个server.log;我们刚才创建的日志目录,主要用来保存集群事务的日志;

      测试kafka

      1、在各节点验证kafka进程是否启动

    [root@node04 config]# jps
    1797 Kafka
    2485 Jps
    [root@node04 config]# ssh node05 'jps'
    1840 Jps
    1772 Kafka
    [root@node04 config]# ssh node06 'jps'
    2321 Kafka
    2388 Jps
    [root@node04 config]# 
    

      2、在zk集群上查看,是否有kafka节点注册到上面

    zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls -R /
    /
    /admin
    /brokers
    /cluster
    /config
    /consumers
    /controller
    /controller_epoch
    /isr_change_notification
    /latest_producer_id_block
    /log_dir_event_notification
    /zookeeper
    /admin/delete_topics
    /brokers/ids
    /brokers/seqid
    /brokers/topics
    /brokers/ids/0
    /brokers/ids/1
    /brokers/ids/2
    /cluster/id
    /config/brokers
    /config/changes
    /config/clients
    /config/topics
    /config/users
    /zookeeper/config
    /zookeeper/quota
    [zk: localhost:2181(CONNECTED) 2] 
    

      提示:可以看到在zk集群上多了很多节点;

      3、创建名为test,partitions为3,replication为3的topic

    [root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic test
    Created topic test.
    [root@node04 config]# 
    

      在kafka集群的任意节获取topic

    [root@node06 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181,node01:2181,node03:2181 --topic test
    Topic: test     PartitionCount: 3       ReplicationFactor: 3    Configs: 
            Topic: test     Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
            Topic: test     Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
            Topic: test     Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    [root@node06 ~]# 
    

      提示:从上面的返回的状态信息可以看到test topic有三个分区分别为0、1、2,分区0的leader是2(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。

      4、删除topic

      6、创建topic,并发送消息

    [root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic msgtest
    Created topic msgtest.
    [root@node04 config]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list node04:9092,node05:9092,node06:9092 --topic msgtest
    >hello               
    >hi
    >  
    

      在其他节点获取消息

      使用图形工具kafka-tool工具获取消息

      ok,到此kafka这个消息系统就搭建好了;

  • 相关阅读:
    真-关闭win10安全中心(windows defender)
    HOOK IDT频繁蓝屏(Window 正确 HOOK IDT)
    windows 驱动开发 MDL 内核层 用户层共享内存
    C++将时间格式转换成秒数
    关于HOOK KiPageFault需要用到自旋锁研究
    提高VS2010/VS2012编译速度
    apache2.2服务无法启动 发生服务特定错误:1 的解决办法 (windows服务错误 日志查看方法)
    内核同步对象
    C++/MFC-线程优先级
    python xml转excel
  • 原文地址:https://www.cnblogs.com/qiuhom-1874/p/13854759.html
Copyright © 2011-2022 走看看