zoukankan      html  css  js  c++  java
  • Kafka集群和CMAK部署整理文档

    1.环境说明
    JDK    		    1.8 +    【不要安装OpenJdk】
    kafka   		2.11-2.1.1
    

    2. 源码安装

    Kafka的安装包下载地址:

    https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz 	
    
    1.1、上传到服务器/data目录
    # 解压 
    
    tar -zxvf kafka_2.11-2.1.1.tgz
    
    # 删除无用的包
    
    rm -f kafka_2.11-2.1.1.tgz
    
    # 重命名
    
    mv  kafka_2.11-2.1.1  kafka
    
    
    1.2、修改配置文件
    vim /data/kafka/config/server.properties 
    

    主要修改地方如下:

    #broker.id=0  #每台服务器的broker.id都不能相同,这个一定要注释掉
    
    #监听地址 记得改成你部署服务器的IP:9092
    
    listeners=PLAINTEXT://192.168.2.128:9092 
    
    advertised.listeners=PLAINTEXT://192.168.2.128:9092
    
    #日志存放位置,先手动创建 logs 文件夹
    
    log.dirs=/data/log/kafka/logs
    
    #设置zookeeper的连接端口
    
    zookeeper.connect=192.168.2.128:2181,192.168.2.129:2181,192.168.2.130:2181
    
    

    修改后如下所示:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    
    # contributor license agreements.  See the NOTICE file distributed with
    
    # this work for additional information regarding copyright ownership.
    
    # The ASF licenses this file to You under the Apache License, Version 2.0
    
    # (the "License"); you may not use this file except in compliance with
    
    # the License.  You may obtain a copy of the License at
    
    #
    
    #   http://www.apache.org/licenses/LICENSE-2.0
    
    #
    
    # Unless required by applicable law or agreed to in writing, software
    
    # distributed under the License is distributed on an "AS IS" BASIS,
    
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    # See the License for the specific language governing permissions and
    
    # limitations under the License.
    
     
    
    # see kafka.server.KafkaConfig for additional details and defaults
    
     
    
    ############################# Server Basics #############################
    
     
    
    # The id of the broker. This must be set to a unique integer for each broker.
    
    #broker.id=0
    
     
    
    ############################# Socket Server Settings #############################
    
     
    
    # The address the socket server listens on. It will get the value returned from 
    
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    
    #  FORMAT:
    
    # listeners = listener_name://host_name:port
    
    #  EXAMPLE:
    
    #   listeners = PLAINTEXT://your.host.name:9092
    
    #listeners=PLAINTEXT://:9092
    
     
    
     
    
     
    
     
    
    listeners=PLAINTEXT://192.168.2.128:9092
    
     
    
     
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    
    # returned from java.net.InetAddress.getCanonicalHostName().
    
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    
     
    
     
    
    advertised.listeners=PLAINTEXT://192.168.2.128:9092
    
     
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
     
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    
    num.network.threads=3
    
     
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    
    num.io.threads=8
    
     
    
    # The send buffer (SO_SNDBUF) used by the socket server
    
    socket.send.buffer.bytes=102400
    
     
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    
    socket.receive.buffer.bytes=102400
    
     
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    
    socket.request.max.bytes=104857600
    
     
    
     
    
    ############################# Log Basics #############################
    
     
    
    # A comma separated list of directories under which to store log files
    
    log.dirs=/data/log/kafka-logs
    
     
    
    # The default number of log partitions per topic. More partitions allow greater
    
    # parallelism for consumption, but this will also result in more files across
    
    # the brokers.
    
    num.partitions=1
    
     
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    
    num.recovery.threads.per.data.dir=1
    
     
    
    ############################# Internal Topic Settings  #############################
    
    # The replication factor for the group metadata internal topics "consumer_offsets" and "transaction_state"
    
    # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
    
    offsets.topic.replication.factor=1
    
    transaction.state.log.replication.factor=1
    
    transaction.state.log.min.isr=1
    
     
    
    ############################# Log Flush Policy #############################
    
     
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    
    # the OS cache lazily. The following configurations control the flush of data to disk.
    
    # There are a few important trade-offs here:
    
    #   1. Durability: Unflushed data may be lost if you are not using replication.
    
    #   2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    
    #   3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
     
    
    # The number of messages to accept before forcing a flush of data to disk
    
    #log.flush.interval.messages=10000
    
     
    
    # The maximum amount of time a message can sit in a log before we force a flush
    
    #log.flush.interval.ms=1000
    
     
    
    ############################# Log Retention Policy #############################
    
     
    
    # The following configurations control the disposal of log segments. The policy can
    
    # be set to delete segments after a period of time, or after a given size has accumulated.
    
    # A segment will be deleted whenever either of these criteria are met. Deletion always happens
    
    # from the end of the log.
    
     
    
    # The minimum age of a log file to be eligible for deletion due to age
    
    log.retention.hours=168
    
     
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    
    #log.retention.bytes=1073741824
    
     
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    
    log.segment.bytes=1073741824
    
     
    
    # The interval at which log segments are checked to see if they can be deleted according
    
    # to the retention policies
    
    log.retention.check.interval.ms=300000
    
     
    
    ############################# Zookeeper #############################
    
     
    
    # Zookeeper connection string (see zookeeper docs for details).
    
    # This is a comma separated host:port pairs, each corresponding to a zk
    
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    
    # You can also append an optional chroot string to the urls to specify the
    
    # root directory for all kafka znodes.
    
    zookeeper.connect=192.168.2.128:2181,192.168.2.129:2181,172.31.3.26:2181
    
     
    
    # Timeout in ms for connecting to zookeeper
    
    zookeeper.connection.timeout.ms=6000
    
     
    
     
    
    ############################# Group Coordinator Settings #############################
    
     
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    
    # The default value for this is 3 seconds.
    
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    
    group.initial.rebalance.delay.ms=0
     
    

    t02PUO.png

    1.3、在另外的机器上也做如上的操作
    1.4、启动kafka,后台启动方式

    在配置好的主机上,分别启动kafka,切换到kafka 目录下

    # 后台启动 
    
    cd /data/kafka
    
    bin/kafka-server-start.sh -daemon config/server.properties & 
    
    

    查看kafka进程

     ps axu |grep kafka
    

    t021Pg.png

    1.5、测试kafka

    创建一个 message_topic

     bin/kafka-topics.sh --create --zookeeper 192.168.2.128:2181,192.168.2.129:2181,192.168.2.130:2181 --replication-factor 1 --partitions 1 --topic message_topic
    

    t02Nq0.png

    1.6、配置开机自启
    1. 在 /lib/systemd/system/ 目录下创建 zookeeper服务和kafka服务 的配置文件
    vim kafka.service 
    

    kafka.service 添加内容:

    [Unit]
    
    Description=Apache Kafka server (broker)
    
    After=network.target  zookeeper.service
    
     
    
    [Service]
    
    Type=simple
    
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
    
    User=root
    
    Group=root
    
    ExecStart=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.11-2.1.0/config/server.properties
    
    ExecStop=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
    
    Restart=on-failure
    
     
    
    [Install]
    
    WantedBy=multi-user.target
    
    

    注:以上两个文件 根据自己的 jdk 和 kafka 安装目录相应的修改。

    1. 刷新配置
    systemctl daemon-reload
    
    1. kafka服务加入开机自启。
    systemctl enable kafka
    
    
    1. 使用systemctl启动/关闭/重启 kafka服务
    systemctl start/stop/restart  zookeeper/kafka
    

    注:启动kafka前必须先启动zookeeper 。

     systemctl start zookeeper
    
     systemctl start kafka
    
    
    1. 查看状态
    systemctl status zookeeper
    

    Kafka-manager(CMAK)部署文档

    1.环境说明

    JDK     		1.8 +    【不要安装OpenJdk】
    kafka   		2.11-2.1.1
    Kafka-manager   1.3.3.7 
    

    2.源码安装

    安装包地址

    #获取源码包
    
    wget https://github.com/yahoo/kafka-manager/archive/1.3.3.7.zip 
    
    #解压
    
    unzip 1.3.3.7.zip
    
    # 删除多余的安装包
    
    rm -f 1.3.3.7.zip
    
    # 改名
    
    mv  CMAK-1.3.3.7 cmak
    
    yum安装sbt(因为kafka-manager需要sbt编译)
    
     
    
    curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
    
     sudo mv bintray--sbt-rpm.repo /etc/yum.repos.d/
    
     sudo yum install sbt
    
    

    修改仓库地址:(sbt 默认下载库文件很慢, 还时不时被打断),我们可以采用阿里云的镜像进行替代

    mkdir ~/.sbt ; vim ~/.sbt/repositories
    

    一定要严格按格式来,每行后面不要有空格

    [repositories] 
    
    local
    
    aliyun: http://maven.aliyun.com/nexus/content/groups/public/
    
    typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala[scalaVersion]/)(sbt[sbtVersion]/)[revision]/[type]s/artifact.[ext], bootOnly
    
    sonatype-oss-releases
    
    maven-central
    
    sonatype-oss-snapshots
    
    

    查看版本

    sbt -version
    

    编译kafka-manager

    cd /data/cmak/
    
    ./sbt clean dist
    
    

    3. 修改配置文件

    # 进入配置文件目录
    
    cd /data/cmak/conf
    
    # 修改配置目录文件
    
    vim application.conf
    
    #修改如下:
    
    修改 kafka-manager.zkhosts和cmak.zkhosts,如下所示:
    
        kafka-manager.zkhosts="192.168.2.128:2181,192.168.2.129:2181,192.168.2.130:2181"
    
     这里的地址就是我们 zookeeper 的地址,保存退出
    
    

    t02sz9.png

    4. 启动CMAK

    Kafka-manager 默认端口是 9000,可以通过 -Dhttp.port,指定端口;-Dconfig.file=conf/application.conf 指定配置文件,命令如下:
    
        # nohup  是用于 log 输出的,没输入路径 默认当前 nohup.out 文件存放日志
    
        nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
    
    稍等几秒钟,查看 kafka-manager 是否启动成功:
    
         ps -ef | grep 9000
    
    Ok,我们到浏览器上,使用 ip:端口 访问
    
    

    5. 界面相关

    1.1、访问主界面
    http://192.168.2.128:9000/
    

    t024iD.png

    1.2、添加Cluster

    Name可以任意写,建议写集群的ip

    Zookeper的集群地址填写

    然后其他的可以默认即可

    点击save

    t0RPLq.png

    1.3、查看集群信息

    ![img](file:///C:UsersadminAppDataLocalTempksohtml11868wps22.jpg)

    查看topic

    t0R1w6.png

  • 相关阅读:
    区分JS的空值
    死锁
    高效的SQLSERVER分页方案
    IIS经典模式VS集成模式
    MVC过滤器
    Request接收参数乱码原理解析
    int三种转化区别
    Area使用
    Action和Partial等区别
    Log4Net
  • 原文地址:https://www.cnblogs.com/charlypage/p/13058182.html
Copyright © 2011-2022 走看看