  • Kafka Distributed Cluster Setup

    I. Versions

    CentOS 7.5

    zookeeper-3.4.12

    kafka_2.12-1.1.0

    II. ZooKeeper Installation

    1. Download and extract the ZooKeeper tarball

     
    tar -zvxf zookeeper-3.4.12.tar.gz
     

    2. Create the data and log directories

    mkdir /usr/local/zookeeper-3.4.12/data
    mkdir /usr/local/zookeeper-3.4.12/logs

    3. Copy the sample configuration file

    In the conf directory, copy zoo_sample.cfg:

    cp zoo_sample.cfg zoo.cfg

    4. In the data directory, run:

    echo 1 > myid
    This creates the myid file with the value 1. Repeat the same step on the other two machines, using 2 and 3 for their myid values.
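The step above can also be scripted. A minimal sketch, run once per node with that node's ID (here /tmp/zk-demo stands in for /usr/local/zookeeper-3.4.12 so the sketch is safe to run anywhere):

```shell
# Sketch: write this node's myid file. On the other two machines the same
# script would run with MYID=2 and MYID=3.
DATA_DIR=/tmp/zk-demo/data      # stand-in for the real data directory
MYID=1                          # this node's ID in the ensemble
mkdir -p "$DATA_DIR"
echo "$MYID" > "$DATA_DIR/myid"
cat "$DATA_DIR/myid"            # prints 1
```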

    5. Edit the configuration file (conf/zoo.cfg)

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/usr/local/zookeeper-3.4.12/data
    dataLogDir=/usr/local/zookeeper-3.4.12/logs
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    # cluster server addresses (port 2888: quorum traffic, 3888: leader election)
    server.1=IP1:2888:3888
    server.2=IP2:2888:3888
    server.3=IP3:2888:3888
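Rather than hand-editing zoo.cfg on every machine, the file can be generated from a heredoc. A sketch, writing to /tmp/zk-demo so it can be tried safely (IP1–IP3 remain placeholders, as in the listing above):

```shell
# Sketch: generate zoo.cfg non-interactively with the same settings as above.
CFG_DIR=/tmp/zk-demo            # stand-in for zookeeper-3.4.12/conf
mkdir -p "$CFG_DIR"
cat > "$CFG_DIR/zoo.cfg" <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.4.12/data
dataLogDir=/usr/local/zookeeper-3.4.12/logs
clientPort=2181
server.1=IP1:2888:3888
server.2=IP2:2888:3888
server.3=IP3:2888:3888
EOF
grep -c '^server\.' "$CFG_DIR/zoo.cfg"   # prints 3 (quorum members)
```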

    6. Start ZooKeeper (on each of the three nodes)

    bin/zkServer.sh start
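Once all three nodes are started, `bin/zkServer.sh status` on each should report one leader and two followers. As a quick reachability check before moving on to Kafka, a small probe using bash's built-in /dev/tcp pseudo-device (bash-specific, but needs no extra tools):

```shell
# Sketch: succeed if HOST:PORT accepts a TCP connection.
zk_up() {                        # usage: zk_up HOST PORT
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

if zk_up 127.0.0.1 2181; then
  echo "zookeeper reachable"
else
  echo "zookeeper down"
fi
```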

    III. Kafka Installation

    1. Download and extract the Kafka tarball

    tar -zvxf  kafka_2.12-1.1.0.tgz

    2. Edit the configuration file

    Open the Kafka broker configuration:

    vim config/server.properties

    Change the relevant settings:

    # Broker ID; a unique integer per server. Use 1, 2, 3 on the three machines.
    broker.id=1
    
    # listener address advertised to clients; use each node's own IP
    advertised.listeners=PLAINTEXT://IP:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # number of threads handling network requests
    num.network.threads=3
    
    # number of threads performing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    # data storage path (avoid /tmp in production; its contents may be purged)
    log.dirs=/tmp/kafka-logs
    
    # default number of partitions for new topics
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    # In a cluster, set these above 1 for availability; here we use 3
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=3
    
    ############################# Log Flush Policy #############################
    
    
    # log retention period in hours (168 = 7 days)
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=IP1:2181,IP2:2181,IP3:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    #group.initial.rebalance.delay.ms=0
    
    # whether topics are auto-created on first use: false = no, true = yes
    auto.create.topics.enable=false
    
    # allow topic deletion (default: false)
    delete.topic.enable=true
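Only broker.id and advertised.listeners differ between the three nodes, so they can be patched per machine with sed instead of edited by hand. A sketch against a throw-away copy under /tmp (BROKER_ID and HOST_IP are example values, not from the original setup):

```shell
# Sketch: patch the per-node fields of server.properties with GNU sed.
PROPS=/tmp/kafka-demo/server.properties
mkdir -p "$(dirname "$PROPS")"
# Throw-away stand-in for the real config file:
printf 'broker.id=0\nadvertised.listeners=PLAINTEXT://CHANGEME:9092\n' > "$PROPS"

BROKER_ID=1                     # 2 and 3 on the other machines
HOST_IP=192.168.1.11            # example address; use each node's real IP
sed -i "s/^broker\.id=.*/broker.id=$BROKER_ID/" "$PROPS"
sed -i "s#^advertised\.listeners=.*#advertised.listeners=PLAINTEXT://$HOST_IP:9092#" "$PROPS"
cat "$PROPS"
```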

    3. Start Kafka

    bin/kafka-server-start.sh -daemon config/server.properties
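With auto.create.topics.enable=false (set above), topics must be created explicitly. The sketch below only assembles and prints the kafka-topics.sh invocation, since it cannot run without a live cluster; IP1 is a placeholder as in the configs, and test-topic is a hypothetical name:

```shell
# Sketch: the explicit topic-creation command for Kafka 1.1.0's
# ZooKeeper-based tooling. Printed here rather than executed.
TOPIC=test-topic
CREATE_CMD="bin/kafka-topics.sh --create --zookeeper IP1:2181 \
--replication-factor 3 --partitions 3 --topic $TOPIC"
echo "$CREATE_CMD"
```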

    IV. Key Parameters

    Broker (server-side) settings

    message.max.bytes (default: 1 MB) – the largest message the broker will accept. It should be >= the producer's max.request.size and <= the consumer's fetch.message.max.bytes; otherwise the broker accepts messages the consumer cannot fetch, and consumption stalls.
    log.segment.bytes (default: 1 GB) – the size of Kafka's data segment files; make sure it exceeds the length of a single message. The default is normally fine (a single message rarely approaches 1 GB, since Kafka is a messaging system, not a file system).
    replica.fetch.max.bytes (default: 1 MB) – the largest message that can be replicated between brokers. It should be larger than message.max.bytes; otherwise the broker accepts messages it cannot replicate, risking data loss.

    Consumer settings

    fetch.message.max.bytes (default: 1 MB) – the largest message a consumer can read; it should be >= message.max.bytes. If you must send large messages through Kafka, plan for their impact on the cluster and its topics at design time, rather than fixing problems only after they appear.

    Producer settings

    buffer.memory (default: 32 MB) – the size of the producer's buffer. While the buffer has room the producer keeps accepting writes, but buffered records have not necessarily been sent yet.

    batch.size (default: 16384 bytes) – the size of each record batch. A batch becomes eligible for sending once it reaches this size; the buffer may hold multiple batches.

    linger.ms – how long to wait before sending a batch that has not yet reached batch.size.

    max.request.size (default: 1 MB) – the maximum size of a single produce request; it should be larger than batch.size.
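Putting the sizing rules above together, the four limits must stay mutually consistent across broker, consumer, and producer. An illustrative (not prescriptive) set of overrides for messages up to ~10 MB:

```
# broker (server.properties): accept messages up to ~10 MB
message.max.bytes=10485760
# must be >= message.max.bytes, or large messages cannot be replicated
replica.fetch.max.bytes=10485760

# consumer: must be >= message.max.bytes to fetch the largest accepted message
fetch.message.max.bytes=10485760

# producer: the per-request cap, kept above batch.size
max.request.size=10485760
batch.size=16384
linger.ms=5
```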

    V. Notes

    1. To keep all partitions available, set offsets.topic.replication.factor to at least 3.

    2. Disable topic auto-creation, and try to start all of the cluster's brokers before clients begin consuming; otherwise partitions and their replicas may not be distributed evenly across brokers, which hurts availability.

    3. Once the cluster is running, inspect partition and replica placement with:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets


     

  • Original post: https://www.cnblogs.com/dafanjoy/p/11394474.html