zoukankan      html  css  js  c++  java
  • kafka+zookeeper集群

    参考:  kafka中文文档

         快速搭建kafka+zookeeper高可用集群

         kafka+zookeeper集群搭建

       kafka+zookeeper集群部署

       kafka集群部署

         kafka体系架构讲解

       kafka工作原理

    一. 环境准备

    关闭selinux,关闭防火墙

    kafka 版本:  kafka_2.11-2.1.0

    zookpeeper版本: 3.4.12

    jdk: 1.8

    ip 角色 系统
    172.10.10.226 zookeeper+kafka redhat7.3
    172.10.10.225 zookeeper+kafka redhat7.3
    172.10.10.224 zookeeper+kafka redhat7.3

    二. zookeeper集群搭建

    参考: https://www.cnblogs.com/linuxprobe/p/5851699.html

             http://www.cnblogs.com/ahu-lichang/p/6723826.html

    2.1 jdk 安装

    也可自己下载安装,这里用的yum

    JDK下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html

    rpm -ivh jdk-8u101-linux-x64.rpm
    yum install java-1.8.0

    2.2 zookeeper安装

    Zookeeper链接:http://zookeeper.apache.org/

    wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz -P /mnt
    tar zxvf zookeeper-3.4.12.tar.gz -C /mnt
    cd /mnt && mv zookeeper-3.4.12 zookeeper
    cd zookeeper
    cp conf/zoo_sample.cfg conf/zoo.cfg

    #把zookeeper加入到环境变量

    echo -e "# append zk_env export PATH=$PATH:/opt/zookeeper/bin" >> /etc/profile

    2.3  zookeeper集群配置

    2.3.1 zookeeper配置文件修改.在zookeeper的conf目录创建

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataLogDir=/mnt/zookeeper/logs
    dataDir=/mnt/zookeeper/data
    clientPort=2181
    #autopurge.snapRetainCount=500
    #autopurge.purgeInterval=24
    server.1= 172.10.10.226:2888:3888  #server.1 中的1表示的该node的id,要和后面myid文件中保持一致
    server.2= 172.10.10.225:2888:3888
    server.3= 172.10.10.224:2888:3888

    #######参数说明

    tickTime这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。

    
    

    initLimit这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。

    
    

    当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20秒。

    
    

    syncLimit这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。

    
    

    dataDir顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;

    
    

    clientPort这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;

    
    

    server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。

     

    #创建相关目录,三台节点都需要

    mkdir -p /mnt/zookeeper/{logs,data}

    #其余zookeeper节点安装完成之后,同步配置文件zoo.cfg。

    2.3.2  创建serverid表识

    除了修改zoo.cfg配置文件外,zookeeper集群模式下还要配置一个myid文件,这个文件需要放在dataDir目录下。

    这个文件里面有一个数据就是A的值(该A就是zoo.cfg文件中server.A=B:C:D中的A),在zoo.cfg文件中配置的dataDir路径中创建myid文件。

    #在172.10.10.226服务器上面创建myid文件,并设置值为1,同时与zoo.cfg文件里面的server.1保持一致,如下

    echo "1" > /mnt/zookeeper/data/myid

    其它2台id分别为  2  ,3

    2.4. 启动每个服务器上的集群节点

    /mnt/zookeeper/bin/zkServer.sh start    #启动

    /mnt/zookeeper/bin/zkServer.sh status  #查看

    bin/zkCli.sh -server 172.10.10.225:2181  #模拟客户端连接

    Zookeeper集群搭建完毕之后,可以通过客户端脚本连接到zookeeper集群上面,对客户端来说,zookeeper集群是一个整体,连接到zookeeper集群实际上感觉在独享整个集群的服务。    

    三. kafka集群搭建

    1. 每台服务器上下载解压kafka.

    下载地址:  http://kafka.apache.org/downloads

    2. 配置conf目录下的server.properties

    [root@r0 kafka_2.11-2.1.0]# grep -E  -v "^#|^$" config/server.properties 
    broker.id=0  ###集群中唯一
    listeners=PLAINTEXT://172.10.10.226:9092 ####本地ip:端口
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=3  #### 一个topic的partition个数
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    replication.factor=2   ##### 每个partition的副本
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=172.10.10.226:2181,172.10.10.225:2181,172.10.10.224:2181 ####zookeeper集群.
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true ########可删除topic
     

    将上述配置文件复制到另外2台服务器,并修改每个节点对应的 server.properties 文件的 broker.id和listenrs

    3. 启动服务

    bin/kafka-server-start.sh config/server.properties &

    4. kafka+zookeeper测试

    创建topic

    bin/kafka-topics.sh --create --zookeeper 172.10.10.226:2181,172.10.10.225:2182,172.10.10.224:2183 --replication-factor 2  --partitions 3 --topic test

    显示topic

    bin/kafka-topics.sh --describe --zookeeper 172.10.10.226:2181,172.10.10.225:2182,172.10.10.224:2183 --topic test
    OpenJDK
    64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N Topic:test PartitionCount:3 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0

     创建 producer(生产者):

    [root@r0 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 172.10.10.226:9092 -topic test
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    >
    
    >>1111
    >2222
    >333

     创建 consumer(消费者):

    [root@r2 kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 172.10.10.226:9092,172.10.10.225:9092,172.10.10.224:9092 --topic test --from-beginning
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    message 3
    message 6
    333333333
    5555555555
    5
    5
    
    
    #问题,发现为啥顺序不对,生产者的输入顺序和消费者的读取顺序不一致.

    5. 关闭集群

    # 删除topic
    bin/kafka-topics.sh --delete --zookeeper 172.10.10.226:2181,172.10.10.225:2181,172.10.10.224:2181 --topic test
    
    # 关闭kafka ,3台都执行
    [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties
    
    [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties
    
    [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties
    
    # 关闭zookeeper ,3台都执行
    [root@master zookeeper-3.4.11]$ bin/zkServer.sh stop conf/zoo.cfg
    
    [root@worker1 zookeeper-3.4.11]$ bin/zkServer.shstop conf/zoo.cfg
    
    [root@worker2 zookeeper-3.4.11]$ bin/zkServer.shstop conf/zoo.cfg
     
  • 相关阅读:
    大数据基础---Spark累加器与广播变量
    大数据基础---Spark部署模式与作业提交
    大数据基础---Spark_Transformation和Action算子
    大数据基础---Spark_RDD
    大数据基础---Spark开发环境搭建
    大数据基础---Spark简介
    利用numpy 计算信息量
    三调地类分级字典
    省/直辖市行政区代码表
    设置 Jupyter notebook 运行的浏览器
  • 原文地址:https://www.cnblogs.com/yitianyouyitian/p/10286799.html
Copyright © 2011-2022 走看看