zoukankan      html  css  js  c++  java
  • kafka-2.11-0.11集群搭建

    kafka集群依赖于zookeeper,所以需要先搭建zookeeper集群,kafka默认自带了内建的zookeeper,建议使用自己外搭建的zookeeper,这样比较灵活并且解耦服务,同时也可以让其他需要zookeeper的服务使用。注意kafka-2.11-0.11版本与zookeeper-3.4.10.tar.gz对应,

     

    一、安装

    1、下载zookeeper:

    # wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

    生产环境使用集群至少需要三台服务器,官方建议至少3或5台,并且集群节点个数必须是奇数。需要java环境,jdk1.8+(自己先配置好,本文不涉及)。

    创建目录:

    #mkdir  /data

    #mkdir  /data/zookeeper/{data,logs}  -p

    #tar xf zookeeper-3.4.10.tar.gz

    #rm -rf zookeeper-3.4.10.tar.gz

    修改配置文件:

    创建配置文件zoo.cfg

    #cat  zoo.cfg

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zookeeper/data
    clientPort=2181
    maxClientCnxns=100
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=24
    quorumListenOnAllIPs=true
    dataLogDir=/data/zookeeper/logs
    server.3=10.10.11.92:2888:3888
    server.6=10.10.9.29:2888:3888
    server.9=10.10.8.16:2888:3888 

    创建myid,用于唯一标示一个zk节点,在创建的/data/zookeeper/data目录下面创建:

    #echo 3  >  /data/zookeeper/data/myid

    将配置好的zookeeper服务整个分别copy到另外两台服务器上,并创建需要的目录。

    #mkdir  /data/zookeeper/{data,logs}  -p

    并在相应的data目录下创建各自的myid文件:

    #echo 6  >  /data/zookeeper/data/myid

    #echo 9  >  /data/zookeeper/data/myid

    切换到bin目录,分别启动zk服务:

    ./zkServer.sh start

    三台都启动完成后,发现报错了:

    2017-11-07 15:31:01,300 [myid:3] - WARN  [WorkerSender[myid=3]:QuorumCnxManager@588] - Cannot open channel to 9 at election address /10.10.8.16:3888
    java.net.ConnectException: Connection refused (Connection refused)

    排错:

    1、怀疑是防火墙的问题,这是内网啊,所以不是防火墙;

    2、端口被占用了,lsof  -i:3888,没有输出啊,所以端口也没占用;

    3、ip地址不是本机的网卡

    云服务器采用虚拟化的技术,监听的网卡是属于物理网关的网卡,而虚拟化机内部自然没有这个网卡。

    在配置文件添加一下参数,重启服务正常:

    quorumListenOnAllIPs=true

    2、查看zk状态

    #./zkServer.sh  status

    3、zookeepr常用的四字节命令:

    ZooKeeper四字命令
    功能描述
    conf 3.3.0版本引入的。打印出服务相关配置的详细信息。
    cons 3.3.0版本引入的。列出所有连接到这台服务器的客户端全部连接/会话详细信息。包括"接受/发送"的包数量、会话id、操作延迟、最后的操作执行等等信息。
    crst 3.3.0版本引入的。重置所有连接的连接和会话统计信息。
    dump 列出那些比较重要的会话和临时节点。这个命令只能在leader节点上有用。
    envi 打印出服务环境的详细信息。
    reqs 列出未经处理的请求
    ruok 测试服务是否处于正确状态。如果确实如此,那么服务返回"imok",否则不做任何相应。
    stat 输出关于性能和连接的客户端的列表。
    srst 重置服务器的统计。
    srvr 3.3.0版本引入的。列出连接服务器的详细信息
    wchs 3.3.0版本引入的。列出服务器watch的详细信息。
    wchc 3.3.0版本引入的。通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表。
    wchp 3.3.0版本引入的。通过路径列出服务器watch的详细信息。它输出一个与session相关的路径。
    mntr

    3.4.0版本引入的。输出可用于检测集群健康状态的变量列表

     

    #echo  mntr  | nc  ip  port

    ip:zk服务器的IP地址; port:zk服务的端口

    二、kafka集群

    将下载好的包上传到服务器,并解压到/data目录下:

    #tar xf  kafka_2.11-0.11.0.0.tgz

    只需要修改server.properties文件、consumer.properties、producer.properties文件

    server.properties文件主要注意三个地方:

    broker.id=0       #每个broker在集群中必须是唯一的,如有三个节点可以分别设置为(0、1、2)

    host.name=10.10.8.16      #host.name为个服务器的IP地址,根据自己的情况填写

    zookeeper.connect=10.10.8.16:2181,10.10.9.29:2181,10.10.11.92:2181

    #cat server.properties

    broker.id=0
    delete.topic.enable=true
    listeners=PLAINTEXT://10.10.89.219:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs
    num.partitions=4
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=2
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.10.8.16:2181,10.10.9.29:2181,10.10.11.92:2181
    zookeeper.connection.timeout.ms=100000
    zookeeper.session.timeout.ms=50000
    group.initial.rebalance.delay.ms=0

    将配置好的服务分别copy到另外两台服务器上面:

    !!!注意修改broker.id、host.name,为各自服务器对应的。

    切换到bin目录,启动kafka服务:./kafka-server-start.sh -daemon ../config/server.properties &

    2、创建topic

    #./kafka-topics.sh --create --zookeeper 10.10.11.92:2181,10.10.8.16:2181,10.10.9.29:2181 --replication-factor 2 --partitions 9 --topic userlog

    参数解释:
    zookeeper:zookeepr集群的地址
    replication-factor:复制两份
    partitions:创建9个partition
    topic: topic名称

    使用 describe 命令来显示 topic 详情

    #./kafka-topics.sh --describe --zookeeper 10.10.11.92:2181,10.10.8.16:2181,10.10.9.29:2181 --topic test
    Topic:test    PartitionCount:1    ReplicationFactor:2    Configs:
        Topic: test    Partition: 0    Leader: 3    Replicas: 3,2    Isr: 3,2
    1Leader 是给定分区的节点编号,每个分区的部分数据会随机指定不同的节点
    2Replicas 是该日志会保存的复制
    3Isr 表示正在同步的复制
     

     

  • 相关阅读:
    无法卸载Visual Studio 2005,提示:"H:\vs\vs_setup.msi could not be opened"
    在两个DB的table之间同步数据
    用于标记系统是否需要重启动的注册表键值
    提高性能——存储过程最佳实践 [译自MSDN]
    几个常见的位运算问题
    [存档] 非递归后根遍历二叉树
    [存档] 用真值表设计非递归二叉树遍历算法
    补码
    卸载Google Chrome导致Outlook, Word不能打开超链接
    编程题: 将一个矩阵(二维数组)顺时针旋转90度
  • 原文地址:https://www.cnblogs.com/cuishuai/p/7800503.html
Copyright © 2011-2022 走看看