zoukankan      html  css  js  c++  java
  • ZOOKEEPER+KAFKA 集群搭建

     因为Kafka集群是把状态信息保存在Zookeeper中的,并且Kafka的动态扩容是通过Zookeeper来实现的,所以需要优先搭建Zookeerper集群,建立分布式状态管理。开始准备环境,搭建集群:

    zookeeper是基于Java环境开发的所以需要先安装Java 然后这里使用的zookeeper安装包版本为zookeeper-3.4.14,Kafka的安装包版本为kafka_2.11-2.2.0。

    AMQP协议:Advanced Message Queuing Protocol (高级消息队列协议)是一个标准开放的应用层的消息中间件协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台

    server1:192.168.177.251

    server2:192.168.177.252

    server3:192.168.177.132

    安装之前先检查一下系统有没有自带open-jdk

    命令:

    rpm -qa |grep java

    rpm -qa |grep jdk

    如果没有输入信息表示没有安装。
    检索1.8的列表
    yum list java-1.8*  
    安装1.8.0的所有文件
    yum install java-1.8.0-openjdk* -y

    使用命令检查是否安装成功
    java -version

    cat /etc/hosts

    192.168.177.251 kafka01

    192.168.177.252 kafka02

    192.168.177.133 kafka03

    1.关闭防火墙。selinux

    systemctl stop firewalld

    setenforce 0

    安装zookeeper && kafka 

    wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

    放到/usr/local下

    [root@192 ~]# cd /usr/local/zookeeper/conf/
    [root@192 conf]# mv zoo_sample.cfg zoo.cfg 
    [root@192 conf]# vim zoo.cfg

    tickTime=2000              #zookeeper服务器之间的心跳时间。

    initLimit=10                 #zookeeper的最大连接失败时间

    syncLimit=5          #zookeeper的同步通信时间

    dataDir=/tmp/zookeeper               #zookeeper的存放快照日志的绝对路径

    dataLogDir=/usr/local/zookeeper/zkdatalog                 #zookeeper的存放事物日志的绝对路径

    clientPort=2181              #zookeeper与客户端的连接端口

    server.1=192.168.177.251:2888:3888
    server.2=192.168.177.252:2888:3888
    server.3=192.168.177.132:2888:3888 ###服务器及其编号,服务器IP地址,通信端口,选举端口

     创建myid文件

    [root@192 conf]# mkdir /tmp/zookeeper
    [root@192 conf]# echo "1">/tmp/zookeeper/myid #另外两台节点也要做。重复以上步骤
    

     启动zookeeper

    /usr/local/zookeeper/bin/zkServer.sh start   启动
    /usr/local/zookeeper/bin/zkServer.sh status  查看状态
    

    Mode: leader为主节点,Mode: follower为从节点,zk集群一般只有一个leader多个follower,主一般是响应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

    到此,zookeeper集群搭建结束,接下来基于zookeeper搭建kafka集群:

    Kafka的基本概念:

    主题:Topic特指Kafka处理的消息源(feeds of messages)的不同分类。

    分区:Partition Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

    Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

    Producers:消息的数据生产者,向Kafka的一个topic发布消息的过程叫做producers。

    Consumers:消息的数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。

    Broker:缓存代理,Kafka集群中的一台或多台服务器统称为broker,这里用的是AMQP协议。

    1. 修改配置文件

    cd /usr/local/kafka/config
     vim server.properties
    broker.id=0                                 #后面的数字唯一的。不可重复
    advertised.listeners=PLAINTEXT://kafka01:9092
    log.retention.hours=168                     #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880                    #消息保存的最大值5M
    default.replication.factor=2                #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880             #取消息的最大直接数
    zookeeper.connect=192.168.177.251:2181,192.168.177.252:2181,192.168.177.132:2181 
    

     注意:先现在把主机名字也给改了

      hostname kafka01 

    2.启动kafka集群

    [root@192 bin]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

    3,检测端口 kafka 端口为9092

    netstat -anpt |grep :9092

    4.测试

    创建topic

    [root@192 config]# cd /usr/local/kafka/bin/
    [root@192 bin]# ./kafka-topics.sh --create --zookeeper 192.168.177.132:2181 --replication-factor 2 --partitions 3 --topic wg01

    #--replication-factor 2                  复制两份

    #--partitions 3                 创建三个分区

    #--topic wg007                 主题为wg007

    5、模拟一个producter:

    ./kafka-console-producer.sh --broker-list 192.168.177.132:9092 --topic wg01

    6、模拟一个consumer:

    ./kafka-console-consumer.sh  --bootstrap-server 192.168.177.132:9092 --topic wg01 --from-beginning

    7、查看topic:

    ./kafka-topics.sh --list --zookeeper 192.168.177.132:2181

  • 相关阅读:
    [Ubuntu]更改所有子文件和子目录所有者权限
    [ubuntu] 外挂硬盘
    HashMap、Hashtable、ConcurrentHashMap的原理与区别
    [Pytorch笔记] scatter_
    [The Annotated Transformer] Iterators
    [python 笔记] __iter__迭代器
    [负数在内存中的存储] 0x80000000 = -2147483648
    [NLP] The Annotated Transformer 代码修正
    [pytorch笔记] 调整网络学习率
    numpy中双冒号的作用
  • 原文地址:https://www.cnblogs.com/wendyluo/p/13232741.html
Copyright © 2011-2022 走看看