zoukankan      html  css  js  c++  java
  • kafka集群部署

    版本:kafka_2.10-0.10.1.1 + zookeeper-3.4.9

    说明:kafka自带了zookeeper,但是推荐使用独立的zookeeper集群。

    前置环境:(参考Hadoop搭建)

    1、节点配分配:
    192.168.235.138 node1
    192.168.235.139 node2
    192.168.235.140 node3
    2、配置静态IP、hosts、hostname、关闭防火墙、安装JDK
    View Code

    1、zookeeper集群部署

    (1)安装

    上传文件到指定集群上,执行如下步骤

    # 解压文件到/usr/local/目录下
    # tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local/
    # 删除无用文件(可选)
    # cd /usr/local/zookeeper-3.4.9/
    # find . -name "*.cmd" -exec rm -rf {} ;
    # rm -rf docs/
    # 创建数据、日志存放目录及当前节点ID
    # mkdir data 
    # mkdir dataLog
    # echo "1" > data/myid
    View Code

    (2)配置

    # pwd
    /usr/local/zookeeper-3.4.9
    # cp conf/zoo_sample.cfg conf/kafka_zk.cfg 
    # vim conf/kafka_zk.cfg 
    # 在kafka_zk.cfg文件中加入如下配置:
    tickTime=2000
    # 数据文件存放位置
    dataDir=/usr/local/zookeeper-3.4.9/data
    dataLogDir=/usr/local/zookeeper-3.4.9/dataLog
    #服务监听端口
    clientPort=2181
    #选举等待时间
    initLimit=5
    syncLimit=2
    #集群节点信息
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
    View Code

    (3)分发文件、修改对应节点的ID

    scp -r zookeeper-3.4.9 root@node2:/usr/local/
    scp -r zookeeper-3.4.9 root@node3:/usr/local/
    View Code

    修改node2和node3节点上/usr/local/zookeeper-3.4.9/data目录下的myid文件内容分别为2、3

    (4)查看各节点时间并同步时钟

    # date  #系统时间,系统重启失效
    2017年 04月 14日 星期五 11:28:08 CST
    # 手动更改系统时间
    # date -s '2017/04/14 11:33:50'
    # clock -r    #读取CMOS时间
    2017年04月14日 星期五 19时34分33秒  -1.038372 seconds
    # clock -w   #将当前时间写入CMOS
    View Code

    注:集群各节点之间的时间同步很重要,必要时部署本地集群的时钟服务器

    (5)启动集群

    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh start /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #启动(指定配置文件)
    # jps  #查看进程
    15286 QuorumPeerMain
    15319 Jps
    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh status /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg   #查看状态
    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh stop /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg     #关闭 
    View Code

    注意:

      i 最好在启动、关闭时都指定配置文件,一方面,集群中可能有多个配置文件,另一方面,暴力kill进程可能会带来意想不到的错误。

      ii jps只能查看进程是否存在,在某些情况下进程存在不代表zk集群处于可用状态,可通过查看zk状态或者用自带的客户端连接确认是

    否处于可用状态。连接示例如下:

    # /usr/local/zookeeper-3.4.9/bin/zkCli.sh -server node1:2181
    日志。。。。
    [zk: node1:2181(CONNECTED) 0] ls /
    [zookeeper]
    [zk: node1:2181(CONNECTED) 1] help  #命令参考
    ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit 
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close 
        ls2 path [watch]
        history 
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path
    退出:quit 或 Ctrl + C/D
    View Code

    (6)zk自带工具介绍

    zookeeper自带的工具如下:

    # ls bin/
      zkCleanup.sh  zkCli.sh  zkEnv.sh  zkServer.sh
    # zkCleanup.sh :zookeeper不会自己删除旧的快照和日志文件,需要人工清除,可以自己写脚本清除或者用zk自带的zkCleanup工具。
    # zkCli.sh :zk客户端
    # zkEnv.sh : 主要配置,zookeeper集群启动时配置环境变量的文件
    # zkServer.sh 管理程序文件
    View Code

    日志管理是后期维护的重要一环,要特别注意。至此,zk集群搭建完毕。

    2、kafka集群

    (1)安装

    上传文件到集群节点

    tar -zxvf kafka_2.10-0.10.1.1.tgz -C /usr/local/
    cd /usr/local/
    mv kafka_2.10-0.10.1.1 kafka
    View Code

    (2)配置

    # cd /usr/local/kafka/
    # mkdir logs                      #预设日志存放目录
    # ll config/ | awk '{print $9}'   #配置文件列表
    connect-console-sink.properties
    connect-console-source.properties
    connect-distributed.properties
    connect-file-sink.properties
    connect-file-source.properties
    connect-log4j.properties
    connect-standalone.properties
    consumer.properties         #消费者配置选项
    log4j.properties
    producer.properties         #生产者配置选项
    server.properties            #主要配置文件
    tools-log4j.properties
    zookeeper.properties
    # vim config/server.properties 
    修改如下参数,没有则添加
    broker.id=0                 
    host.name=192.168.235.138  
    log.dirs=/usr/local/kafka/logs   
    
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    View Code

    补充:server.properties可选参数如下

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=19092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
    View Code

    注意:每个节点对应一个broker id,并和IP一一对应

    (3)启动服务

    # cd kafka/
    # bin/kafka-server-start.sh config/server.properties 
    # jps
    View Code

    (4)测试

    创建一个主题,并用node1作为消息生产者(发布者),node2、node3作为消息消费者(订阅者)。

    在node1节点上:

    #启动服务
    # bin/kafka-server-start.sh -daemon config/server.properties  
    # jps
    # 创建一个主题test:一个分区,两个副本
    # bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partition 1 --topic test   
    # 创建一个生产者(消息发布者)
    # bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
    等待输入要发布的消息。。。

    在node2、node3节点上:

    # bin/kafka-server-start.sh -daemon config/server.properties 
    # 创建一个消费者(消息订阅者) # bin
    /kafka-console-consumer.sh --zookeeper node1:2181 --topic test --from beginning
    等待接收消息。。。

    在node1的发布频道里输入消息,可在node2、node3上接收到对应的消息。

    其他一些常用命令参考:

    查看主题
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    查看主题详情
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    删除主题
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
    生产者参数查看:bin/kafka-console-producer.sh
    消费者参数查看:bin/kafka-console-consumer.sh

    (5)补充:

    日志说明:

    server.log #kafka的运行日志
    state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

    zookeeper查看:

    # bin/zkCli.sh 
    ...
    [zk: localhost:2181(CONNECTED) 0] ls /
    [isr_change_notification, zookeeper, admin, consumers, cluster, config, controller, brokers, controller_epoch]
    每个节点上都可以通过zookeeper查看kafka的节点注册信息

    官方文档:http://kafka.apache.org/documentation.html

    参考:http://www.cnblogs.com/luotianshuai/p/5206662.html

  • 相关阅读:
    vue form dynamic validator All In one
    TypeScript api response interface All In One
    closable VS closeable All In One
    macOS 如何开启 WiFi 热点 All In One
    vue css inline style All In One
    vs2010里面 新建网站里面的 asp.net网站 和 新建项目里面的 asp.net Web应用程序 的区别 (下)
    牛腩新闻 59 整合添加新闻页 FreeTextBox 富文本编辑器,检测到有潜在危险的 Request.Form 值,DropDownList 的使用
    牛腩新闻 61尾声: error.aspx的使用 防止报错
    vs2010里面 新建网站里面的 asp.net网站 和 新建项目里面的 asp.net Web应用程序 的区别 (上)
    牛腩新闻 62:尾声续2 asp.net的编译和发布
  • 原文地址:https://www.cnblogs.com/chinas/p/6708126.html
Copyright © 2011-2022 走看看