zoukankan      html  css  js  c++  java
  • kafka集群部署

    版本:kafka_2.10-0.10.1.1 + zookeeper-3.4.9

    说明:kafka自带了zookeeper,但是推荐使用独立的zookeeper集群。

    前置环境:(参考Hadoop搭建)

    1、节点配分配:
    192.168.235.138 node1
    192.168.235.139 node2
    192.168.235.140 node3
    2、配置静态IP、hosts、hostname、关闭防火墙、安装JDK
    View Code

    1、zookeeper集群部署

    (1)安装

    上传文件到指定集群上,执行如下步骤

    # 解压文件到/usr/local/目录下
    # tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local/
    # 删除无用文件(可选)
    # cd /usr/local/zookeeper-3.4.9/
    # find . -name "*.cmd" -exec rm -rf {} ;
    # rm -rf docs/
    # 创建数据、日志存放目录及当前节点ID
    # mkdir data 
    # mkdir dataLog
    # echo "1" > data/myid
    View Code

    (2)配置

    # pwd
    /usr/local/zookeeper-3.4.9
    # cp conf/zoo_sample.cfg conf/kafka_zk.cfg 
    # vim conf/kafka_zk.cfg 
    # 在kafka_zk.cfg文件中加入如下配置:
    tickTime=2000
    # 数据文件存放位置
    dataDir=/usr/local/zookeeper-3.4.9/data
    dataLogDir=/usr/local/zookeeper-3.4.9/dataLog
    #服务监听端口
    clientPort=2181
    #选举等待时间
    initLimit=5
    syncLimit=2
    #集群节点信息
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
    View Code

    (3)分发文件、修改对应节点的ID

    scp -r zookeeper-3.4.9 root@node2:/usr/local/
    scp -r zookeeper-3.4.9 root@node3:/usr/local/
    View Code

    修改node2和node3节点上/usr/local/zookeeper-3.4.9/data目录下的myid文件内容分别为2、3

    (4)查看各节点时间并同步时钟

    # date  #系统时间,系统重启失效
    2017年 04月 14日 星期五 11:28:08 CST
    # 手动更改系统时间
    # date -s '2017/04/14 11:33:50'
    # clock -r    #读取CMOS时间
    2017年04月14日 星期五 19时34分33秒  -1.038372 seconds
    # clock -w   #将当前时间写入CMOS
    View Code

    注:集群各节点之间的时间同步很重要,必要时部署本地集群的时钟服务器

    (5)启动集群

    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh start /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #启动(指定配置文件)
    # jps  #查看进程
    15286 QuorumPeerMain
    15319 Jps
    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh status /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg   #查看状态
    # /usr/local/zookeeper-3.4.9/bin/zkServer.sh stop /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg     #关闭 
    View Code

    注意:

      i 最好在启动、关闭时都指定配置文件,一方面,集群中可能有多个配置文件,另一方面,暴力kill进程可能会带来意想不到的错误。

      ii jps只能查看进程是否存在,在某些情况下进程存在不代表zk集群处于可用状态,可通过查看zk状态或者用自带的客户端连接确认是

    否处于可用状态。连接示例如下:

    # /usr/local/zookeeper-3.4.9/bin/zkCli.sh -server node1:2181
    日志。。。。
    [zk: node1:2181(CONNECTED) 0] ls /
    [zookeeper]
    [zk: node1:2181(CONNECTED) 1] help  #命令参考
    ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit 
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close 
        ls2 path [watch]
        history 
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path
    退出:quit 或 Ctrl + C/D
    View Code

    (6)zk自带工具介绍

    zookeeper自带的工具如下:

    # ls bin/
      zkCleanup.sh  zkCli.sh  zkEnv.sh  zkServer.sh
    # zkCleanup.sh :zookeeper不会自己删除旧的快照和日志文件,需要人工清除,可以自己写脚本清除或者用zk自带的zkCleanup工具。
    # zkCli.sh :zk客户端
    # zkEnv.sh : 主要配置,zookeeper集群启动时配置环境变量的文件
    # zkServer.sh 管理程序文件
    View Code

    日志管理是后期维护的重要一环,要特别注意。至此,zk集群搭建完毕。

    2、kafka集群

    (1)安装

    上传文件到集群节点

    tar -zxvf kafka_2.10-0.10.1.1.tgz -C /usr/local/
    cd /usr/local/
    mv kafka_2.10-0.10.1.1 kafka
    View Code

    (2)配置

    # cd /usr/local/kafka/
    # mkdir logs                      #预设日志存放目录
    # ll config/ | awk '{print $9}'   #配置文件列表
    connect-console-sink.properties
    connect-console-source.properties
    connect-distributed.properties
    connect-file-sink.properties
    connect-file-source.properties
    connect-log4j.properties
    connect-standalone.properties
    consumer.properties         #消费者配置选项
    log4j.properties
    producer.properties         #生产者配置选项
    server.properties            #主要配置文件
    tools-log4j.properties
    zookeeper.properties
    # vim config/server.properties 
    修改如下参数,没有则添加
    broker.id=0                 
    host.name=192.168.235.138  
    log.dirs=/usr/local/kafka/logs   
    
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    View Code

    补充:server.properties可选参数如下

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=19092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
    View Code

    注意:每个节点对应一个broker id,并和IP一一对应

    (3)启动服务

    # cd kafka/
    # bin/kafka-server-start.sh config/server.properties 
    # jps
    View Code

    (4)测试

    创建一个主题,并用node1作为消息生产者(发布者),node2、node3作为消息消费者(订阅者)。

    在node1节点上:

    #启动服务
    # bin/kafka-server-start.sh -daemon config/server.properties  
    # jps
    # 创建一个主题test:一个分区,两个副本
    # bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partition 1 --topic test   
    # 创建一个生产者(消息发布者)
    # bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
    等待输入要发布的消息。。。

    在node2、node3节点上:

    # bin/kafka-server-start.sh -daemon config/server.properties 
    # 创建一个消费者(消息订阅者) # bin
    /kafka-console-consumer.sh --zookeeper node1:2181 --topic test --from beginning
    等待接收消息。。。

    在node1的发布频道里输入消息,可在node2、node3上接收到对应的消息。

    其他一些常用命令参考:

    查看主题
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    查看主题详情
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    删除主题
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
    生产者参数查看:bin/kafka-console-producer.sh
    消费者参数查看:bin/kafka-console-consumer.sh

    (5)补充:

    日志说明:

    server.log #kafka的运行日志
    state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

    zookeeper查看:

    # bin/zkCli.sh 
    ...
    [zk: localhost:2181(CONNECTED) 0] ls /
    [isr_change_notification, zookeeper, admin, consumers, cluster, config, controller, brokers, controller_epoch]
    每个节点上都可以通过zookeeper查看kafka的节点注册信息

    官方文档:http://kafka.apache.org/documentation.html

    参考:http://www.cnblogs.com/luotianshuai/p/5206662.html

  • 相关阅读:
    python的struct模块
    Linux程序设计学习笔记(独乐乐版)
    理解AndroidX
    Android中的样式和主题
    Android中Fragment的使用
    Android 中Dialog的使用
    直接在apk中添加资源的研究
    Android签名生成和互转
    简单扒一下Volley源码,扩展Volley生命周期
    获取android所有联系人信息
  • 原文地址:https://www.cnblogs.com/chinas/p/6708126.html
Copyright © 2011-2022 走看看